gondola_core/packets/
tof_packet.rs

1//! TofPacket provides a wrapper to write objects which implement
2//! TofPackable into files
3// The following file is part of gaps-online-software and published 
4// under the GPLv3 license
5
6use crate::prelude::*;
7
8/// Internal Tof communication protocol.
9/// Simple, yet powerful
10///
11/// A TofPacket has the following layout
12/// on disk
13/// HEAD    : u16 = 0xAAAA
14/// TYPE    : u8  = PacketType
15/// SIZE    : u32
16/// PAYLOAD : [u8;6-SIZE]
17/// TAIL    : u16 = 0x5555 
18///
19/// The total packet size is thus 13 + SIZE
20#[derive(Debug, Clone)]
21#[cfg_attr(feature="pybindings", pyclass)]
22pub struct TofPacket {
23  /// Type of the structure encoded in payload
24  pub packet_type        : TofPacketType,
25  /// The bytestream encoded structure
26  pub payload            : Vec<u8>,
27  // fields which won't get serialized
28  /// mark a packet as not eligible to be written to disk
29  pub no_write_to_disk   : bool,
30  /// mark a packet as not eligible to be sent over network 
31  /// FIXME - future extension
32  pub no_send_over_nw    : bool,
33  /// creation_time for the instance
34  pub creation_time      : Instant,
35  pub valid              : bool, // will be always valid, unless invalidated
36}
37
38impl fmt::Display for TofPacket {
39  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40    let p_len = self.payload.len();
41    if p_len < 4 {
42      write!(f, "<TofPacket: type {:?}, payload size {}>", self.packet_type, p_len)
43    } else {
44      write!(f, "<TofPacket: type {:?}, payload [ {} {} {} {} .. {} {} {} {}] of size {} >",
45             self.packet_type,
46             self.payload[0], self.payload[1], self.payload[2], self.payload[3],
47             self.payload[p_len-4], self.payload[p_len-3], self.payload[p_len - 2], self.payload[p_len-1], p_len ) 
48    }
49  }
50}
51
52impl Default for TofPacket {
53  fn default() -> Self {
54    Self::new()
55  }
56}
57
58/// Implement because TofPacket saves the creation time, 
59/// which never will be the same for 2 different instances
60impl PartialEq for TofPacket {
61  fn eq(&self, other: &Self) -> bool {
62    (self.packet_type == other.packet_type)           &&
63    (self.payload == other.payload)                   &&
64    (self.no_write_to_disk == other.no_write_to_disk) &&
65    (self.no_send_over_nw == other.no_send_over_nw)   &&
66    (self.valid == other.valid)
67  }
68}
69
70impl TofPacket {
71
72  pub fn new() -> Self {
73    let creation_time = Instant::now();
74    Self {
75      packet_type      : TofPacketType::Unknown,
76      payload          : Vec::<u8>::new(),
77      no_write_to_disk : false,
78      no_send_over_nw  : false,
79      creation_time    : creation_time,
80      valid            : true,
81    }
82  }
83
84  /// Generate a bytestream of self for ZMQ, prefixed with 
85  /// BRCT so all RBs will see it
86  pub fn zmq_payload_brdcast(&self) -> Vec<u8> {
87    let mut payload     = String::from("BRCT").into_bytes(); 
88    let mut stream  = self.to_bytestream();
89    payload.append(&mut stream);
90    payload
91  }
92  
93  /// Generate a bytestream of self for ZMQ, prefixed with 
94  /// RBX, to address only a certain board
95  pub fn zmq_payload_rb(&self, rb_id : u8) -> Vec<u8> {
96    let mut payload     = format!("RB{:02}", rb_id).into_bytes(); 
97    let mut stream  = self.to_bytestream();
98    payload.append(&mut stream);
99    payload
100  }
101
102  /// Unpack the TofPacket and return its content
103  pub fn unpack<T>(&self) -> Result<T, SerializationError>
104    where T: TofPackable + Serialization {
105    // if the alternative tof packet type is unknown, it must 
106    // be of type TOF_PACKET_TYPE, alternatively either one 
107    // of Both
108    if T::TOF_PACKET_TYPE_ALT != TofPacketType::Unknown {
109      if T::TOF_PACKET_TYPE_ALT == self.packet_type {
110        let unpacked : T = T::from_bytestream_alt(&self.payload, &mut 0)?;
111        return Ok(unpacked); 
112      } else if T::TOF_PACKET_TYPE == self.packet_type{
113        let unpacked : T = T::from_bytestream(&self.payload, &mut 0)?;
114        return Ok(unpacked); 
115      } else {
116        error!("This packet of type {} is neither for a {} nor a {}  packet!", self.packet_type, T::TOF_PACKET_TYPE, T::TOF_PACKET_TYPE_ALT);
117        return Err(SerializationError::IncorrectPacketType); 
118      }
119    } else {
120      if T::TOF_PACKET_TYPE != self.packet_type {
121        error!("This bytestream is not for a {} packet!", self.packet_type);
122        return Err(SerializationError::IncorrectPacketType);
123      }
124    }
125    let unpacked : T = T::from_bytestream(&self.payload, &mut 0)?;
126    Ok(unpacked)
127  }
128  
129  pub fn age(&self) -> u64 {
130    self.creation_time.elapsed().as_secs()
131  }
132}
133
134
135impl Serialization for TofPacket {
136  const HEAD : u16 = 0xaaaa;
137  const TAIL : u16 = 0x5555;
138  const SIZE : usize = 0; // FIXME - size/prelude_size 
139
140  fn from_bytestream(stream : &Vec<u8>, pos : &mut usize)
141  -> Result<Self, SerializationError> {
142    if stream.len() < 2 {
143      return Err(SerializationError::StreamTooShort);
144    }
145    let head = parse_u16(stream, pos);
146    if Self::HEAD != head {
147      error!("TofPacket does not start with HEAD signature! {}", Self::HEAD);
148      return Err(SerializationError::HeadInvalid);
149    }
150    let packet_type : TofPacketType;
151    let packet_type_enc = parse_u8(stream, pos);
152    match TofPacketType::try_from(packet_type_enc) {
153      Ok(pt) => packet_type = pt,
154      Err(_) => {
155        error!("Can not decode packet with packet type {}", packet_type_enc);
156        return Err(SerializationError::UnknownPayload);}
157    }
158    let payload_size = parse_u32(stream, pos) as usize;
159    *pos += payload_size; 
160    let tail = parse_u16(stream, pos);
161    if Self::TAIL != tail {
162      error!("Packet does not end with TAIL signature");
163      return Err(SerializationError::TailInvalid);
164    }
165    *pos -= 2; // for tail parsing
166    *pos -= payload_size;
167
168    let mut tp = TofPacket::new();
169    tp.packet_type = packet_type;
170    tp.payload.extend_from_slice(&stream[*pos..*pos+payload_size]);
171    // Fix position marker
172    *pos += 2 + payload_size;
173    Ok(tp) 
174  }
175  
176  fn to_bytestream(&self) 
177    -> Vec<u8> {
178    let mut bytestream = Vec::<u8>::with_capacity(6 + self.payload.len());
179    bytestream.extend_from_slice(&TofPacket::HEAD.to_le_bytes());
180    let p_type = self.packet_type as u8;
181    bytestream.push(p_type);
182    // payload size of 32 bit accomodates up to 4 GB packet
183    // a 16 bit size would only hold 65k, which might be not
184    // good enough if we sent multiple events in a batch in 
185    // the same TofPacket (in case we do that)
186    let payload_len = self.payload.len() as u32;
187    //let foo = &payload_len.to_le_bytes();
188    //debug!("TofPacket binary payload: {foo:?}");
189    bytestream.extend_from_slice(&payload_len.to_le_bytes());
190    bytestream.extend_from_slice(self.payload.as_slice());
191    bytestream.extend_from_slice(&TofPacket::TAIL.to_le_bytes());
192    bytestream
193  }
194}
195
196#[cfg(feature="random")]
197impl FromRandom for TofPacket {
198
199  fn from_random() -> Self {
200    // FIXME - this should be an actual, realistic
201    // distribution
202    let choices = [
203      TofPacketType::TofEvent,
204      TofPacketType::TofEvent,
205      TofPacketType::TofEvent,
206      TofPacketType::RBWaveform,
207      TofPacketType::RBWaveform,
208      TofPacketType::TofEvent,
209      TofPacketType::TofEvent,
210      TofPacketType::TofEvent,
211      TofPacketType::TofEvent,
212      TofPacketType::TofEvent,
213      TofPacketType::TofEvent,
214      TofPacketType::MasterTrigger,
215      TofPacketType::MasterTrigger,
216      TofPacketType::MasterTrigger,
217      TofPacketType::RBMoniData,
218      TofPacketType::PBMoniData,
219      TofPacketType::LTBMoniData,
220      TofPacketType::PAMoniData,
221      TofPacketType::CPUMoniData,
222      TofPacketType::MtbMoniData,
223    ];
224    let mut rng  = rand::rng();
225    let idx = rng.random_range(0..choices.len());
226    let packet_type = choices[idx];
227    match packet_type {
228      TofPacketType::TofEvent => {
229        let te = TofEvent::from_random();
230        return te.pack()
231      }
232      TofPacketType::RBWaveform => {
233        let te = RBWaveform::from_random();
234        return te.pack()
235      }
236      TofPacketType::RBMoniData => {
237        let te = RBMoniData::from_random();
238        return te.pack()
239      }
240      TofPacketType::PAMoniData => {
241        let te = PAMoniData::from_random();
242        return te.pack()
243      }
244      TofPacketType::LTBMoniData => {
245        let te = LTBMoniData::from_random();
246        return te.pack()
247      }
248      TofPacketType::PBMoniData => {
249        let te = PBMoniData::from_random();
250        return te.pack()
251      }
252      TofPacketType::CPUMoniData => {
253        let te = CPUMoniData::from_random();
254        return te.pack()
255      }
256      TofPacketType::MtbMoniData  => {
257        let te = MtbMoniData::from_random();
258        return te.pack()
259      }
260      _ => {
261        let te = TofEvent::from_random();
262        return te.pack()
263      }
264    }
265  }
266}
267
268impl Frameable for TofPacket {
269  const CRFRAMEOBJECT_TYPE : CRFrameObjectType = CRFrameObjectType::TofPacket;
270}
271
272#[cfg(feature="pybindings")]
273#[pymethods]
274impl TofPacket {
275
276  #[getter]
277  fn get_packet_type(&self) -> TofPacketType {
278    self.packet_type
279  }
280 
281  // FIXME - trust in te process that it referenceces te input vector and not clones it
282  /// Factory function for TofPackets
283  ///
284  /// # Arguments:
285  ///
286  ///   * stream    : bytes presumably representing
287  ///                 a TofPacket
288  ///   * start_pos : the assumed position of 
289  ///                 HEAD identifier in the
290  ///                 bytestream (start of 
291  ///                 TofPacket)
292  #[staticmethod]
293  #[pyo3(name = "from_bytestream")]
294  fn from_bytestream_py<'_py>(stream : Vec<u8>, start_pos : usize) -> PyResult<Self>{
295    let mut pos = start_pos;  
296    match Self::from_bytestream(&stream, &mut pos) {
297      Ok(tp) => {
298        return Ok(tp);
299      }
300      Err(err) => {
301        let err_msg = format!("Unable to TofPacket from bytestream! {err}");
302        return Err(PyIOError::new_err(err_msg));
303      }
304    }
305  }
306
307  #[getter]
308  #[pyo3(name="payload")]
309  fn get_payload_py(&self) -> Vec<u8> {
310    self.payload.clone()
311  }
312
313  #[pyo3(name="unpack")]
314  fn unpack_py(&self,py: Python) -> PyResult<Py<PyAny>> {
315    match self.packet_type {
316      TofPacketType::Unknown               => {
317        let msg = "TofPacket is of type 'Unknown' and thus can't be unpacked!";
318        return Err(PyValueError::new_err(msg));
319      }, 
320      TofPacketType::RBEvent               => {
321        match self.unpack::<RBEvent>() {
322          Ok(data) => {
323            return Ok(Py::new(py, data)?.into_any());
324          }
325          Err(err) => {
326            return Err(PyValueError::new_err(err.to_string()));
327          }
328        }
329      }, 
330      TofPacketType::TofEventDeprecated  => {
331        match self.unpack::<TofEvent>() {
332          Ok(data) => {
333            return Ok(Py::new(py, data)?.into_any());
334          }
335          Err(err) => {
336            return Err(PyValueError::new_err(err.to_string()));
337          }
338        }
339      }, 
340      TofPacketType::RBWaveform               => {
341        match self.unpack::<RBWaveform>() {
342          Ok(data) => {
343            return Ok(Py::new(py, data)?.into_any());
344          }
345          Err(err) => {
346            return Err(PyValueError::new_err(err.to_string()));
347          }
348        }
349      }, 
350      TofPacketType::TofEvent               => {
351        match self.unpack::<TofEvent>() {
352          Ok(data) => {
353            return Ok(Py::new(py, data)?.into_any());
354          }
355          Err(err) => {
356            return Err(PyValueError::new_err(err.to_string()));
357          }
358        }
359      }, 
360      TofPacketType::DataSinkHB               => {
361        match self.unpack::<DataSinkHB>() {
362          Ok(data) => {
363            return Ok(Py::new(py, data)?.into_any());
364          }
365          Err(err) => {
366            return Err(PyValueError::new_err(err.to_string()));
367          }
368        }
369      }, 
370      //TofPacketType::MasterTrigger         => {}, 
371      //TofPacketType::TriggerConfig         => {},
372      //TofPacketType::MasterTriggerHB       => {}, 
373      //TofPacketType::EventBuilderHB        => {},
374      //TofPacketType::RBChannelMaskConfig   => {},
375      //TofPacketType::TofRBConfig           => {},
376      //TofPacketType::AnalysisEngineConfig  => {},
377      //TofPacketType::RBEventHeader         => {},    
378      //TofPacketType::TOFEventBuilderConfig => {},
379      //TofPacketType::DataPublisherConfig   => {},
380      //TofPacketType::TofRunConfig          => {},
381      //TofPacketType::CPUMoniData           => {},
382      //TofPacketType::MtbMoniData           => {},
383      //TofPacketType::RBMoniData            => {},
384      //TofPacketType::PBMoniData            => {},
385      //TofPacketType::LTBMoniData           => {},
386      //TofPacketType::PAMoniData            => {},
387      //TofPacketType::RBEventMemoryView     => {}, 
388      //TofPacketType::RBCalibration         => {},
389      //TofPacketType::TofCommand            => {},
390      //TofPacketType::TofCommandV2          => {},
391      //TofPacketType::TofResponse           => {},
392      //TofPacketType::RBCommand             => {},
393      //TofPacketType::RBPing                => {},
394      //TofPacketType::PreampBiasConfig      => {},
395      //TofPacketType::RunConfig             => {},
396      //TofPacketType::LTBThresholdConfig    => {},
397      //TofPacketType::TofDetectorStatus     => {},
398      //TofPacketType::ConfigBinary          => {},
399      //TofPacketType::LiftofRBBinary        => {},
400      //TofPacketType::LiftofBinaryService   => {},
401      //TofPacketType::LiftofCCBinary        => {},
402      //TofPacketType::RBCalibrationFlightV  => {},
403      //TofPacketType::RBCalibrationFlightT  => {},
404      //TofPacketType::BfswAckPacket         => {},
405      //TofPacketType::MultiPacket           => {},
406      _               => {
407        match self.unpack::<TofEvent>() {
408          Ok(data) => {
409            return Ok(Py::new(py, data)?.into_any());
410          }
411          Err(err) => {
412            return Err(PyValueError::new_err(err.to_string()));
413          }
414        }
415      }, 
416    }
417  }
418}
419
420#[cfg(feature="pybindings")]
421pythonize!(TofPacket);
422