tof_dataclasses/
packets.rs

1//! Packets are a way to send data over the network.
2//!
3//! Data gets serialized to a bytestream and then 
4//! header and tail bytes are added to the front and
5//! end of the stream.
6//!
7//! A TofPacket has the following layout
8//! HEAD    : u16 = 0xAAAA
9//! TYPE    : u8  = PacketType
10//! SIZE    : u32
11//! PAYLOAD : [u8;6-SIZE]
12//! TAIL    : u16 = 0x5555 
13//!
14//! The total packet size is thus 13 + SIZE
15
16pub mod packet_type;
17pub use packet_type::PacketType;
18
19use std::time::Instant;
20use std::fmt;
21// re-exports
22pub use crate::monitoring::{
23    RBMoniData,
24    PBMoniData,
25    LTBMoniData,
26    PAMoniData,
27    MtbMoniData,
28    CPUMoniData
29};
30
31use crate::serialization::{
32    Serialization, 
33    Packable,
34    parse_u8,
35    parse_u16,
36    parse_u32
37};
38
39
40//use std::error::Error;
41use crate::errors::{
42    SerializationError,
43    //PacketError
44};
45
46use crate::events::{
47    RBEventHeader,
48    RBEvent,
49    MasterTriggerEvent,
50    TofEvent,
51    RBWaveform,
52    TofEventSummary,
53};
54
55use crate::commands::{
56    TofCommand,
57};
58
59use crate::calibrations::RBCalibrations;
60
61#[cfg(feature = "random")]
62use crate::FromRandom;
63#[cfg(feature = "random")]
64use rand::Rng;
65
66/// The most basic of all packets
67///  
68/// A type and a payload. This wraps
69/// all other packets.
70///
71/// Format when in bytestream
72/// HEAD : u16
73
74/// PAYLOAD_SIZE : u32
75/// PAYLOAD      : \[u8;PAYLOAD_SIZE\]
76/// TAIL         : u16
77///
78/// => Fixed size is 13
79///
80#[derive(Debug, Clone)]
81pub struct TofPacket {
82  pub packet_type        : PacketType,
83  pub payload            : Vec<u8>,
84  // fields which won't get serialized
85  /// mark a packet as not eligible to be written to disk
86  pub no_write_to_disk   : bool,
87  /// mark a packet as not eligible to be sent over network 
88  /// FIXME - future extension
89  pub no_send_over_nw    : bool,
90  /// creation_time for the instance
91  pub creation_time    : Instant,
92  pub valid            : bool, // will be always valid, unless invalidated
93}
94
95impl fmt::Display for TofPacket {
96  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97    let p_len = self.payload.len();
98    if p_len < 4 {
99      write!(f, "<TofPacket: type {:?}, payload size {}>", self.packet_type, p_len)
100    } else {
101      write!(f, "<TofPacket: type {:?}, payload [ {} {} {} {} .. {} {} {} {}] of size {} >",
102             self.packet_type,
103             self.payload[0], self.payload[1], self.payload[2], self.payload[3],
104             self.payload[p_len-4], self.payload[p_len-3], self.payload[p_len - 2], self.payload[p_len-1], p_len ) 
105    }
106  }
107}
108
109impl Default for TofPacket {
110  fn default() -> Self {
111    Self::new()
112  }
113}
114
115/// Implement because TofPacket saves the creation time, 
116/// which never will be the same for 2 different instances
117impl PartialEq for TofPacket {
118  fn eq(&self, other: &Self) -> bool {
119    (self.packet_type == other.packet_type)           &&
120    (self.payload == other.payload)                   &&
121    (self.no_write_to_disk == other.no_write_to_disk) &&
122    (self.no_send_over_nw == other.no_send_over_nw)   &&
123    (self.valid == other.valid)
124  }
125}
126
127impl TofPacket {
128
129  pub fn new() -> Self {
130    let creation_time = Instant::now();
131    Self {
132      packet_type      : PacketType::Unknown,
133      payload          : Vec::<u8>::new(),
134      no_write_to_disk : false,
135      no_send_over_nw  : false,
136      creation_time    : creation_time,
137      valid            : true,
138    }
139  }
140
141  /// Generate a bytestream of self for ZMQ, prefixed with 
142  /// BRCT so all RBs will see it
143  pub fn zmq_payload_brdcast(&self) -> Vec<u8> {
144    let mut payload     = String::from("BRCT").into_bytes(); 
145    let mut stream  = self.to_bytestream();
146    payload.append(&mut stream);
147    payload
148  }
149  
150  /// Generate a bytestream of self for ZMQ, prefixed with 
151  /// RBX, to address only a certain board
152  pub fn zmq_payload_rb(&self, rb_id : u8) -> Vec<u8> {
153    let mut payload     = format!("RB{:02}", rb_id).into_bytes(); 
154    let mut stream  = self.to_bytestream();
155    payload.append(&mut stream);
156    payload
157  }
158
159  /// Unpack the TofPacket and return its content
160  pub fn unpack<T>(&self) -> Result<T, SerializationError>
161    where T: Packable + Serialization {
162    if T::PACKET_TYPE != self.packet_type {
163      error!("This bytestream is not for a {} packet!", self.packet_type);
164      return Err(SerializationError::IncorrectPacketType);
165    }
166    let unpacked : T = T::from_bytestream(&self.payload, &mut 0)?;
167    Ok(unpacked)
168  }
169  
170  pub fn age(&self) -> u64 {
171    self.creation_time.elapsed().as_secs()
172  }
173}
174
175
176#[cfg(feature="random")]
177impl FromRandom for TofPacket {
178
179  fn from_random() -> Self {
180    // FIXME - this should be an actual, realistic
181    // distribution
182    let choices = [
183      PacketType::TofEvent,
184      PacketType::TofEvent,
185      PacketType::TofEvent,
186      PacketType::TofEvent,
187      PacketType::TofEvent,
188      PacketType::TofEvent,
189      PacketType::TofEvent,
190      PacketType::RBWaveform,
191      PacketType::RBWaveform,
192      PacketType::TofEventSummary,
193      PacketType::TofEventSummary,
194      PacketType::TofEventSummary,
195      PacketType::TofEventSummary,
196      PacketType::TofEventSummary,
197      PacketType::TofEventSummary,
198      PacketType::MasterTrigger,
199      PacketType::MasterTrigger,
200      PacketType::MasterTrigger,
201      PacketType::RBMoniData,
202      PacketType::PBMoniData,
203      PacketType::LTBMoniData,
204      PacketType::PAMoniData,
205      PacketType::CPUMoniData,
206      PacketType::MonitorMtb,
207    ];
208    let mut rng  = rand::thread_rng();
209    let idx = rng.gen_range(0..choices.len());
210    let packet_type = choices[idx];
211    match packet_type {
212      PacketType::TofEvent => {
213        let te = TofEvent::from_random();
214        return te.pack()
215      }
216      PacketType::TofEventSummary => {
217        let te = TofEventSummary::from_random();
218        return te.pack()
219      }
220      PacketType::RBWaveform => {
221        let te = RBWaveform::from_random();
222        return te.pack()
223      }
224      PacketType::MasterTrigger => {
225        let te = MasterTriggerEvent::from_random();
226        return te.pack()
227      }
228      PacketType::RBMoniData => {
229        let te = RBMoniData::from_random();
230        return te.pack()
231      }
232      PacketType::PAMoniData => {
233        let te = PAMoniData::from_random();
234        return te.pack()
235      }
236      PacketType::LTBMoniData => {
237        let te = LTBMoniData::from_random();
238        return te.pack()
239      }
240      PacketType::PBMoniData => {
241        let te = PBMoniData::from_random();
242        return te.pack()
243      }
244      PacketType::CPUMoniData => {
245        let te = CPUMoniData::from_random();
246        return te.pack()
247      }
248      PacketType::MonitorMtb  => {
249        let te = MtbMoniData::from_random();
250        return te.pack()
251      }
252      _ => {
253        let te = TofEvent::from_random();
254        return te.pack()
255      }
256    }
257  }
258}
259
260
261/// FIXME - all these can go away now, because we have the
262/// Packable trait! Amazing!
263impl From<&RBWaveform> for TofPacket {
264  fn from(rbwave : &RBWaveform) -> Self {
265    let mut tp     = Self::new();
266    tp.packet_type = PacketType::RBWaveform;
267    tp.payload     = rbwave.to_bytestream();
268    tp
269  }
270}
271
272impl From<&TofEventSummary> for TofPacket {
273  fn from(tsum : &TofEventSummary) -> Self {
274    let mut tp     = Self::new();
275    tp.packet_type = PacketType::TofEventSummary;
276    tp.payload     = tsum.to_bytestream();
277    tp
278  }
279}
280
281impl From<&TofEvent> for TofPacket {
282  fn from(event : &TofEvent) -> Self {
283    let mut tp = Self::new();
284    tp.packet_type = PacketType::TofEvent;
285    tp.payload = event.to_bytestream();
286    tp
287  }
288}
289
290impl From<&mut TofEvent> for TofPacket {
291  fn from(event : &mut TofEvent) -> Self {
292    let mut tp     = Self::new();
293    tp.packet_type = PacketType::TofEvent;
294    tp.payload     = event.to_bytestream();
295    tp
296  }
297}
298
299impl From<&TofCommand> for TofPacket {
300  fn from(cmd : &TofCommand) -> Self {
301    let mut tp = Self::new();
302    tp.packet_type = PacketType::TofCommand;
303    tp.payload = cmd.to_bytestream();
304    tp
305  }
306}
307
308
309impl From<&RBCalibrations> for TofPacket {
310  fn from(calib : &RBCalibrations) -> Self {
311    let mut tp = Self::new();
312    tp.packet_type = PacketType::RBCalibration;
313    tp.payload = calib.to_bytestream();
314    tp
315  }
316}
317
318impl From<&CPUMoniData> for TofPacket {
319  fn from(moni : &CPUMoniData) -> Self {
320    let mut tp = Self::new();
321    tp.packet_type = PacketType::CPUMoniData;
322    tp.payload     = moni.to_bytestream();
323    tp
324  }
325}
326
327impl From<&mut RBCalibrations> for TofPacket {
328  fn from(calib : &mut RBCalibrations) -> Self {
329    let mut tp = Self::new();
330    tp.packet_type = PacketType::RBCalibration;
331    tp.payload = calib.to_bytestream();
332    tp
333  }
334}
335
336impl From<&RBEvent> for TofPacket {
337  fn from(event : &RBEvent) -> Self {
338    let mut tp = Self::new();
339    tp.packet_type = PacketType::RBEvent;
340    tp.payload = event.to_bytestream();
341    tp
342  }
343}
344
345impl From<&MasterTriggerEvent> for TofPacket {
346  fn from(mt : &MasterTriggerEvent) -> TofPacket {
347    let mut tp     = TofPacket::new();
348    tp.packet_type = PacketType::MasterTrigger;
349    tp.payload     = mt.to_bytestream();
350    tp
351  }
352}
353
354
355//impl From<&RBMoniData> for TofPacket {
356//  fn from(moni : &RBMoniData) -> Self {
357//    let mut tp     = Self::new();
358//    tp.packet_type = PacketType::RBMoniData;
359//    tp.payload     = moni.to_bytestream();
360//    tp
361//  }
362//}
363//
364//impl From<&PBMoniData> for TofPacket {
365//  fn from(moni : &PBMoniData) -> Self {
366//    let mut tp     = Self::new();
367//    tp.packet_type = PacketType::PBMoniData;
368//    tp.payload     = moni.to_bytestream();
369//    tp
370//  }
371//}
372//impl From<&LTBMoniData> for TofPacket {
373//  fn from(moni : &LTBMoniData) -> Self {
374//    let mut tp     = Self::new();
375//    tp.packet_type = PacketType::LTBMoniData;
376//    tp.payload     = moni.to_bytestream();
377//    tp
378//  }
379//}
380//
381//impl From<&PAMoniData> for TofPacket {
382//  fn from(moni : &PAMoniData) -> Self {
383//    let mut tp     = Self::new();
384//    tp.packet_type = PacketType::PAMoniData;
385//    tp.payload     = moni.to_bytestream();
386//    tp
387//  }
388//}
389
390impl From<&MtbMoniData> for TofPacket {
391  fn from(moni : &MtbMoniData) -> TofPacket {
392    let mut tp = TofPacket::new();
393    tp.packet_type = PacketType::MonitorMtb;
394    tp.payload = moni.to_bytestream();
395    tp
396  }
397}
398
399impl From<&RBEventHeader> for TofPacket {
400  fn from(ev_header : &RBEventHeader) -> TofPacket {
401    let mut tp     = TofPacket::new();
402    tp.packet_type = PacketType::RBEventHeader;
403    tp.payload     = ev_header.to_bytestream();
404    tp
405  }
406}
407
408// I would LOOVE to implement the Packable trait here and have 
409// a matroshka doll for TofPackets. I just don't know why that 
410// would be useful. It might be leading to a new approach 
411// for multipackets
412
413impl Serialization for TofPacket {
414  const HEAD : u16 = 0xaaaa;
415  const TAIL : u16 = 0x5555;
416  const SIZE : usize = 0; // FIXME - size/prelude_size 
417
418  fn from_bytestream(stream : &Vec<u8>, pos : &mut usize)
419  -> Result<Self, SerializationError> {
420    if stream.len() < 2 {
421      return Err(SerializationError::HeadInvalid {});
422    }
423    let head = parse_u16(stream, pos);
424    if Self::HEAD != head {
425      error!("Packet does not start with HEAD signature");
426      return Err(SerializationError::HeadInvalid {});
427    }
428    let packet_type : PacketType;
429    let packet_type_enc = parse_u8(stream, pos);
430    match PacketType::try_from(packet_type_enc) {
431      Ok(pt) => packet_type = pt,
432      Err(_) => {
433        error!("Can not decode packet with packet type {}", packet_type_enc);
434        return Err(SerializationError::UnknownPayload);}
435    }
436    let payload_size = parse_u32(stream, pos);
437    *pos += payload_size as usize; 
438    let tail = parse_u16(stream, pos);
439    if Self::TAIL != tail {
440      error!("Packet does not end with TAIL signature");
441      return Err(SerializationError::TailInvalid {});
442    }
443    *pos -= 2; // for tail parsing
444    *pos -= payload_size as usize;
445
446    let mut tp = TofPacket::new();
447    tp.packet_type = packet_type;
448    tp.payload.extend_from_slice(&stream[*pos..*pos+payload_size as usize]);
449    Ok(tp) 
450  }
451  
452  fn to_bytestream(&self) 
453    -> Vec<u8> {
454    let mut bytestream = Vec::<u8>::with_capacity(6 + self.payload.len());
455    bytestream.extend_from_slice(&TofPacket::HEAD.to_le_bytes());
456    let p_type = self.packet_type as u8;
457    bytestream.push(p_type);
458    // payload size of 32 bit accomodates up to 4 GB packet
459    // a 16 bit size would only hold 65k, which might be not
460    // good enough if we sent multiple events in a batch in 
461    // the same TofPacket (in case we do that)
462    let payload_len = self.payload.len() as u32;
463    //let foo = &payload_len.to_le_bytes();
464    //debug!("TofPacket binary payload: {foo:?}");
465    bytestream.extend_from_slice(&payload_len.to_le_bytes());
466    bytestream.extend_from_slice(self.payload.as_slice());
467    bytestream.extend_from_slice(&TofPacket::TAIL.to_le_bytes());
468    bytestream
469  }
470}
471
472