gondola_core/
monitoring.rs

1// This file is part of gaps-online-software and published 
2// under the GPLv3 license
3
4pub mod pa_moni_data;
5pub use pa_moni_data::{
6  PAMoniData,
7  PAMoniDataSeries
8};
9pub mod pb_moni_data;
10pub use pb_moni_data::{
11  PBMoniData,
12  PBMoniDataSeries
13};
14pub mod mtb_moni_data;
15pub use mtb_moni_data::{
16  MtbMoniData,
17  MtbMoniDataSeries
18};
19pub mod ltb_moni_data;
20pub use ltb_moni_data::{
21  LTBMoniData,
22  LTBMoniDataSeries
23};
24pub mod rb_moni_data;
25pub use rb_moni_data::{
26  RBMoniData,
27  RBMoniDataSeries
28};
29pub mod cpu_moni_data;
30pub use cpu_moni_data::{
31  CPUMoniData,
32  CPUMoniDataSeries
33};
34
35pub mod heartbeats;
36pub use heartbeats::{
37  DataSinkHB,
38  DataSinkHBSeries,
39  MasterTriggerHB,
40  MasterTriggerHBSeries,
41  EventBuilderHB,
42  EventBuilderHBSeries,
43};
44
45pub mod run_statistics;
46pub use run_statistics::RunStatistics;
47
48use std::collections::VecDeque;
49use std::collections::HashMap;
50
51use crate::prelude::*;
52
53#[cfg(feature="pybindings")]
54use polars::frame::column::Column;
55#[cfg(feature="pybindings")]
56use polars::prelude::NamedFrom; 
57
/// Monitoring data shall share the same kind
/// of interface.
///
/// Every implementor exposes the id of the board it belongs to,
/// by-name access to its values as `f32`, and the list of the
/// names it knows about. Timestamp handling is optional: the
/// provided defaults report `0` and ignore any timestamp set.
pub trait MoniData {
  /// Monitoring data is always tied to a specific
  /// board. This might not be its own board, but
  /// maybe the RB the data was gathered from.
  /// This is a unique identifier for the
  /// monitoring data.
  fn get_board_id(&self) -> u8;

  /// Access the (data) members by name.
  ///
  /// Callers in this module treat `None` as "this type of
  /// monitoring data has no variable called `varname`".
  fn get(&self, varname : &str) -> Option<f32>;

  /// A list of the variables in this MoniData
  fn keys() -> Vec<&'static str>;

  /// The timestamp attached to this monitoring data.
  ///
  /// The default implementation always returns `0`.
  fn get_timestamp(&self) -> u64 {
    0
  }

  /// Attach a timestamp to this monitoring data.
  ///
  /// The default implementation discards `ts`; a type which
  /// stores a timestamp presumably overrides this together
  /// with `get_timestamp`.
  fn set_timestamp(&mut self, ts : u64) {
    // Intentionally a no-op. The binding is consumed explicitly
    // so the default does not raise an unused-variable warning.
    let _ = ts;
  }
}
88
89/// A MoniSeries is a collection of (primarily) monitoring
90/// data, which comes from multiple senders.
91/// E.g. a MoniSeries could hold RBMoniData from all 
92/// 40 ReadoutBoards.
93pub trait MoniSeries<T>
94  where T : Copy + MoniData {
95
96  fn get_data(&self) -> &HashMap<u8,VecDeque<T>>;
97
98  fn get_data_mut(&mut self) -> &mut HashMap<u8,VecDeque<T>>;
99 
100  fn get_max_size(&self) -> usize;
101
102  fn get_timestamps(&self) -> &Vec<u64>;
103
104  /// A HashMap of -> rbid, Vec\<var\> 
105  fn get_var(&self, varname : &str) -> HashMap<u8, Vec<f32>> {
106    let mut values = HashMap::<u8, Vec<f32>>::new();
107    for k in self.get_data().keys() {
108      match self.get_var_for_board(varname, k) {
109        None => (),
110        Some(vals) => {
111          values.insert(*k, vals);
112        }
113      }
114      //values.insert(*k, Vec::<f32>::new());
115      //match self.get_data().get(k) {
116      //  None => (),
117      //  Some(vec_moni) => {
118      //    for moni in vec_moni {
119      //      match moni.get(varname) {
120      //        None => (),
121      //        Some(val) => {
122      //          values.get_mut(k).unwrap().push(val);
123      //        }
124      //      }
125      //    }
126      //  }
127      //}
128    }
129    values
130  }
131
132  /// Get a certain variable, but only for a single board
133  fn get_var_for_board(&self, varname : &str, rb_id : &u8) -> Option<Vec<f32>> {
134    let mut values = Vec::<f32>::new();
135    match self.get_data().get(&rb_id) {
136      None => (),
137      Some(vec_moni) => {
138        for moni in vec_moni {
139          match moni.get(varname) {
140            None => {
141              return None;
142            },
143            Some(val) => {
144              values.push(val);
145            }
146          }
147        }
148      }
149    }
150    // FIXME This needs to be returning a reference,
151    // not cloning
152    Some(values)
153  }
154
155  #[cfg(feature = "pybindings")]
156  fn get_dataframe(&self) -> PolarsResult<DataFrame> {
157    let mut series = Vec::<Column>::new();
158    for k in Self::keys() {
159      match self.get_series(k) {
160        None => {
161          error!("Unable to get series for {}", k);
162        }
163        Some(ser) => {
164          series.push(ser.into());
165        }
166      }
167    }
168    //if self.get_timestamps().len() > 0 {
169    //  let timestamps  = Series::new("timestamps".into(), self.get_timestamps());
170    //  series.push(timestamps.into());
171    //}
172    // each column is now the specific variable but in terms for 
173    // all rbs
174    let df = DataFrame::new(series)?;
175    Ok(df)
176  }
177
178  #[cfg(feature = "pybindings")]
179  /// Get the variable for all boards. This keeps the order of the 
180  /// underlying VecDeque. Values of all boards intermixed.
181  /// To get a more useful version, use the Dataframe instead.
182  ///
183  /// # Arguments
184  ///
185  /// * varname : The name of the attribute of the underlying
186  ///             moni structure
187  fn get_series(&self, varname : &str) -> Option<Series> {
188    let mut data = Vec::<f32>::with_capacity(self.get_data().len());
189    let sorted_keys: Vec<u8> = self.get_data().keys().cloned().collect();
190    for rbid in sorted_keys.iter() {
191      let dqe = self.get_data().get(rbid).unwrap(); //uwrap is fine, bc we checked earlier
192      for moni in dqe {
193        match moni.get(varname) {
194          None => {
195            error!("This type of MoniData does not have a key called {}", varname);
196            return None;
197          }
198          Some(var) => {
199            data.push(var);
200          }
201        }
202      }
203    }
204    let series = Series::new(varname.into(), data);
205    Some(series)
206  }
207
208  /// A list of the variables in this MoniSeries
209  fn keys() -> Vec<&'static str> {
210    T::keys()
211  }
212
213  /// A list of boards in this series
214  fn get_board_ids(&self) -> Vec<u8> {
215    self.get_data().keys().cloned().collect()
216  }
217
218  /// Add another instance of the data container to the series
219  fn add(&mut self, data : T) {
220    let board_id = data.get_board_id();
221    if !self.get_data().contains_key(&board_id) {
222      self.get_data_mut().insert(board_id, VecDeque::<T>::new());
223    } 
224    self.get_data_mut().get_mut(&board_id).unwrap().push_back(data);
225    if self.get_data_mut().get_mut(&board_id).unwrap().len() > self.get_max_size() {
226      error!("The queue is too large, returning the first element! If you need a larger series size, set the max_size field");
227      self.get_data_mut().get_mut(&board_id).unwrap().pop_front();
228    }
229  }
230  
231  fn get_last_moni(&self, board_id : u8) -> Option<T> {
232    let size = self.get_data().get(&board_id)?.len();
233    Some(self.get_data().get(&board_id).unwrap()[size - 1])
234  }
235}
236
237//--------------------------------------------------
238
/// Implements the moniseries trait for a MoniData
/// type of class
///
/// # Arguments
///
/// * `$name`  : The name of the series struct which gets generated
/// * `$class` : The MoniData type the series holds
///
/// Besides the struct itself this generates `new`, `Default`,
/// `fmt::Display`, the `MoniSeries<$class>` implementation and,
/// with the "pybindings" feature, the python methods.
/// The expansion refers to names it does not import itself
/// (e.g. `fmt`, and for the python part `TofPacket`, `CRReader`),
/// so these have to be in scope where the macro is invoked.
#[macro_export]
macro_rules! moniseries {
  ($name : ident, $class:ty) => {

    use std::collections::VecDeque;
    use std::collections::HashMap;

    use crate::monitoring::MoniSeries;

    /// A series of monitoring data, organized by board id
    #[cfg_attr(feature="pybindings",pyclass)]
    #[derive(Debug, Clone, PartialEq)]
    pub struct $name {
      data        : HashMap<u8, VecDeque<$class>>,
      max_size    : usize,
      timestamps  : Vec<u64>,
    }

    impl $name {
      /// An empty series which keeps up to 10000 entries per board
      pub fn new() -> Self {
        Self {
          data       : HashMap::<u8, VecDeque<$class>>::new(),
          max_size   : 10000,
          timestamps : Vec::<u64>::new()
        }
      }
    }

    impl Default for $name {
      fn default() -> Self {
        Self::new()
      }
    }

    impl fmt::Display for $name {
      fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "<{} : {} boards>", stringify!($name), self.data.len())
      }
    }

    impl MoniSeries<$class> for $name {

      fn get_data(&self) -> &HashMap<u8,VecDeque<$class>> {
        &self.data
      }

      fn get_data_mut(&mut self) -> &mut HashMap<u8,VecDeque<$class>> {
        &mut self.data
      }

      fn get_max_size(&self) -> usize {
        self.max_size
      }

      fn get_timestamps(&self) -> &Vec<u64> {
        &self.timestamps
      }
    }

    #[cfg(feature="pybindings")]
    #[pymethods]
    impl $name {
      #[new]
      fn new_py() -> Self {
        Self::new()
      }

      /// The maximum size of the series. If more data
      /// are added, data from the front will be removed
      #[getter]
      #[pyo3(name="max_size")]
      fn get_max_size_py(&self) -> usize {
        self.get_max_size()
      }

      /// If monitoring is retrieved from telemetry, we
      /// save the gcu timestamp of the packet, which
      /// herein can be accessed.
      ///
      /// This returns a full copy of the stored timestamps.
      #[getter]
      #[pyo3(name="timestamps")]
      fn get_timestamps_py(&self) -> Vec<u64> {
        warn!("This returns a full copy and is a performance bottleneck!");
        self.timestamps.clone()
      }

      /// Add an additional (Caraspace) file to the series
      ///
      /// Frames which do not contain the requested object are
      /// skipped. Errors while retrieving or unpacking a packet
      /// are printed and the respective packet is skipped.
      ///
      /// # Arguments:
      ///   * filename    : The name of the (caraspace) file to add
      ///   * from_object : Since this adds caraspace files, it is possible
      ///                   to choose from where to get the information.
      ///                   Either the telemetry packet, or the tofpacket, if
      ///                   either is present in the frame. The telemetry
      ///                   packet carries the gcu timestamp, which gets set
      ///                   on the monitoring data. CRFrameObjectType = Unknown
      ///                   is treated the same as TelemetryPacket.
      ///
      /// # Panics
      ///
      /// If the file can not be opened.
      #[pyo3(signature = (filename, from_object = CRFrameObjectType::TelemetryPacket))]
      fn add_crfile(&mut self, filename : String, from_object : CRFrameObjectType) {
        let reader = CRReader::new(filename).expect("Unable to open file!");
        // The tof packet can be stored under either of these two names,
        // the telemetry always comes as a generic housekeeping packet
        let tp_source     = String::from("TofPacketType.") + stringify!($class);
        let tp_source_alt = String::from("PacketType.") + stringify!($class);
        let tel_source    = "TelemetryPacketType.AnyTofHK";
        for frame in reader {
          match from_object {
            CRFrameObjectType::TofPacket => {
              // if a frame has the packet under both names, both get added
              for source in [tp_source.as_str(), tp_source_alt.as_str()] {
                if !frame.has(source) {
                  continue;
                }
                // report a failing retrieval the same way as in the
                // telemetry branch instead of panicking
                match frame.get::<TofPacket>(source) {
                  Err(err) => {
                    println!("Error unpacking! {err}");
                  }
                  Ok(tp) => {
                    match tp.unpack::<$class>() {
                      Err(err) => {
                        println!("Error unpacking! {err}");
                      }
                      Ok(moni) => {
                        self.add(moni);
                      }
                    }
                  }
                }
              }
            }
            CRFrameObjectType::TelemetryPacket | CRFrameObjectType::Unknown => {
              if !frame.has(tel_source) {
                continue;
              }
              let hk = match frame.get::<TelemetryPacket>(tel_source) {
                Err(err) => {
                  println!("Error unpacking! {err}");
                  continue;
                }
                Ok(hk) => hk,
              };
              let gcutime = hk.header.get_gcutime() as u64;
              // the payload of the housekeeping packet is a serialized TofPacket
              let mut pos = 0;
              let tp = match TofPacket::from_bytestream(&hk.payload, &mut pos) {
                Err(err) => {
                  println!("Error unpackin! {err}");
                  continue;
                }
                Ok(tp) => tp,
              };
              // housekeeping of other types is not for this series
              if tp.packet_type != <$class>::TOF_PACKET_TYPE {
                continue;
              }
              match tp.unpack::<$class>() {
                Err(err) => {
                  println!("Error unpacking! {err}");
                }
                Ok(mut moni) => {
                  moni.set_timestamp(gcutime);
                  self.add(moni);
                }
              }
            }
          }
        }
      }

      /// Generate a polars dataframe with monitoring data from the
      /// given TOF file.
      /// This will load ONLY data of the specific type of the
      /// MoniSeries itself
      ///
      /// # Arguments:
      ///   * filename : A single .tof.gaps file with monitoring
      ///                information
      #[staticmethod]
      fn from_tof_file(filename : String) -> PyResult<PyDataFrame> {
        let mut reader = TofPacketReader::new(&filename);
        let mut series = Self::new();
        reader.filter  = <$class>::TOF_PACKET_TYPE;
        for tp in reader {
          // packets which can not be unpacked will get thrown away
          if let Ok(moni) = tp.unpack::<$class>() {
            series.add(moni);
          }
        }
        series.get_dataframe()
          .map(PyDataFrame)
          .map_err(|err| PyValueError::new_err(err.to_string()))
      }

      /// Get a certain variable, but only for a single board
      #[pyo3(name="get_var_for_board")]
      fn get_var_for_board_py(&self, varname : &str, rb_id : u8) -> Option<Vec<f32>> {
        self.get_var_for_board(varname, &rb_id)
      }

      /// Reduces the MoniSeries to a single polars data frame
      /// The structure itself will not be changed
      #[pyo3(name="get_dataframe")]
      fn get_dataframe_py(&self) -> PyResult<PyDataFrame> {
        self.get_dataframe()
          .map(PyDataFrame)
          .map_err(|err| PyValueError::new_err(err.to_string()))
      }
    }

    #[cfg(feature="pybindings")]
    pythonize_display!($name);
  }
}
483