tof_dataclasses/
ipbus.rs

//! Implementation of the IPBus protocol for GAPS
//!
//! Documentation about the IPBus protocol can be found here:
//! [see docs here](https://ipbus.web.cern.ch/doc/user/html/)
//!
//! We are using only IPBus control packets.
//!
8
9use std::fmt;
10use std::thread;
11use std::io;
12use std::net::{
13    UdpSocket,
14    SocketAddr
15};
16
17use std::error::Error;
18use std::time::{
19    Duration,
20    Instant
21};
22
23use crate::errors::IPBusError;
24use crate::serialization::{
25    //parse_u32,
26    parse_u32_be
27};
28
/// Size in bytes of the packet buffers used in this module.
///
/// We have some header and then the board mask (4 byte)
/// + at max 20*2 byte for the individual LTBs.
/// -> a guesstimate says 128 byte are enough
pub const MT_MAX_PACKSIZE        : usize = 128;

/// Sleep time between consecutive UDP queries
/// in microseconds (applied after each `IPBus::send`)
pub const UDP_SOCKET_SLEEP_USEC  : u64 = 100;
37
/// The IPBus standard encodes several packet types.
///
/// The packet type instructs the receiver to either
/// write/read/etc. values from its registers.
///
/// Technically, the IPBusPacketType is only 1 byte!
/// (Only the lowest 4 bits are placed into the
/// transaction header by this module.)
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum IPBusPacketType {
  Read                 = 0,
  Write                = 1,
  /// For reading multiple words,
  /// this will read the same
  /// register multiple times
  ReadNonIncrement     = 2,
  WriteNonIncrement    = 3,
  RMW                  = 4,
  /// This is not following IPBus packet
  /// specs
  Unknown              = 99
}

impl IPBusPacketType {

  /// Numeric code of the packet type.
  ///
  /// This is the enum discriminant, so it can not get
  /// out of sync with the definition above.
  pub fn to_u8(&self) -> u8 {
    *self as u8
  }

  /// Translate a numeric code into a packet type.
  ///
  /// Every code which is not one of 0..=4 yields
  /// `IPBusPacketType::Unknown`.
  pub fn from_u8(ptype : u8) -> Self {
    match ptype {
      0 => IPBusPacketType::Read,
      1 => IPBusPacketType::Write,
      2 => IPBusPacketType::ReadNonIncrement,
      3 => IPBusPacketType::WriteNonIncrement,
      4 => IPBusPacketType::RMW,
      _ => IPBusPacketType::Unknown,
    }
  }
}

impl fmt::Display for IPBusPacketType {
  /// Formats as `<IPBusPacketType: NAME>`
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    // static names - no need to allocate a String per call
    let repr = match self {
      IPBusPacketType::Read              => "Read",
      IPBusPacketType::Write             => "Write",
      IPBusPacketType::ReadNonIncrement  => "ReadNonIncrement",
      IPBusPacketType::WriteNonIncrement => "WriteNonIncrement",
      IPBusPacketType::RMW               => "RMW",
      IPBusPacketType::Unknown           => "Unknown",
    };
    write!(f, "<IPBusPacketType: {}>", repr)
  }
}
117
/// Plain container for a packet id, a packet type and
/// a fixed-size byte array.
///
/// NOTE(review): nothing in the code visible in this file
/// constructs or reads this struct - confirm if it is
/// still needed.
#[derive(Debug, Clone)]
pub struct IPBusPacket {
  /// packet id
  pub pid   : u16,
  /// type of the packet
  pub ptype : IPBusPacketType,
  /// raw bytes, MT_MAX_PACKSIZE at max
  pub data  : [u8;MT_MAX_PACKSIZE]
}
124
/// Implementation of an IPBus control packet
///
/// Bundles the UDP socket together with the packet id
/// bookkeeping and the receive buffer.
#[derive(Debug)]
pub struct IPBus {
  /// UDP socket as returned by `IPBus::connect`
  pub socket         : UdpSocket,
  //pub target_address : String,
  /// Packet type which will be encoded in the next
  /// transaction header
  pub packet_type    : IPBusPacketType,
  /// IPBus Packet ID - this is the NEXT
  /// pid which will be sent
  pub pid            : u16,
  /// Packet id of the latest encoded request. Received
  /// packets are compared against this value.
  pub expected_pid   : u16,
  /// Set to 0 in `new`, not modified by any code visible
  /// in this file
  pub last_pid       : u16,
  /// Receive buffer, overwritten by each call to `receive`
  pub buffer         : [u8;MT_MAX_PACKSIZE]
}
138
139impl IPBus {
140  
141  pub fn new(target_address : &str) 
142    -> io::Result<Self> {
143    let socket = Self::connect(target_address)?;
144    let mut bus = Self {
145      socket         : socket,
146      //target_address : target_address,
147      packet_type    : IPBusPacketType::Read,
148      pid            : 0,
149      expected_pid   : 0,
150      last_pid       : 0,
151      buffer         : [0;MT_MAX_PACKSIZE]
152    };
153    match bus.realign_packet_id() {
154      Err(err) => {
155        error!("Packet ID realign failed! {}", err); 
156        return Err(std::io::Error::new(std::io::ErrorKind::Other, "Can not realign packet id"));
157      },
158      Ok(_) => ()
159    }
160    Ok(bus)
161  }
162
163  /// Connect to MTB Utp socket
164  ///
165  /// This will try a number of options to bind 
166  /// to the local port.
167  /// 
168  /// # Arguments 
169  ///
170  /// * target_address  : IP/port of the target 
171  ///                     probably some kind of
172  ///                     FPGA
173  pub fn connect(target_address : &str) 
174    ->io::Result<UdpSocket> {
175    // provide a number of local ports to try
176    let local_addrs = [
177      SocketAddr::from(([0, 0, 0, 0], 50100)),
178      SocketAddr::from(([0, 0, 0, 0], 50101)),
179      SocketAddr::from(([0, 0, 0, 0], 50102)),
180      SocketAddr::from(([0, 0, 0, 0], 50103)),
181      SocketAddr::from(([0, 0, 0, 0], 50104)),
182    ];
183    let local_socket = UdpSocket::bind(&local_addrs[..]);
184    let socket : UdpSocket;
185    match local_socket {
186      Err(err)   => {
187        error!("Can not create local UDP socket for master trigger connection!, err {}", err);
188        return Err(err);
189      }
190      Ok(value)  => {
191        info!("Successfully bound UDP socket for master trigger communcations to {:?}", value);
192        socket = value;
193        // this is not strrictly necessary, but 
194        // it is nice to limit communications
195        match socket.set_read_timeout(Some(Duration::from_millis(1))) {
196          Err(err) => error!("Can not set read timeout for Udp socket! {err}"),
197          Ok(_)    => ()
198        }
199        match socket.set_write_timeout(Some(Duration::from_millis(1))) {
200          Err(err) => error!("Can not set write timeout for Udp socket! {err}"),
201          Ok(_)    => ()
202        }
203        match socket.connect(target_address) {
204          Err(err) => {
205            error!("Can not connect to IPBus socket to target address {}! {}", target_address, err);
206            return Err(err);
207          }
208          Ok(_)    => info!("Successfully connected IPBus to target address {}!", target_address)
209        }
210        match socket.set_nonblocking(false) {
211          Err(err) => {
212            error!("Can not set socket to blocking mode! {err}");
213          },
214          Ok(_) => ()
215        }
216        return Ok(socket);
217      }
218    } // end match
219  }  
220
221  ///// Reconnect to the same address after timeout
222  //pub fn reconnect(&mut self) 
223  //  -> io::Result<()> {
224  //  self.socket = Self::connect(&self.target_address)?;
225  //  Ok(())
226  //}
227
228
229  /// Get the next 12bit transaction ID. 
230  /// If we ran out, wrap around and 
231  /// start at 0
232  fn get_next_pid(&mut self) -> u16 {
233    let pid = self.pid;
234    self.expected_pid = self.pid;
235    //// get the next transaction id 
236    self.pid += 1;
237    // wrap around
238    if self.pid > u16::MAX {
239      self.pid = 0;
240      return 0;
241    }
242    return pid;
243  }
244
245  /// Receive number_of_bytes from UdpSocket and sleep after
246  /// to avoid too many queries
247  pub fn receive(&mut self) -> io::Result<usize> {
248    let number_of_bytes = self.socket.recv(&mut self.buffer)?;
249    //thread::sleep(Duration::from_micros(UDP_SOCKET_SLEEP_USEC));
250    Ok(number_of_bytes)
251  }
252 
253
254  /// Receive number_of_bytes from UdpSocket and sleep after
255  /// to avoid too many queries
256  pub fn send(&mut self, data : &Vec<u8>) -> io::Result<()> {
257    self.socket.send(data.as_slice())?;
258    thread::sleep(Duration::from_micros(UDP_SOCKET_SLEEP_USEC));
259    Ok(())
260  }
261  
262
263
264  /// Send a ipbus status packet and receive the response
265  ///
266  /// Inspect self.buffer after the call if interested in 
267  /// the result.
268  pub fn get_status(&mut self) 
269    -> Result<(), Box<dyn Error>> {
270    let mut udp_data = Vec::<u8>::new();
271    let mut phead  = self.create_packetheader(true);
272    phead = phead & 0xfffffff0;
273    phead = phead | 0x00000001;
274    udp_data.extend_from_slice(&phead.to_be_bytes());
275    for _ in 0..15 {
276      udp_data.push(0);
277      udp_data.push(0);
278      udp_data.push(0);
279      udp_data.push(0);
280    }
281    let mut send_again = true;
282    let mut number_of_bytes : usize;
283    loop {
284      if send_again {
285        match self.send(&udp_data) {
286          Err(err) => error!("Unable to send udp data! {err}"),
287          Ok(_)    => ()
288        }
289      }
290      trace!("[IPBus::get_status => message {:?} sent!", udp_data);
291      match self.receive() {
292        Err(err) => {
293          error!("Can not receive status packet from Udp Socket! {err}");
294          return Err(Box::new(IPBusError::UdpReceiveFailed));
295        },
296        Ok(_number_of_bytes)    => {
297          number_of_bytes = _number_of_bytes;
298        }
299      }
300      // check if this is really a status packet
301      let status_byte = self.buffer[3];
302      if status_byte & 0x1 != 1 {
303        // not a status packet
304        //return Err(Box::new(IPBusError::NotAStatusPacket));
305        send_again = false;
306        continue;
307      } else {
308        break;
309      }
310    }
311    trace!("[IPBus::get_status] => {} bytes received!", number_of_bytes);
312    //println!("[IPBus::get_status] => buffer {:?}", self. buffer);
313    for word in 0..16 {
314      trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
315    }
316    Ok(())
317  }
318
319
320  /// Assemble the 32bit packet header 
321  ///
322  /// This will include the (presume) next
323  /// packet id
324  fn create_packetheader(&mut self, status : bool) -> u32 {
325    // we use this to switch the byteorder
326    let pid : u16;
327    if status {
328      pid = 0;
329    } else {
330      pid = self.get_next_pid();
331    }
332    let pid_bytes = pid.to_be_bytes(); 
333    let pid_be0   = (pid_bytes[0] as u32) << 16;
334    let pid_be1   = (pid_bytes[1] as u32) << 8;
335    let header = (0x2 << 28) as u32
336               | (0x0 << 24) as u32
337               | pid_be0
338               | pid_be1
339               | (0xf << 4) as u32
340               | 0x0 as u32; // 0 means control packet, we will 
341                             // only use control packets in GAPS
342    trace!("[IPBus::create_packetheader] => Will use packet ID {pid}");
343    trace!("[IPBus::create_packetheader] => Generated header {:?}", header.to_be_bytes());
344    header
345  }
346
347  fn create_transactionheader(&self, nwords : u8) -> u32 {
348    let header = (0x2 << 28) as u32
349               | (0x0 << 24) as u32
350               | (0x0 << 20) as u32
351               | (0x0 << 16) as u32
352               | (nwords as u32) << 8
353               | ((self.packet_type.to_u8() & 0xf) << 4) as u32
354               | 0xf as u32; // 0xf is for outbound request 
355    header
356  }
357
358  /// Encode register addresses and values in IPBus packet
359  ///
360  /// # Arguments:
361  ///
362  /// * addr        : register addresss
363  /// * packet_type : read/write register?
364  /// * data        : the data value at the specific
365  ///                 register.
366  ///                 In case packet type is Write/Read
367  ///                 len of data has to be 1
368  ///
369  fn encode_payload(&mut self,
370                    addr        : u32,
371                    data        : &Vec<u32>) -> Vec<u8> {
372    let mut udp_data = Vec::<u8>::new();
373    let pheader = self.create_packetheader(false);
374    let nwords  = data.len() as u8;
375    trace!("[IPBus::encode_payload] => Encoding payload for packet type {}!", self.packet_type);
376    let theader = self.create_transactionheader(nwords);
377    udp_data.extend_from_slice(&pheader.to_be_bytes());
378    udp_data.extend_from_slice(&theader.to_be_bytes());
379    udp_data.extend_from_slice(&addr.to_be_bytes());
380    if self.packet_type    == IPBusPacketType::Write
381     || self.packet_type == IPBusPacketType::WriteNonIncrement { 
382      for i in data {
383        udp_data.extend_from_slice(&i.to_be_bytes());
384      }
385    }
386    trace!("[IPBus::encode_payload] => payload {:?}", udp_data);
387    udp_data
388  }
389 
390  pub fn get_pid_from_current_buffer(&self) -> u16 {
391    let buffer   = self.buffer.to_vec();
392    let pheader  = parse_u32_be(&buffer, &mut 0);
393    let pid      = ((0x00ffff00 & pheader) >> 8) as u16;
394    pid
395  }
396
397  /// Unpack a binary representation of an IPBusPacket
398  ///
399  /// # Arguments:
400  ///
401  /// * message : The binary representation following 
402  ///             the specs of IPBus protocoll
403  /// * verbose : print information for debugging.
404  ///
405  fn decode_payload(&mut self,
406                    verbose : bool)
407    -> Result<Vec<u32>, IPBusError> {
408    let mut pos  : usize = 0;
409    let mut data = Vec::<u32>::new();
410    let buffer   = self.buffer.to_vec();
411    // check if this is a status packet
412    let is_status = buffer[3] & 0x1 == 1;
413    trace!("[IPBus::decode_payload] => buffer (vec) {:?}", buffer); 
414    let pheader  = parse_u32_be(&buffer, &mut pos);
415    let theader  = parse_u32_be(&buffer, &mut pos);
416    trace!("[IPBus::decode_payload] => pheader {pheader}"); 
417    trace!("[IPBus::decode_payload] => theader {theader}"); 
418    let pid      = ((0x00ffff00 & pheader) >> 8) as u16;
419    let size     = ((0x0000ff00 & theader) >> 8) as u16;
420    let ptype    = ((0x000000f0 & theader) >> 4) as u8;
421    let packet_type = IPBusPacketType::from_u8(ptype);
422    trace!("[IPBus::decode_payload] => PID, SIZE, PTYPE : {} {} {}", pid, size, packet_type);
423    if pid != self.expected_pid {
424      if !is_status {
425        error!("Invalid packet ID. Expected {}, received {}", self.expected_pid, pid);
426        // we do know that the next expected packet id should be the latest one + 1
427        //if pid == u16::MAX {
428        //  self.expected_pid = 0; 
429        //} else {
430        //  self.expected_pid = pid + 1;
431        //}
432        return Err(IPBusError::InvalidPacketID);
433      }
434    }
435    match packet_type {
436      IPBusPacketType::Unknown => {
437        return Err(IPBusError::DecodingFailed);
438      }
439      IPBusPacketType::Read |
440      IPBusPacketType::ReadNonIncrement => {
441        if (((size as usize) * 4) + 11) < MT_MAX_PACKSIZE { 
442          for i in 0..size as usize {
443            data.push(  ((self.buffer[8 + i * 4]  as u32) << 24) 
444                      | ((self.buffer[9 + i * 4]  as u32) << 16) 
445                      | ((self.buffer[10 + i * 4] as u32) << 8)  
446                      |   self.buffer[11 + i * 4]  as u32)
447          }
448        } else {
449          error!("Size {} larger than bufffer len {}", size, data.len());
450        }
451      },
452      IPBusPacketType::Write => {
453        data.push(0);
454      },
455      IPBusPacketType::WriteNonIncrement => {
456        error!("Decoding of WriteNonIncrement packet not supported!");
457      },
458      IPBusPacketType::RMW => {
459        error!("Decoding of RMW packet not supported!!");
460      }
461    }
462    if verbose { 
463      println!("[IPBus::decode_payload] ==> Decoding IPBus Packet:");
464      println!(" >> Msg            : {:?}", self.buffer);
465      //println!(" >> IPBus version  : {}", ipbus_version);
466      //println!(" >> Transaction ID : {}", tid);
467      //println!(" >> ID             : {}", id);
468      //println!(" >> Size           : {}", size);
469      //println!(" >> Type           : {:?}", packet_type);
470      //println!(" >> Info           : {}", info_code);
471      println!(" >> data           : {:?}", data);
472    }
473    Ok(data)
474  }
475
476  /// Set the packet id to that what is expected from the targetr
477  pub fn realign_packet_id(&mut self) 
478    -> Result<(), Box<dyn Error>> {
479    trace!("[IPBus::realign_packet_id] - aligning...");
480    let pid = self.get_target_next_expected_packet_id()?;
481    self.pid = pid;
482    self.expected_pid = pid;
483    ////match self.get_target_next_expected_packet_id() {
484    //  Ok(pid) => {
485    //    self.pid = pid;
486    //  }
487    //  Err(err) => {
488    //    error!("Can not get next expected packet id from target, will use 0! {err}");
489    //    self.pid = 0;
490    //  }
491    //}
492    //self.expected_pid = self.pid;
493    trace!("[IPBus::realign_packet_id] - aligned {}", self.pid);
494    Ok(())
495  }
496
497  pub fn buffer_is_status(&self) -> bool {
498    self.buffer[3] & 0x1 == 1
499  }
500
501  /// Get the packet id which is expected by the target
502  pub fn get_target_next_expected_packet_id(&mut self)
503    -> Result<u16, Box<dyn Error>> {
504    self.get_status()?;
505    // the expected packet id is in WORD 3
506    let word = 3usize;
507    trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
508    let word3 = [self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4 + 2], self.buffer[word*4 + 3]];
509    let target_exp_pid = u16::from_be_bytes([word3[1], word3[2]]);
510    trace!("[IPBus::target_next_pid] => Get expected packet id {target_exp_pid}");
511    Ok(target_exp_pid)
512  }
513
514
515  /// Multiple read operations with a single UDP request
516  ///
517  /// Read either the same register multiple times 
518  /// or read from  incrementing register addresses 
519  ///
520  /// # Arguments:
521  ///
522  /// * addr           : register addresss to read 
523  ///                    from
524  /// * nwords         : number of read operations
525  /// * increment_addr : if true, increment the 
526  ///                    register address after
527  ///                    each read operation
528  pub fn read_multiple(&mut self,
529                       addr           : u32,
530                       nwords         : usize,
531                       increment_addr : bool) 
532    -> Result<Vec<u32>, Box<dyn Error>> {
533    let send_data = vec![0u32;nwords];
534    if increment_addr {
535      self.packet_type = IPBusPacketType::Read;
536    } else {
537      self.packet_type = IPBusPacketType::ReadNonIncrement;
538    }
539    let mut message = self.encode_payload(addr, &send_data);
540    let mut send_again = true;
541    let timeout = Instant::now();
542    loop {
543      if send_again {
544        self.send(&message)?;
545      }
546      match self.receive() {
547        Err(_err) => {
548          // In case the address is correct, this
549          // MUST be some kind of timeout/udp issue. 
550          // We assume the packet is lost, realign
551          // the packet id and try again
552          //
553          //// this will be the last pid we have received
554          //error!("Can not receive from socket! {err}. self.pid {}, self.expected_pid {}, buffer pid {}", self.pid, self.expected_pid, pid_from_buffer);
555          self.realign_packet_id()?;
556          // we need to rewrite the message with the 
557          // new packet id
558          message    = self.encode_payload(addr, &send_data);
559          send_again = true;
560          if timeout.elapsed().as_millis() == 10 {
561            return Err(Box::new(IPBusError::UdpReceiveFailed));
562          }
563          continue;
564        }
565        Ok(number_of_bytes) => {
566          
567          if self.buffer_is_status() {
568            // if we received a stray status message, let's just try again
569            continue;
570          }
571          
572          let pid_from_buffer = self.get_pid_from_current_buffer();
573          if pid_from_buffer != self.expected_pid {
574            error!("We got a packet, but the PacketID is not as expected!");
575            error!("-- self.pid {}, self.expected_pid {}, buffer pid {}", self.pid, self.expected_pid, pid_from_buffer);
576            // we try to fix it. If it is behind, we can just call receive again
577            if self.expected_pid > pid_from_buffer {
578              send_again = false;
579              continue;
580            } else {
581              // we have missed out on messages and need to send our original message
582              // again
583              self.realign_packet_id()?;
584              message    = self.encode_payload(addr, &send_data);
585              send_again = true;
586              continue;
587            }
588          }
589          trace!("[IPBus::read] => Received {} bytes from master trigger! Message {:?}", number_of_bytes, self.buffer);
590          break;
591        }
592      } // end match
593    } // here we must have either gotten the message or 
594    let data = self.decode_payload(false)?;  
595    if data.len() == 0 {
596      error!("Received empty data!");
597      return Err(Box::new(IPBusError::DecodingFailed));
598    }
599    Ok(data)
600  }
601  
602  /// Read a single value from a register
603  ///
604  /// # Arguments:
605  ///
606  /// * addr : register address to be read from
607  pub fn read(&mut self, 
608              addr : u32) 
609    -> Result<u32, Box<dyn Error>> {
610  
611    let data  = self.read_multiple(addr,1,false)?;
612    Ok(data[0])
613  }
614  
615  /// Write a single value to a register
616  ///
617  /// # Arguments
618  ///
619  /// * addr        : target register address
620  /// * data        : word to write in register
621  pub fn write(&mut self,
622               addr   : u32,
623               data   : u32)
624      -> Result<(), Box<dyn Error>> {
625    // we don't expect any issues with sending the data
626    let send_data = Vec::<u32>::from([data]);
627    self.packet_type = IPBusPacketType::Write;
628    let message = self.encode_payload(addr, &send_data);
629    match self.send(&message) {
630      Err(err) => {
631        error!("Sending Udp message failed! {err}");
632        return Err(Box::new(IPBusError::UdpSendFailed));
633      },
634      Ok(_) => ()
635    }
636    match self.receive() {
637      // this can have two failure modes
638      // 1) we receive nothing (timeout)
639      Err(err) => {
640        //if err.kind = std::io::ErrorKind  
641        error!("Unable to receive data! i/o error : {}", err.kind());
642        let target_exp_pid = self.get_target_next_expected_packet_id()?;
643        let buffer_pid = self.get_pid_from_current_buffer();
644        error!("self.pid {}, self.expected.pid {}, target expects pid {:?}, buffer pid {} ", self.pid, self.expected_pid, target_exp_pid, buffer_pid);
645        return Err(Box::new(IPBusError::UdpReceiveFailed));
646        //if err == IPBusError::InvalidPacketID {
647      }
648      Ok(_data) => {
649        // _data is tne number of bytes received
650        trace!("[ipbus::write] => Got buffer {:?}", self.buffer);
651        return Ok(());  
652      }
653    }
654  }
655  
656}
657
658impl fmt::Display for IPBus {
659  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
660    let mut repr  = String::from("<IPBus:");
661    repr         += &(format!("  pid : {}>", self.pid)); 
662    write!(f, "{}", repr)
663  }
664}
665