gondola_core/
monitoring.rs

1// This file is part of gaps-online-software and published 
2// under the GPLv3 license
3
4pub mod pa_moni_data;
5pub use pa_moni_data::{
6  PAMoniData,
7  PAMoniDataSeries
8};
9pub mod pb_moni_data;
10pub use pb_moni_data::{
11  PBMoniData,
12  PBMoniDataSeries
13};
14pub mod mtb_moni_data;
15pub use mtb_moni_data::{
16  MtbMoniData,
17  MtbMoniDataSeries
18};
19pub mod ltb_moni_data;
20pub use ltb_moni_data::{
21  LTBMoniData,
22  LTBMoniDataSeries
23};
24pub mod rb_moni_data;
25pub use rb_moni_data::{
26  RBMoniData,
27  RBMoniDataSeries
28};
29pub mod cpu_moni_data;
30pub use cpu_moni_data::{
31  CPUMoniData,
32  CPUMoniDataSeries
33};
34
35pub mod heartbeats;
36pub use heartbeats::{
37  DataSinkHB,
38  DataSinkHBSeries,
39  MasterTriggerHB,
40  MasterTriggerHBSeries,
41  EventBuilderHB,
42  EventBuilderHBSeries,
43};
44
45pub mod run_statistics;
46pub use run_statistics::RunStatistics;
47
48use std::collections::VecDeque;
49use std::collections::HashMap;
50
51#[cfg(feature="pybindings")]
52use crate::prelude::*;
53
54#[cfg(feature="pybindings")]
55use polars::frame::column::Column;
56#[cfg(feature="pybindings")]
57use polars::prelude::NamedFrom; 
58
59/// Monitoring data shall share the same kind 
60/// of interface. 
61pub trait MoniData {
62  /// Monitoring data is always tied to a specific
63  /// board. This might not be its own board, but 
64  /// maybe the RB the data was gathered from
65  /// This is an unique identifier for the 
66  /// monitoring data
67  fn get_board_id(&self) -> u8;
68  
69  /// Access the (data) members by name 
70  fn get(&self, varname : &str) -> Option<f32>;
71
72  /// A list of the variables in this MoniData
73  fn keys() -> Vec<&'static str>;
74
75  fn get_timestamp(&self) -> u64 {
76    0
77  }
78
79  fn set_timestamp(&mut self, _ts : u64) {
80  }
81  ///// access the internal timestamps as obtained from 
82  ///// MoniDat 
83  //fn get_timestamps_mut(&mut self) -> &Vec<u64> {
84  //}   
85}
86
87/// A MoniSeries is a collection of (primarily) monitoring
88/// data, which comes from multiple senders.
89/// E.g. a MoniSeries could hold RBMoniData from all 
90/// 40 ReadoutBoards.
91pub trait MoniSeries<T>
92  where T : Copy + MoniData {
93
94  fn get_first_ts(&self) -> u64;
95
96  fn get_data(&self) -> &HashMap<u8,VecDeque<T>>;
97
98  fn get_data_mut(&mut self) -> &mut HashMap<u8,VecDeque<T>>;
99 
100  fn get_max_size(&self) -> usize;
101
102  fn get_timestamps(&self) -> &Vec<u64>;
103
104  fn add_timestamp(&mut self, ts : u64);
105
106  /// A HashMap of -> rbid, Vec\<var\> 
107  fn get_var(&self, varname : &str) -> HashMap<u8, Vec<f32>> {
108    let mut values = HashMap::<u8, Vec<f32>>::new();
109    for k in self.get_data().keys() {
110      match self.get_var_for_board(varname, k) {
111        None => (),
112        Some(vals) => {
113          values.insert(*k, vals);
114        }
115      }
116      //values.insert(*k, Vec::<f32>::new());
117      //match self.get_data().get(k) {
118      //  None => (),
119      //  Some(vec_moni) => {
120      //    for moni in vec_moni {
121      //      match moni.get(varname) {
122      //        None => (),
123      //        Some(val) => {
124      //          values.get_mut(k).unwrap().push(val);
125      //        }
126      //      }
127      //    }
128      //  }
129      //}
130    }
131    values
132  }
133
134  /// Get a certain variable, but only for a single board
135  fn get_var_for_board(&self, varname : &str, rb_id : &u8) -> Option<Vec<f32>> {
136    let mut values = Vec::<f32>::new();
137    match self.get_data().get(&rb_id) {
138      None => (),
139      Some(vec_moni) => {
140        for moni in vec_moni {
141          match moni.get(varname) {
142            None => {
143              return None;
144            },
145            Some(val) => {
146              values.push(val);
147            }
148          }
149        }
150      }
151    }
152    // FIXME This needs to be returning a reference,
153    // not cloning
154    Some(values)
155  }
156
157  #[cfg(feature = "pybindings")]
158  fn get_dataframe(&self) -> PolarsResult<DataFrame> {
159    let mut series = Vec::<Column>::new();
160    for k in Self::keys() {
161      match self.get_series(k) {
162        None => {
163          error!("Unable to get series for {}", k);
164        }
165        Some(ser) => {
166          //println!("{}", ser);
167          series.push(ser.into());
168        }
169      }
170    }
171    //if self.get_timestamps().len() > 0 {
172    //  let timestamps  = Series::new("timestamps".into(), self.get_timestamps());
173    //  series.push(timestamps.into());
174    //}
175    // each column is now the specific variable but in terms for 
176    // all rbs
177    let df = DataFrame::new(series)?;
178    Ok(df)
179  }
180
181  #[cfg(feature = "pybindings")]
182  /// Get the variable for all boards. This keeps the order of the 
183  /// underlying VecDeque. Values of all boards intermixed.
184  /// To get a more useful version, use the Dataframe instead.
185  ///
186  /// # Arguments
187  ///
188  /// * varname : The name of the attribute of the underlying
189  ///             moni structure
190  fn get_series(&self, varname : &str) -> Option<Series> {
191    let mut data = Vec::<f32>::with_capacity(self.get_data().len());
192    let sorted_keys: Vec<u8> = self.get_data().keys().cloned().collect();
193    for board_id in sorted_keys.iter() {
194      let dqe = self.get_data().get(board_id).unwrap(); //uwrap is fine, bc we checked earlier
195      for moni in dqe {
196        match moni.get(varname) {
197          None => {
198            error!("This type of MoniData does not have a key called {}", varname);
199            return None;
200          }
201          Some(var) => {
202            data.push(var);
203          }
204        }
205      }
206    }
207    let series = Series::new(varname.into(), data);
208    Some(series)
209  }
210
211  /// A list of the variables in this MoniSeries
212  fn keys() -> Vec<&'static str> {
213    T::keys()
214  }
215
216  /// A list of boards in this series
217  fn get_board_ids(&self) -> Vec<u8> {
218    self.get_data().keys().cloned().collect()
219  }
220
221  /// Add another instance of the data container to the series
222  fn add(&mut self, data : T) {
223    let board_id = data.get_board_id();
224    if !self.get_data().contains_key(&board_id) {
225      self.get_data_mut().insert(board_id, VecDeque::<T>::new());
226    } 
227    self.get_data_mut().get_mut(&board_id).unwrap().push_back(data);
228    if self.get_data_mut().get_mut(&board_id).unwrap().len() > self.get_max_size() {
229      error!("The queue is too large, returning the first element! If you need a larger series size, set the max_size field");
230      self.get_data_mut().get_mut(&board_id).unwrap().pop_front();
231    }
232  }
233  
234  fn get_last_moni(&self, board_id : u8) -> Option<T> {
235    let size = self.get_data().get(&board_id)?.len();
236    Some(self.get_data().get(&board_id).unwrap()[size - 1])
237  }
238}
239
240//--------------------------------------------------
241
242/// Implements the moniseries trait for a MoniData 
243/// type of class
244#[macro_export]
245macro_rules! moniseries {
246  ($name : ident, $class:ty) => {
247    
248    use std::collections::VecDeque;
249    use std::collections::HashMap;
250
251    use crate::monitoring::MoniSeries;
252
253    #[cfg_attr(feature="pybindings",pyclass)]
254    #[derive(Debug, Clone, PartialEq)]
255    pub struct $name {
256      data        : HashMap<u8, VecDeque<$class>>,
257      max_size    : usize,
258      timestamps  : Vec<u64>,
259    }
260    
261    impl $name {
262      pub fn new() -> Self {
263        Self {
264          data       : HashMap::<u8, VecDeque<$class>>::new(),
265          max_size   : 10000,
266          timestamps : Vec::<u64>::new()
267        }
268      }
269    } 
270    
271    impl Default for $name {
272      fn default() -> Self {
273        Self::new()
274      }
275    }
276    
277    impl fmt::Display for $name {
278      fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
279        write!(f, "<{} : {} boards>", stringify!($name), self.data.len())
280      }
281    }
282    
283    impl MoniSeries<$class> for $name {
284   
285      fn get_first_ts(&self) -> u64 {
286        if self.timestamps.len() == 0 {
287          return 0;
288        } else {
289          self.timestamps[0]
290        }
291      }
292
293      fn get_data(&self) -> &HashMap<u8,VecDeque<$class>> {
294        return &self.data;
295      }
296    
297      fn get_data_mut(&mut self) -> &mut HashMap<u8,VecDeque<$class>> {
298        return &mut self.data;
299      }
300     
301      fn get_max_size(&self) -> usize {
302        return self.max_size;
303      }
304  
305      fn add_timestamp(&mut self, ts : u64) {
306        self.timestamps.push(ts);
307      }
308
309      fn get_timestamps(&self) -> &Vec<u64> {
310        //if self.timestamps.len() == 0 {
311        //  let mut timestamps = Vec::<u64>::new();
312        //  for k in self.
313        //} 
314        return &self.timestamps;
315      }
316    }
317  
318    #[cfg(feature="pybindings")]
319    #[pymethods]
320    impl $name {
321      #[new]
322      fn new_py() -> Self {
323        Self::new() 
324      }
325   
326      /// The maximum size of the series. If more data 
327      /// are added, data from the front will be removed 
328      #[getter]
329      #[pyo3(name="max_size")]
330      fn get_max_size_py(&self) -> usize {
331        self.get_max_size()
332      }
333
334      #[getter]
335      #[pyo3(name="get_first_ts")]
336      fn get_first_ts_py(&self) -> u64 {
337        self.get_first_ts()
338      }
339
340      /// If monitoring is retrieved from telemetry, we 
341      /// save the gcu timestamp of the packet, wich 
342      /// herein can be accessed.
343      #[getter] 
344      #[pyo3(name="timestamps")] 
345      fn get_timestamps_py(&self) -> Vec<u64> {
346        warn!("This returns a full copy and is a performance bottleneck!");
347        return self.timestamps.clone();
348      }
349
350      /// Add an additional (Caraspace) file to the series 
351      ///
352      /// # Arguments:
353      ///   * filename    : The name of the (caraspace) file to add
354      ///   * from_object : Since this adds caraspace files, it is possible 
355      ///                   to choose from where to get the information.
356      ///                   Either the telemetry packet, or the tofpacket, if 
357      ///                   either is present in the frame. When 
358      ///                   CRFrameObjectType = Unknown, it will figure it out 
359      ///                   automatically, preferring the telemetry since it has
360      ///                   the gcu timestamp
361      #[pyo3(signature = (filename, from_object = CRFrameObjectType::TelemetryPacket))]
362      fn add_crfile(&mut self, filename : String, from_object : CRFrameObjectType) {
363        let reader = CRReader::new(filename).expect("Unable to open file!");
364        // now we have a problem - from which frame should we get it?
365        // if we get it from the dedicated TOF stream it will be much 
366        // faster (if that is available) since it will be it's own 
367        // presence in the frame
368        //let address = &source.clone();
369        //let mut try_from_telly = false;
370        let tp_source     = String::from("TofPacketType.") + stringify!($class);
371        let tp_source_alt = String::from("PacketType.") + stringify!($class);
372        let tel_source    = "TelemetryPacketType.AnyTofHK";
373        for frame in reader {
374          match from_object { 
375            CRFrameObjectType::TofPacket =>  {
376              if frame.has(&tp_source) || frame.has(&tp_source_alt) {
377                if frame.has(&tp_source) {
378                  let moni_res = frame.get::<TofPacket>(&tp_source).unwrap().unpack::<$class>();
379                  match moni_res {
380                    Err(err) => {
381                      println!("Error unpacking! {err}");
382                    }
383                    Ok(moni) => {
384                      self.add(moni);
385                    }
386                  }
387                }
388                if frame.has(&tp_source_alt) {
389                  let moni_res = frame.get::<TofPacket>(&tp_source_alt).unwrap().unpack::<$class>();
390                  match moni_res { 
391                    Err(err) => {
392                      println!("Error unpacking! {err}");
393                    }
394                    Ok(moni) => {
395                      self.add(moni);
396                    }
397                  }
398                }
399              } 
400            }
401            CRFrameObjectType::TelemetryPacket | CRFrameObjectType::Unknown => {
402              if frame.has(tel_source) {
403                let hk_res = frame.get::<TelemetryPacket>(tel_source);
404                match hk_res {
405                  Err(err) => {
406                    println!("Error unpacking! {err}");
407                  }
408                  Ok(hk) => {
409                    let mut pos = 0;
410                    // subtract the 2020/1/1 midnight from the gcutime to make 
411                    // it f32
412                    let gcutime = hk.header.get_gcutime() as u64;
413                    match TofPacket::from_bytestream(&hk.payload, &mut pos) {
414                      Err(err) => {
415                        println!("Error unpackin! {err}");
416                      }
417                      Ok(tp) => {
418                        if tp.packet_type == <$class>::TOF_PACKET_TYPE  {
419                          match tp.unpack::<$class>() {
420                            Err(err) => {
421                              println!("Error unpacking! {err}");
422                            }
423                            Ok(mut moni) => {
424                              self.add_timestamp(gcutime);
425                              moni.set_timestamp(gcutime - self.get_first_ts()); 
426                              self.add(moni);
427                              //self.timestamps.push(gcutime);
428                            }
429                          }
430                        }
431                      }
432                    } 
433                  }
434                }
435              }
436            }
437          }
438        }
439      }
440      
441      /// Add an additional (Telemetry) file to the series 
442      ///
443      /// # Arguments:
444      ///   * filename    : The name of the (telemetry) file to add
445      fn add_telemetryfile(&mut self, filename : String) {
446        let reader = TelemetryPacketReader::new(filename, true, None, None);
447        for pack in reader {
448          if pack.header.packet_type == TelemetryPacketType::AnyTofHK {
449            let mut pos = 0;
450            // subtract the 2020/1/1 midnight from the gcutime to make 
451            // it f32
452            let gcutime = pack.header.get_gcutime() as u64;
453            match TofPacket::from_bytestream(&pack.payload, &mut pos) {
454              Err(err) => {
455                println!("Error unpackin! {err}");
456              }
457              Ok(tp) => {
458                if tp.packet_type == <$class>::TOF_PACKET_TYPE  {
459                  match tp.unpack::<$class>() {
460                    Err(err) => {
461                      println!("Error unpacking! {err}");
462                    }
463                    Ok(mut moni) => {
464                      self.add_timestamp(gcutime);
465                      moni.set_timestamp(gcutime - self.get_first_ts()); 
466                      self.add(moni);
467                      //self.timestamps.push(gcutime);
468                    }
469                  }
470                }
471              }
472            } 
473          }
474        }
475      }
476      
477      /// Add an additional (Tof) file to the series 
478      ///
479      /// # Arguments:
480      ///   * filename    : The name of the (caraspace) file to add
481      ///   * from_object : Since this adds caraspace files, it is possible 
482      ///                   to choose from where to get the information.
483      ///                   Either the telemetry packet, or the tofpacket, if 
484      ///                   either is present in the frame. When 
485      ///                   CRFrameObjectType = Unknown, it will figure it out 
486      ///                   automatically, preferring the telemetry since it has
487      ///                   the gcu timestamp
488      #[pyo3(signature = (filename))]
489      fn add_toffile(&mut self, filename : String) {
490        let reader = TofPacketReader::new(&filename);
491        for tp in reader { 
492          if tp.packet_type == <$class>::TOF_PACKET_TYPE  {
493            match tp.unpack::<$class>() {
494              Err(err) => {
495                println!("Error unpacking! {err}");
496              }
497              Ok(mut moni) => {
498                self.add_timestamp(moni.get_timestamp());
499                moni.set_timestamp(moni.get_timestamp()); 
500                self.add(moni);
501                //self.timestamps.push(gcutime);
502              }
503            }
504          }
505        }
506      }
507
508      /// Generate a polars dataframe with monitoring data from the 
509      /// given TOF file.
510      /// This will load ONLY data of the specific type of the 
511      /// MoniSeries itself
512      ///
513      /// # Arguments:
514      ///   * filename : A single .tof.gaps file with monitoring 
515      ///                information 
516      #[staticmethod]
517      fn from_tof_file(filename : String) -> PyResult<PyDataFrame> {
518        let mut reader = TofPacketReader::new(&filename);
519        let mut series = Self::new();
520        // it would be nice to set the filter here, but I 
521        // don't know how that can be done in the macro
522        reader.filter  = <$class>::TOF_PACKET_TYPE;
523        for tp in reader {
524          if let Ok(moni) =  tp.unpack::<$class>() {
525            series.add(moni);
526          }
527          // other packets will get thrown away 
528        }
529        match series.get_dataframe() {
530          Ok(df) => {
531            let pydf = PyDataFrame(df);
532            return Ok(pydf);
533          },
534          Err(err) => {
535            return Err(PyValueError::new_err(err.to_string()));
536          }
537        }
538      }
539      
540      #[pyo3(name="get_var_for_board")]
541      fn get_var_for_board_py(&self, varname : &str, rb_id : u8) -> Option<Vec<f32>> {
542        self.get_var_for_board(varname, &rb_id)
543      }
544
545      /// Reduces the MoniSeries to a single polars data frame
546      /// The structure itself will not be changed
547      #[pyo3(name="get_dataframe")]
548      fn get_dataframe_py(&self) -> PyResult<PyDataFrame> {
549        match self.get_dataframe() {
550          Ok(df) => {
551            let pydf = PyDataFrame(df);
552            return Ok(pydf);
553          },
554          Err(err) => {
555            return Err(PyValueError::new_err(err.to_string()));
556          }
557        }
558      }
559
560      //fn get_pl_series_py(&self) -> PyResult<PyS
561      //fn get_data(&self) -> &HashMap<u8,VecDeque<$class>> {
562      //  return &self.data;
563      //}
564    
565      //fn get_data_mut(&mut self) -> &mut HashMap<u8,VecDeque<$class>> {
566      //  return &mut self.data;
567      //}
568     
569      //fn get_max_size(&self) -> usize {
570      //  return self.max_size;
571      //}
572    }
573    
574    #[cfg(feature="pybindings")]
575    pythonize_display!($name);
576  }
577}
578