tof_dataclasses/
packets.rs

1//! Packets are a way to send data over the network.
2//!
3//! Data gets serialized to a bytestream and then 
4//! header and tail bytes are added to the front and
5//! end of the stream.
6//!
7//! A TofPacket has the following layout
8//! HEAD    : u16 = 0xAAAA
9//! TYPE    : u8  = PacketType
10//! SIZE    : u32
11//! PAYLOAD : [u8;6-SIZE]
12//! TAIL    : u16 = 0x5555 
13//!
14//! The total packet size is thus 13 + SIZE
15
16pub mod packet_type;
17pub use packet_type::PacketType;
18
19use std::time::Instant;
20use std::fmt;
21// re-exports
22pub use crate::monitoring::{
23    RBMoniData,
24    PBMoniData,
25    LTBMoniData,
26    PAMoniData,
27    MtbMoniData,
28    CPUMoniData
29};
30
31use crate::serialization::{
32    Serialization, 
33    Packable,
34    parse_u8,
35    parse_u16,
36    parse_u32
37};
38
39
40//use std::error::Error;
41use crate::errors::{
42    SerializationError,
43    //PacketError
44};
45
46use crate::events::{
47    RBEventHeader,
48    RBEvent,
49    MasterTriggerEvent,
50    TofEvent,
51    RBWaveform,
52    TofEventSummary,
53};
54
55use crate::commands::{
56    TofCommand,
57};
58
59use crate::calibrations::RBCalibrations;
60
61#[cfg(feature = "random")]
62use crate::FromRandom;
63#[cfg(feature = "random")]
64use rand::Rng;
65
66/// The most basic of all packets
67///  
68/// A type and a payload. This wraps
69/// all other packets.
70///
71/// Format when in bytestream
72/// HEAD : u16
73
74/// PAYLOAD_SIZE : u32
75/// PAYLOAD      : \[u8;PAYLOAD_SIZE\]
76/// TAIL         : u16
77///
78/// => Fixed size is 13
79///
80#[derive(Debug, Clone)]
81pub struct TofPacket {
82  /// Type of the structure encoded in payload
83  pub packet_type        : PacketType,
84  /// The bytestream encoded structure
85  pub payload            : Vec<u8>,
86  // fields which won't get serialized
87  /// mark a packet as not eligible to be written to disk
88  pub no_write_to_disk   : bool,
89  /// mark a packet as not eligible to be sent over network 
90  /// FIXME - future extension
91  pub no_send_over_nw    : bool,
92  /// creation_time for the instance
93  pub creation_time    : Instant,
94  pub valid            : bool, // will be always valid, unless invalidated
95}
96
97impl fmt::Display for TofPacket {
98  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99    let p_len = self.payload.len();
100    if p_len < 4 {
101      write!(f, "<TofPacket: type {:?}, payload size {}>", self.packet_type, p_len)
102    } else {
103      write!(f, "<TofPacket: type {:?}, payload [ {} {} {} {} .. {} {} {} {}] of size {} >",
104             self.packet_type,
105             self.payload[0], self.payload[1], self.payload[2], self.payload[3],
106             self.payload[p_len-4], self.payload[p_len-3], self.payload[p_len - 2], self.payload[p_len-1], p_len ) 
107    }
108  }
109}
110
111impl Default for TofPacket {
112  fn default() -> Self {
113    Self::new()
114  }
115}
116
117/// Implement because TofPacket saves the creation time, 
118/// which never will be the same for 2 different instances
119impl PartialEq for TofPacket {
120  fn eq(&self, other: &Self) -> bool {
121    (self.packet_type == other.packet_type)           &&
122    (self.payload == other.payload)                   &&
123    (self.no_write_to_disk == other.no_write_to_disk) &&
124    (self.no_send_over_nw == other.no_send_over_nw)   &&
125    (self.valid == other.valid)
126  }
127}
128
129impl TofPacket {
130
131  pub fn new() -> Self {
132    let creation_time = Instant::now();
133    Self {
134      packet_type      : PacketType::Unknown,
135      payload          : Vec::<u8>::new(),
136      no_write_to_disk : false,
137      no_send_over_nw  : false,
138      creation_time    : creation_time,
139      valid            : true,
140    }
141  }
142
143  /// Generate a bytestream of self for ZMQ, prefixed with 
144  /// BRCT so all RBs will see it
145  pub fn zmq_payload_brdcast(&self) -> Vec<u8> {
146    let mut payload     = String::from("BRCT").into_bytes(); 
147    let mut stream  = self.to_bytestream();
148    payload.append(&mut stream);
149    payload
150  }
151  
152  /// Generate a bytestream of self for ZMQ, prefixed with 
153  /// RBX, to address only a certain board
154  pub fn zmq_payload_rb(&self, rb_id : u8) -> Vec<u8> {
155    let mut payload     = format!("RB{:02}", rb_id).into_bytes(); 
156    let mut stream  = self.to_bytestream();
157    payload.append(&mut stream);
158    payload
159  }
160
161  /// Unpack the TofPacket and return its content
162  pub fn unpack<T>(&self) -> Result<T, SerializationError>
163    where T: Packable + Serialization {
164    if T::PACKET_TYPE != self.packet_type {
165      error!("This bytestream is not for a {} packet!", self.packet_type);
166      return Err(SerializationError::IncorrectPacketType);
167    }
168    let unpacked : T = T::from_bytestream(&self.payload, &mut 0)?;
169    Ok(unpacked)
170  }
171  
172  pub fn age(&self) -> u64 {
173    self.creation_time.elapsed().as_secs()
174  }
175}
176
177
178#[cfg(feature="random")]
179impl FromRandom for TofPacket {
180
181  fn from_random() -> Self {
182    // FIXME - this should be an actual, realistic
183    // distribution
184    let choices = [
185      PacketType::TofEvent,
186      PacketType::TofEvent,
187      PacketType::TofEvent,
188      PacketType::TofEvent,
189      PacketType::TofEvent,
190      PacketType::TofEvent,
191      PacketType::TofEvent,
192      PacketType::RBWaveform,
193      PacketType::RBWaveform,
194      PacketType::TofEventSummary,
195      PacketType::TofEventSummary,
196      PacketType::TofEventSummary,
197      PacketType::TofEventSummary,
198      PacketType::TofEventSummary,
199      PacketType::TofEventSummary,
200      PacketType::MasterTrigger,
201      PacketType::MasterTrigger,
202      PacketType::MasterTrigger,
203      PacketType::RBMoniData,
204      PacketType::PBMoniData,
205      PacketType::LTBMoniData,
206      PacketType::PAMoniData,
207      PacketType::CPUMoniData,
208      PacketType::MonitorMtb,
209    ];
210    let mut rng  = rand::thread_rng();
211    let idx = rng.gen_range(0..choices.len());
212    let packet_type = choices[idx];
213    match packet_type {
214      PacketType::TofEvent => {
215        let te = TofEvent::from_random();
216        return te.pack()
217      }
218      PacketType::TofEventSummary => {
219        let te = TofEventSummary::from_random();
220        return te.pack()
221      }
222      PacketType::RBWaveform => {
223        let te = RBWaveform::from_random();
224        return te.pack()
225      }
226      PacketType::MasterTrigger => {
227        let te = MasterTriggerEvent::from_random();
228        return te.pack()
229      }
230      PacketType::RBMoniData => {
231        let te = RBMoniData::from_random();
232        return te.pack()
233      }
234      PacketType::PAMoniData => {
235        let te = PAMoniData::from_random();
236        return te.pack()
237      }
238      PacketType::LTBMoniData => {
239        let te = LTBMoniData::from_random();
240        return te.pack()
241      }
242      PacketType::PBMoniData => {
243        let te = PBMoniData::from_random();
244        return te.pack()
245      }
246      PacketType::CPUMoniData => {
247        let te = CPUMoniData::from_random();
248        return te.pack()
249      }
250      PacketType::MonitorMtb  => {
251        let te = MtbMoniData::from_random();
252        return te.pack()
253      }
254      _ => {
255        let te = TofEvent::from_random();
256        return te.pack()
257      }
258    }
259  }
260}
261
262
263/// FIXME - all these can go away now, because we have the
264/// Packable trait! Amazing!
265impl From<&RBWaveform> for TofPacket {
266  fn from(rbwave : &RBWaveform) -> Self {
267    let mut tp     = Self::new();
268    tp.packet_type = PacketType::RBWaveform;
269    tp.payload     = rbwave.to_bytestream();
270    tp
271  }
272}
273
274impl From<&TofEventSummary> for TofPacket {
275  fn from(tsum : &TofEventSummary) -> Self {
276    let mut tp     = Self::new();
277    tp.packet_type = PacketType::TofEventSummary;
278    tp.payload     = tsum.to_bytestream();
279    tp
280  }
281}
282
283impl From<&TofEvent> for TofPacket {
284  fn from(event : &TofEvent) -> Self {
285    let mut tp = Self::new();
286    tp.packet_type = PacketType::TofEvent;
287    tp.payload = event.to_bytestream();
288    tp
289  }
290}
291
292impl From<&mut TofEvent> for TofPacket {
293  fn from(event : &mut TofEvent) -> Self {
294    let mut tp     = Self::new();
295    tp.packet_type = PacketType::TofEvent;
296    tp.payload     = event.to_bytestream();
297    tp
298  }
299}
300
301impl From<&TofCommand> for TofPacket {
302  fn from(cmd : &TofCommand) -> Self {
303    let mut tp = Self::new();
304    tp.packet_type = PacketType::TofCommand;
305    tp.payload = cmd.to_bytestream();
306    tp
307  }
308}
309
310
311impl From<&RBCalibrations> for TofPacket {
312  fn from(calib : &RBCalibrations) -> Self {
313    let mut tp = Self::new();
314    tp.packet_type = PacketType::RBCalibration;
315    tp.payload = calib.to_bytestream();
316    tp
317  }
318}
319
320impl From<&CPUMoniData> for TofPacket {
321  fn from(moni : &CPUMoniData) -> Self {
322    let mut tp = Self::new();
323    tp.packet_type = PacketType::CPUMoniData;
324    tp.payload     = moni.to_bytestream();
325    tp
326  }
327}
328
329impl From<&mut RBCalibrations> for TofPacket {
330  fn from(calib : &mut RBCalibrations) -> Self {
331    let mut tp = Self::new();
332    tp.packet_type = PacketType::RBCalibration;
333    tp.payload = calib.to_bytestream();
334    tp
335  }
336}
337
338impl From<&RBEvent> for TofPacket {
339  fn from(event : &RBEvent) -> Self {
340    let mut tp = Self::new();
341    tp.packet_type = PacketType::RBEvent;
342    tp.payload = event.to_bytestream();
343    tp
344  }
345}
346
347impl From<&MasterTriggerEvent> for TofPacket {
348  fn from(mt : &MasterTriggerEvent) -> TofPacket {
349    let mut tp     = TofPacket::new();
350    tp.packet_type = PacketType::MasterTrigger;
351    tp.payload     = mt.to_bytestream();
352    tp
353  }
354}
355
356
357//impl From<&RBMoniData> for TofPacket {
358//  fn from(moni : &RBMoniData) -> Self {
359//    let mut tp     = Self::new();
360//    tp.packet_type = PacketType::RBMoniData;
361//    tp.payload     = moni.to_bytestream();
362//    tp
363//  }
364//}
365//
366//impl From<&PBMoniData> for TofPacket {
367//  fn from(moni : &PBMoniData) -> Self {
368//    let mut tp     = Self::new();
369//    tp.packet_type = PacketType::PBMoniData;
370//    tp.payload     = moni.to_bytestream();
371//    tp
372//  }
373//}
374//impl From<&LTBMoniData> for TofPacket {
375//  fn from(moni : &LTBMoniData) -> Self {
376//    let mut tp     = Self::new();
377//    tp.packet_type = PacketType::LTBMoniData;
378//    tp.payload     = moni.to_bytestream();
379//    tp
380//  }
381//}
382//
383//impl From<&PAMoniData> for TofPacket {
384//  fn from(moni : &PAMoniData) -> Self {
385//    let mut tp     = Self::new();
386//    tp.packet_type = PacketType::PAMoniData;
387//    tp.payload     = moni.to_bytestream();
388//    tp
389//  }
390//}
391
392impl From<&MtbMoniData> for TofPacket {
393  fn from(moni : &MtbMoniData) -> TofPacket {
394    let mut tp = TofPacket::new();
395    tp.packet_type = PacketType::MonitorMtb;
396    tp.payload = moni.to_bytestream();
397    tp
398  }
399}
400
401impl From<&RBEventHeader> for TofPacket {
402  fn from(ev_header : &RBEventHeader) -> TofPacket {
403    let mut tp     = TofPacket::new();
404    tp.packet_type = PacketType::RBEventHeader;
405    tp.payload     = ev_header.to_bytestream();
406    tp
407  }
408}
409
410// I would LOOVE to implement the Packable trait here and have 
411// a matroshka doll for TofPackets. I just don't know why that 
412// would be useful. It might be leading to a new approach 
413// for multipackets
414
415impl Serialization for TofPacket {
416  const HEAD : u16 = 0xaaaa;
417  const TAIL : u16 = 0x5555;
418  const SIZE : usize = 0; // FIXME - size/prelude_size 
419
420  fn from_bytestream(stream : &Vec<u8>, pos : &mut usize)
421  -> Result<Self, SerializationError> {
422    if stream.len() < 2 {
423      return Err(SerializationError::HeadInvalid {});
424    }
425    let head = parse_u16(stream, pos);
426    if Self::HEAD != head {
427      error!("Packet does not start with HEAD signature");
428      return Err(SerializationError::HeadInvalid {});
429    }
430    let packet_type : PacketType;
431    let packet_type_enc = parse_u8(stream, pos);
432    match PacketType::try_from(packet_type_enc) {
433      Ok(pt) => packet_type = pt,
434      Err(_) => {
435        error!("Can not decode packet with packet type {}", packet_type_enc);
436        return Err(SerializationError::UnknownPayload);}
437    }
438    let payload_size = parse_u32(stream, pos) as usize;
439    *pos += payload_size; 
440    let tail = parse_u16(stream, pos);
441    if Self::TAIL != tail {
442      error!("Packet does not end with TAIL signature");
443      return Err(SerializationError::TailInvalid {});
444    }
445    *pos -= 2; // for tail parsing
446    *pos -= payload_size;
447
448    let mut tp = TofPacket::new();
449    tp.packet_type = packet_type;
450    tp.payload.extend_from_slice(&stream[*pos..*pos+payload_size]);
451    // Fix position marker
452    *pos += 2 + payload_size;
453    Ok(tp) 
454  }
455  
456  fn to_bytestream(&self) 
457    -> Vec<u8> {
458    let mut bytestream = Vec::<u8>::with_capacity(6 + self.payload.len());
459    bytestream.extend_from_slice(&TofPacket::HEAD.to_le_bytes());
460    let p_type = self.packet_type as u8;
461    bytestream.push(p_type);
462    // payload size of 32 bit accomodates up to 4 GB packet
463    // a 16 bit size would only hold 65k, which might be not
464    // good enough if we sent multiple events in a batch in 
465    // the same TofPacket (in case we do that)
466    let payload_len = self.payload.len() as u32;
467    //let foo = &payload_len.to_le_bytes();
468    //debug!("TofPacket binary payload: {foo:?}");
469    bytestream.extend_from_slice(&payload_len.to_le_bytes());
470    bytestream.extend_from_slice(self.payload.as_slice());
471    bytestream.extend_from_slice(&TofPacket::TAIL.to_le_bytes());
472    bytestream
473  }
474}
475
476