gondola_core/io/
telemetry_reader.rs

1//! The TelemetryPacketReader allows to read a (file) stream of serialized
2//! TelemetryPackets, which are typically the .bin files as generated 
3//! by the gcu
4// This file is part of gaps-online-software and published 
5// under the GPLv3 license
6
7use std::io::{
8  BufReader,
9  Read,
10  Seek,
11  SeekFrom,
12};
13
14use crate::prelude::*;
15
16/// Read serialized TelemetryPackets from an existing file
17///
18/// Read GAPS binary files ("Berkeley binaries)
19#[derive(Debug)]
20#[cfg_attr(feature="pybindings", pyclass)]
21pub struct TelemetryPacketReader {
22  /// Reader will emit packets from these files,
23  /// if one file is exhausted, it moves on to 
24  /// the next file automatically
25  pub filenames        : Vec<String>,
26  /// The index of the file the reader is 
27  /// currently reading
28  pub file_idx         : usize,
29  /// depending on the source of the telemetry files, 
30  /// there might be duplicates, because we get them
31  /// over different streams. 
32  /// Suppress these multiple packets
33  pub dedup           : bool,
34  /// Ignore packets that have a gcu time earlier than start_time 
35  pub start_time      : Option<f64>,
36  /// Ignore packets that have a gcu time later than end_time
37  pub end_time        : Option<f64>,
38  file_reader         : BufReader<File>,
39  /// Current (byte) position in the file
40  cursor              : usize,
41  /// Read only packets of type == PacketType
42  pub filter          : TelemetryPacketType,
43  /// Number of read packets
44  n_packs_read        : usize,
45  /// Number of skipped packets
46  n_packs_skipped     : usize,
47  /// Skip the first n packets
48  pub skip_ahead      : usize,
49  /// Stop reading after n packets
50  pub stop_after      : usize,
51  /// Number of encountered duplicates 
52  pub n_duplicates    : usize,
53  /// A cache to allow to quench duplicates 
54  /// pkt counter -> pkt checksum
55  dedup_cache         : HashMap<u16, VecDeque<u16>>
56}
57
58
59impl TelemetryPacketReader {
60  pub fn new(filename_or_directory : String, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> Self {
61    let firstfile : String;
62    let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
63    match list_path_contents_sorted(&filename_or_directory, Some(re)) {
64      Err(err) => {
65        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
66        panic!("Unable to open files!");
67      }
68      Ok(mut files) => {
69        
70        if let Some(start) = start_time {
71          info!("Removing files earlier than {}", start);
72          files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 >= start} );
73        }
74        if let Some(end)   = end_time {
75          info!("Removing files later than {}", end);
76          files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 <= end } ); 
77        }
78        firstfile = files[0].clone();
79        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
80          Err(err) => {
81            error!("Unable to open file {firstfile}! {err}");
82            panic!("Unable to create reader from {filename_or_directory}!");
83          }
84          Ok(file) => {
85            // prime the cache 
86            let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);  
87            for k in 0..u16::MAX as usize + 1 {
88              dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
89            }
90            let packet_reader = Self { 
91              filenames         : files,
92              file_idx          : 0,
93              dedup             : dedup,
94              start_time        : start_time,
95              end_time          : end_time,
96              file_reader       : BufReader::new(file),
97              cursor            : 0,
98              filter            : TelemetryPacketType::Unknown,
99              n_packs_read      : 0,
100              skip_ahead        : 0,
101              stop_after        : 0,
102              n_packs_skipped   : 0,
103              n_duplicates      : 0,
104              dedup_cache       : dedup_cache,
105            };
106            packet_reader
107          }
108        }
109      }
110    }
111  } 
112  
113  pub fn clear_dedup_cache(&mut self) {
114    let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);  
115    for k in 0..u16::MAX as usize + 1 {
116      dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
117    }
118    self.dedup_cache = dedup_cache;
119  }
120
121  /// Preview the number of frames in this reader
122  pub fn count_packets(&mut self) -> (usize, usize, HashMap<TelemetryPacketType,usize>) {
123    let _ = self.rewind();
124    self.clear_dedup_cache();
125    let mut nframes = 0usize;
126    let mut buffer  = [0];
127    let mut incomplete  = 0usize;
128    let mut index   = HashMap::<TelemetryPacketType,usize>::new();
129    for k in TelemetryPacketType::iter() {
130      index.insert(k, 0);
131    }
132    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
133    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
134    let bar = ProgressBar::new(self.filenames.len() as u64);
135    bar.set_position(0);
136    bar.set_message (String::from("Counting packets.."));
137    bar.set_prefix  ("\u{2728}");
138    bar.set_style   (bar_style);
139    bar.set_position(self.file_idx as u64);
140    loop {
141      match self.file_reader.read_exact(&mut buffer) {
142        Err(err) => {
143          debug!("Unable to read from file! {err}");
144          match self.prime_next_file() {
145            None    => break,
146            Some(_) => {
147              bar.set_position(self.file_idx as u64);
148              continue;
149            }
150          };
151        }
152        Ok(_) => {
153          self.cursor += 1;
154        }
155      }
156      
157      //thead.sync      = 0x90eb;
158
159      if buffer[0] != 0xeb {
160        continue;
161      } else {
162        match self.file_reader.read_exact(&mut buffer) {
163          Err(err) => {
164            debug!("Unable to read from file! {err}");
165            match self.prime_next_file() {
166              None    => break,
167              Some(_) => {
168                bar.set_position(self.file_idx as u64);
169                continue;
170              }
171            };
172          }
173          Ok(_) => {
174            self.cursor += 1;
175          }
176        }
177        // check if the second byte of the header
178        if buffer[0] != 0x90 { 
179          continue;
180        } else {
181          // read packet type for index
182          match self.file_reader.read_exact(&mut buffer) {
183            Err(err) => {
184              debug!("Unable to read from file! {err}");
185              match self.prime_next_file() {
186                None    => break,
187                Some(_) => {
188                  bar.set_position(self.file_idx as u64);
189                  continue;
190                }
191              };
192            }
193            Ok(_) => {
194              *index.get_mut(&TelemetryPacketType::from(buffer[0])).unwrap() += 1;
195              self.cursor += 1;
196            }
197          }
198          // read the the size of the packet
199          // first we have to skip 6 bytes
200          let mut buffer_skip = [0,0,0,0,0,0,0];
201          match self.file_reader.read_exact(&mut buffer_skip) {
202            Err(err) => {
203              debug!("Unable to read from file! {err}");
204              match self.prime_next_file() {
205                None    => break,
206                Some(_) => {
207                  bar.set_position(self.file_idx as u64);
208                  continue;
209                }
210              };
211            }
212            Ok(_) => {
213              self.cursor += 7;
214            }
215          }
216          let mut buffer_psize = [0,0];
217          match self.file_reader.read_exact(&mut buffer_psize) {
218            Err(_err) => {
219              match self.prime_next_file() {
220                None    => break,
221                Some(_) => {
222                  bar.set_position(self.file_idx as u64);
223                  continue;
224                }
225              }
226            }
227            Ok(_) => {
228              self.cursor += 2;
229            }
230          }
231          let vec_data = buffer_psize.to_vec();
232          let size     = parse_u16(&vec_data, &mut 0);
233          let mut temp_buffer = vec![0; size as usize];
234          // skip 2 more bytes
235          match self.file_reader.read_exact(&mut buffer_psize) {
236            Err(_err) => {
237              match self.prime_next_file() {
238                None    => break,
239                Some(_) => {
240                  bar.set_position(self.file_idx as u64);
241                  continue;
242                }
243              }
244            }
245            Ok(_) => {
246              self.cursor += 2;
247            }
248          }
249          match self.file_reader.read_exact(&mut temp_buffer) { 
250          //match self.file_reader.seek(SeekFrom::Current(size as i64)) {
251          //match self.file_reader.seek_relative(size as i64) {
252            Err(err) => {
253              incomplete += 1;
254              warn!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
255              match self.prime_next_file() {
256                None    => break,
257                Some(_) => {
258                  bar.set_position(self.file_idx as u64);
259                  continue;
260                }
261              }
262            }
263            Ok(_) => {
264              self.cursor += size as usize;
265              nframes += 1;
266            }
267          }
268        }
269      } // if no 0xAA found
270    } // end loop
271    bar.finish_with_message("Done!");
272    let _ = self.rewind();
273    (nframes, incomplete, index)
274  } // end fn
275
276  /// Return the next TelemetryPacket in the stream
277  ///
278  /// Will return none if the file has been exhausted.
279  /// Use ::rewind to start reading from the beginning
280  /// again.
281  pub fn read_next_item(&mut self) -> Option<TelemetryPacket> {
282    // filter::Unknown corresponds to allowing any
283    let mut buffer = [0];
284    loop {
285      match self.file_reader.read_exact(&mut buffer) {
286        Err(err) => {
287          debug!("Unable to read from file! {err}");
288          self.prime_next_file()?;
289          return self.read_next_item();
290        }
291        Ok(_) => {
292          self.cursor += 1;
293        }
294      }
295      if buffer[0] != 0xeb {
296        continue;
297      } else {
298        match self.file_reader.read_exact(&mut buffer) {
299          Err(err) => {
300            debug!("Unable to read from file! {err}");
301            self.prime_next_file()?;
302            return self.read_next_item();
303          }
304          Ok(_) => {
305            self.cursor += 1;
306          }
307        }
308
309        if buffer[0] != 0x90 { 
310          continue;
311        } else {
312          // FIXME - use TofPacketHeader::from_bytestream here
313          // the 3rd byte is the packet type
314          match self.file_reader.read_exact(&mut buffer) {
315             Err(err) => {
316              debug!("Unable to read from file! {err}");
317              self.prime_next_file()?;
318              return self.read_next_item();
319            }
320            Ok(_) => {
321              self.cursor += 1;
322            }
323          }
324          let mut thead     = TelemetryPacketHeader::new();
325          thead.sync        = 0x90eb;
326          thead.packet_type = TelemetryPacketType::from(buffer[0]);
327          let ptype    = TelemetryPacketType::from(buffer[0]);
328          // read the the size of the packet
329          let mut buffer_ts = [0,0,0,0];
330          match self.file_reader.read_exact(&mut buffer_ts) {
331            Err(err) => {
332              debug!("Unable to read from file! {err}");
333              self.prime_next_file()?;
334              return self.read_next_item();
335            }
336            Ok(_) => {
337              self.cursor += 4;
338              thead.timestamp = u32::from_le_bytes(buffer_ts);
339            }
340          }
341          let mut buffer_counter = [0,0];
342          match self.file_reader.read_exact(&mut buffer_counter) {
343            Err(err) => {
344              debug!("Unable to read from file! {err}");
345              self.prime_next_file()?;
346              return self.read_next_item();
347            }
348            Ok(_) => {
349              self.cursor += 2;
350              thead.counter   = u16::from_le_bytes(buffer_counter);
351            }
352          }
353          let mut buffer_length = [0,0];
354          match self.file_reader.read_exact(&mut buffer_length) {
355            Err(err) => {
356              debug!("Unable to read from file! {err}");
357              return None;
358            }
359            Ok(_) => {
360              self.cursor += 2;
361              thead.length    = u16::from_le_bytes(buffer_length);
362            }
363          }
364          let mut buffer_checksum = [0,0];
365          match self.file_reader.read_exact(&mut buffer_checksum) {
366            Err(err) => {
367              debug!("Unable to read from file! {err}");
368              self.prime_next_file()?;
369              return self.read_next_item();
370            }
371            Ok(_) => {
372              self.cursor += 2;
373              thead.checksum    = u16::from_le_bytes(buffer_checksum);
374            }
375          }
376          
377          let mut size     = thead.length;
378          // This size includes the header
379          if (size as usize) < TelemetryPacketHeader::SIZE {
380            error!("This packet might be empty or corrupt!");
381            return None;
382          }
383          size -= TelemetryPacketHeader::SIZE as u16;
384          if ptype != self.filter && self.filter != TelemetryPacketType::Unknown {
385            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
386              Err(err) => {
387                debug!("Unable to read more data! {err}");
388                self.prime_next_file()?;
389                return self.read_next_item();
390              }
391              Ok(_) => {
392                self.cursor += size as usize;
393              }
394            }
395            continue; // this is just not the packet we want
396          }
397          // now at this point, we want the packet!
398          // except we skip ahead or stop earlier
399          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
400            // we don't want it
401            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
402              Err(err) => {
403                debug!("Unable to read more data! {err}");
404                self.prime_next_file()?;
405                return self.read_next_item();
406              }
407              Ok(_) => {
408                self.n_packs_skipped += 1;
409                self.cursor += size as usize;
410              }
411            }
412            continue; // this is just not the packet we want
413          }
414          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
415            // we don't want it
416            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
417              Err(err) => {
418                debug!("Unable to read more data! {err}");
419                self.prime_next_file()?;
420                return self.read_next_item();
421              }
422              Ok(_) => {
423                self.cursor += size as usize;
424              }
425            }
426            continue; // this is just not the packet we want
427          }
428          
429
430          let mut tp = TelemetryPacket::new();
431          tp.header  = thead;
432          
433          //tp.packet_type = ptype;
434          //let mut payload = vec![0u8;TelemetryPacketHeader::SIZE];
435          //match self.file_reader.read_exact(&mut payload) {
436          //  Err(err) => {
437          //    debug!("Unable to read from file! {err}");
438          //    return None;
439          //  }
440          //  Ok(_) => {
441          //    self.cursor += size as usize;
442          //  }
443          //}
444
445          let mut payload = vec![0u8;size as usize];
446          match self.file_reader.read_exact(&mut payload) {
447            Err(err) => {
448              debug!("Unable to read from file! {err}");
449              self.prime_next_file()?;
450              return self.read_next_item();
451            }
452            Ok(_) => {
453              self.cursor += tp.header.length as usize;
454            }
455          }
456
457          tp.payload = payload;
458          self.n_packs_read += 1;
459          // check if the packet has been seen already
460          if self.dedup {
461            let mut will_send       : bool;
462            if self.dedup_cache[&tp.header.counter].len() == 0 {
463              will_send = true;
464            } else {
465              if !self.dedup_cache.contains_key(&tp.header.counter) {
466                panic!("The dedup cache does not contain {}", tp.header.counter);
467              }
468              will_send = true;
469              for checksum in &self.dedup_cache[&tp.header.counter] {
470                if checksum == &tp.header.checksum {
471                  will_send = false;
472                } 
473              }
474            }
475            // this happens when we have seen the packet counter, but not the actual 
476            // checksum
477            if will_send {
478              self.dedup_cache.get_mut(&tp.header.counter).unwrap().push_back(tp.header.checksum);  
479              return Some(tp);
480            } else {
481              // make sure the caches won't get too long, limit 
482              // our selves to 4 times the packet counter rollover 
483              self.n_duplicates += 1;
484              if self.dedup_cache[&tp.header.counter].len() > 4 {
485                self.dedup_cache.get_mut(&tp.header.counter).unwrap().pop_front();
486              }
487
488              return self.read_next_item();
489            }
490          }
491          if self.start_time.is_some() {
492            if tp.header.get_gcutime() < self.start_time.unwrap() {
493              return self.read_next_item(); 
494            }
495          }
496          if self.end_time.is_some() {
497            if tp.header.get_gcutime() > self.end_time.unwrap() {
498              return self.read_next_item();
499            }
500          }
501          return Some(tp);
502        }
503      } // if no 0xAA found
504    } // end loop
505  } // end fn
506}
507
508impl Default for TelemetryPacketReader {
509  fn default() -> Self {
510    TelemetryPacketReader::new(String::from(""), false, None, None)
511  }
512}
513
514impl fmt::Display for TelemetryPacketReader {
515  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
516    let mut range_repr = String::from("");
517    if self.skip_ahead > 0 {
518      range_repr += &(format!("({}", self.skip_ahead));
519    } else {
520      range_repr += "(";
521    }
522    if self.stop_after > 0 {
523      range_repr += &(format!("..{})", self.stop_after));
524    } else {
525      range_repr += "..)";
526    }
527    let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
528    write!(f, "{}", repr)
529  }
530}
531
532reader!(TelemetryPacketReader, TelemetryPacket);
533
534#[cfg(feature="pybindings")]
535#[pymethods]
536impl TelemetryPacketReader {
537
538  #[new]
539  #[pyo3(signature = (filenames_or_directory, dedup = true, start_time = None, end_time = None))]
540  fn new_py(filenames_or_directory : &Bound<'_,PyAny>, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> PyResult<Self> {
541    
542    let mut string_value = String::from("foo");
543    let mut fnames       = Vec::<String>::new();
544    if let Ok(s) = filenames_or_directory.extract::<String>() {
545       string_value = s;
546    } //else if let Ok(p) = filename_or_directory.extract::<&Path>() {
547    if let Ok(fspath_method) = filenames_or_directory.getattr("__fspath__") {
548      if let Ok(fspath_result) = fspath_method.call0() {
549        if let Ok(py_string) = fspath_result.extract::<String>() {
550          string_value = py_string;
551        }
552      }
553    }
554    if let Ok(list) = filenames_or_directory.extract::<Vec<String>>() {
555      for k in list {
556          fnames.push(k);
557      } //else if let Ok(p) = filename_or_directory.extract::<&Path>() {
558      //  if let Ok(py_pth_mth) = k.getattr("__fspath__") {
559      //    if let Ok(py_pth_rs) = py_pth_mth.call0(py_pth_mth) {
560      //      if let Ok(py_string) = py_pth_rs.extract::<String>() {
561      //        fnames.push(py_string);
562      //      }
563      //    }
564      //  }
565      //}
566    }
567    let mut reader : Self;
568    if fnames.len() > 0 {
569      string_value = fnames[0].clone();
570      reader = Self::new(string_value, dedup, start_time, end_time);
571      reader.filenames = fnames;
572    } else {
573      reader = Self::new(string_value, dedup, start_time, end_time);
574    }
575    Ok(reader)
576    //match Self::new(&string_value) {
577    //  Err(err) => {
578    //    return Err(PyValueError::new_err(err.to_string()));
579    //  }
580    //  Ok(reader) => {
581    //    return Ok(reader);
582    //  }
583    //}
584  }
585
586  #[getter]
587  fn get_n_duplicates(&self) -> usize {
588    self.n_duplicates
589  }
590
591  #[pyo3(name = "count_packets")]
592  fn count_packets_py(&mut self) -> (usize,usize,HashMap<TelemetryPacketType,usize>) {
593    self.count_packets()
594  }
595
596  //#[getter]
597  //fn first(&mut self) -> Option<TofPacket> {
598  //  self.first_packet()
599  //}
600
601  //#[getter]
602  //fn last(&mut self) -> Option<TofPacket> {
603  //  self.last_packet()
604  //}
605 
606  #[getter]
607  fn filenames(&self) -> Vec<String> {
608    self.filenames.clone()
609  }
610  //#[pyo3(name="set_tracker_calibrations_from_fnames")]
611  //#[pyo3(signature = (mask = None, pedestal = None, transfer_fn = None, cmn_noise = None))]
612  //fn set_tracker_calibrations_from_fnames_py(&mut self,
613  //                                            mask        : Option<String>,
614  //                                            pedestal    : Option<String>,
615  //                                            transfer_fn : Option<String>,
616  //                                            cmn_noise   : Option<String>) {
617  //  self.set_tracker_calibrations_from_fnames(mask, pedestal, transfer_fn, cmn_noise);
618  //}
619
620  /// This is the filename we are currently 
621  /// extracting frames from 
622  #[getter]
623  #[pyo3(name="current_filename")]
624  fn get_current_filename_py(&self) -> Option<&str> {
625    self.get_current_filename()
626  }
627
628  /// Start the reader from the beginning
629  /// This is equivalent to a re-initialization
630  /// of that reader.
631  #[pyo3(name="rewind")]
632  fn rewind_py(&mut self) -> PyResult<()> {
633    // HOTFIX to avoid python segfault. However, this has to go 
634    // into the rust rewind as well!
635    self.clear_dedup_cache();
636    match self.rewind() {
637      Err(err) => {
638        return Err(PyValueError::new_err(err.to_string()));
639      }
640      Ok(_) => Ok(())
641    }
642  }
643
644  //#[pyo3(name="count_packets")]
645  //fn count_packets_py(&mut self) -> usize {
646  //  self.count_packets()
647  //}
648
649  fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
650    slf 
651  }
652  
653  fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TelemetryPacket> {
654    slf.next()
655    //match slf.next() { 
656    //  Some(packet) => {
657    //    return Some(packet)
658    //  }   
659    //  None => {
660    //    return None;
661    //  }   
662    //}   
663  }
664}
665
666#[cfg(feature="pybindings")]
667pythonize_display!(TelemetryPacketReader);
668
669