gondola_core/io/
telemetry_reader.rs

//! The TelemetryPacketReader reads a (file) stream of serialized
//! TelemetryPackets, typically the .bin files as generated
//! by the gcu.
4// This file is part of gaps-online-software and published 
5// under the GPLv3 license
6
7use std::io::{
8  BufReader,
9  Read,
10  Seek,
11  SeekFrom,
12};
13use std::cmp::Ordering;
14
15use crate::prelude::*;
16
/// Read serialized TelemetryPackets from one or more existing files
///
/// Reads GAPS binary files ("Berkeley binaries"). Packets are located by
/// scanning for their sync bytes (0xeb 0x90); once one file is exhausted,
/// the reader continues with the next one in `filenames`.
#[derive(Debug)]
#[cfg_attr(feature="pybindings", pyclass)]
pub struct TelemetryPacketReader {
  /// Reader will emit packets from these files,
  /// if one file is exhausted, it moves on to 
  /// the next file automatically
  pub filenames        : Vec<String>,
  /// The index (into `filenames`) of the file the 
  /// reader is currently reading
  pub file_idx         : usize,
  /// Depending on the source of the telemetry files, 
  /// there might be duplicates, because we get them
  /// over different streams. If set, packets whose 
  /// (packet counter, checksum) combination has been
  /// seen before are suppressed.
  pub dedup           : bool,
  /// Ignore packets that have a gcu time earlier than start_time.
  /// Also used to prune the file list by the timestamp encoded
  /// in the file names.
  pub start_time      : Option<f64>,
  /// Ignore packets that have a gcu time later than end_time.
  /// Also used to prune the file list by the timestamp encoded
  /// in the file names.
  pub end_time        : Option<f64>,
  /// Buffered handle of the file currently being read
  file_reader         : BufReader<File>,
  /// Current (byte) position in the file
  cursor              : usize,
  /// Read only packets of this type. 
  /// TelemetryPacketType::Unknown lets packets of any type pass.
  pub filter          : TelemetryPacketType,
  /// Number of packets whose payload has been read. This also 
  /// counts packets which are afterwards dropped as duplicates
  /// or because of the time window.
  n_packs_read        : usize,
  /// Number of packets skipped because of `skip_ahead`
  n_packs_skipped     : usize,
  /// Skip the first n packets (of those passing the type filter).
  /// 0 disables skipping.
  pub skip_ahead      : usize,
  /// Stop reading after n packets (as counted by n_packs_read).
  /// 0 means no limit.
  pub stop_after      : usize,
  /// Number of encountered (and suppressed) duplicates 
  pub n_duplicates    : usize,
  /// A cache to allow to quench duplicates 
  /// pkt counter -> checksums of the packets seen with that counter
  dedup_cache         : HashMap<u16, VecDeque<u16>>,
  /// If ::cache_all_packets is called, this will hold 
  /// all TelemetryPackets sorted by gcu time and 
  /// packet counter (earliest first)
  pub packet_cache    : Vec<TelemetryPacket> 
}
62
63
64impl TelemetryPacketReader {
65  pub fn new(filename_or_directory : String, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> Self {
66    let firstfile : String;
67    let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
68    match list_path_contents_sorted(&filename_or_directory, Some(re)) {
69      Err(err) => {
70        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
71        panic!("Unable to open files!");
72      }
73      Ok(mut files) => {
74        
75        if let Some(start) = start_time {
76          info!("Removing files earlier than {}", start);
77          files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 >= start} );
78        }
79        if let Some(end)   = end_time {
80          info!("Removing files later than {}", end);
81          files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 <= end } ); 
82        }
83        firstfile = files[0].clone();
84        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
85          Err(err) => {
86            error!("Unable to open file {firstfile}! {err}");
87            panic!("Unable to create reader from {filename_or_directory}!");
88          }
89          Ok(file) => {
90            // prime the cache 
91            let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);  
92            for k in 0..u16::MAX as usize + 1 {
93              dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
94            }
95            let packet_reader = Self { 
96              filenames         : files,
97              file_idx          : 0,
98              dedup             : dedup,
99              start_time        : start_time,
100              end_time          : end_time,
101              file_reader       : BufReader::new(file),
102              cursor            : 0,
103              filter            : TelemetryPacketType::Unknown,
104              n_packs_read      : 0,
105              skip_ahead        : 0,
106              stop_after        : 0,
107              n_packs_skipped   : 0,
108              n_duplicates      : 0,
109              dedup_cache       : dedup_cache,
110              packet_cache      : Vec::<TelemetryPacket>::new()
111            };
112            packet_reader
113          }
114        }
115      }
116    }
117  } 
118 
119  /// Instead of reading packets one at a time, we can read the entire input 
120  /// at once and keep it in memory. This allows to sort the packeges.
121  ///
122  /// This comes with a performance cost and extended memory needs, however, 
123  /// it might be helpful for debugging
124  pub fn cache_all_packets(&mut self) {
125    loop {
126      match self.read_next_item() {
127        None => {
128          info!("Read all packets!");
129          break;
130        }
131        Some(pack) => {
132          self.packet_cache.push(pack);
133        }
134      }
135    }
136    // sort the packet cache by timestamp and counter of 
137    // the header 
138    self.packet_cache.sort_by(|a,b|{
139      b.header.get_gcutime().partial_cmp(&a.header.get_gcutime()).unwrap_or(Ordering::Equal)  
140      .then(b.header.counter.cmp(&a.header.counter))
141    });
142    // reverse the vector, so that the first packet gets 
143    // returned first 
144    self.packet_cache.reverse();
145  }
146
147  pub fn clear_dedup_cache(&mut self) {
148    let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);  
149    for k in 0..u16::MAX as usize + 1 {
150      dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
151    }
152    self.dedup_cache = dedup_cache;
153  }
154
155  /// Preview the number of frames in this reader
156  pub fn count_packets(&mut self) -> (usize, usize, HashMap<TelemetryPacketType,usize>) {
157    let _ = self.rewind();
158    self.clear_dedup_cache();
159    let mut nframes = 0usize;
160    let mut buffer  = [0];
161    let mut incomplete  = 0usize;
162    let mut index   = HashMap::<TelemetryPacketType,usize>::new();
163    for k in TelemetryPacketType::iter() {
164      index.insert(k, 0);
165    }
166    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
167    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
168    let bar = ProgressBar::new(self.filenames.len() as u64);
169    bar.set_position(0);
170    bar.set_message (String::from("Counting packets.."));
171    bar.set_prefix  ("\u{2728}");
172    bar.set_style   (bar_style);
173    bar.set_position(self.file_idx as u64);
174    loop {
175      match self.file_reader.read_exact(&mut buffer) {
176        Err(err) => {
177          debug!("Unable to read from file! {err}");
178          match self.prime_next_file() {
179            None    => break,
180            Some(_) => {
181              bar.set_position(self.file_idx as u64);
182              continue;
183            }
184          };
185        }
186        Ok(_) => {
187          self.cursor += 1;
188        }
189      }
190      
191      //thead.sync      = 0x90eb;
192
193      if buffer[0] != 0xeb {
194        continue;
195      } else {
196        match self.file_reader.read_exact(&mut buffer) {
197          Err(err) => {
198            debug!("Unable to read from file! {err}");
199            match self.prime_next_file() {
200              None    => break,
201              Some(_) => {
202                bar.set_position(self.file_idx as u64);
203                continue;
204              }
205            };
206          }
207          Ok(_) => {
208            self.cursor += 1;
209          }
210        }
211        // check if the second byte of the header
212        if buffer[0] != 0x90 { 
213          continue;
214        } else {
215          // read packet type for index
216          match self.file_reader.read_exact(&mut buffer) {
217            Err(err) => {
218              debug!("Unable to read from file! {err}");
219              match self.prime_next_file() {
220                None    => break,
221                Some(_) => {
222                  bar.set_position(self.file_idx as u64);
223                  continue;
224                }
225              };
226            }
227            Ok(_) => {
228              *index.get_mut(&TelemetryPacketType::from(buffer[0])).unwrap() += 1;
229              self.cursor += 1;
230            }
231          }
232          // read the the size of the packet
233          // first we have to skip 6 bytes
234          let mut buffer_skip = [0,0,0,0,0,0];
235          match self.file_reader.read_exact(&mut buffer_skip) {
236            Err(err) => {
237              debug!("Unable to read from file! {err}");
238              match self.prime_next_file() {
239                None    => break,
240                Some(_) => {
241                  bar.set_position(self.file_idx as u64);
242                  continue;
243                }
244              };
245            }
246            Ok(_) => {
247              self.cursor += 6;
248            }
249          }
250          let mut buffer_psize = [0,0];
251          match self.file_reader.read_exact(&mut buffer_psize) {
252            Err(_err) => {
253              match self.prime_next_file() {
254                None    => break,
255                Some(_) => {
256                  bar.set_position(self.file_idx as u64);
257                  continue;
258                }
259              }
260            }
261            Ok(_) => {
262              self.cursor += 2;
263            }
264          }
265          let vec_data = buffer_psize.to_vec();
266          // packet size is the size including the header, so for the 
267          // payload only we have to subtract that.
268          let size     = parse_u16(&vec_data, &mut 0) - 13;
269          let mut temp_buffer = vec![0; size as usize];
270          // skip 2 more bytes for the header checksum
271          match self.file_reader.read_exact(&mut buffer_psize) {
272            Err(_err) => {
273              match self.prime_next_file() {
274                None    => break,
275                Some(_) => {
276                  bar.set_position(self.file_idx as u64);
277                  continue;
278                }
279              }
280            }
281            Ok(_) => {
282              self.cursor += 2;
283            }
284          }
285          match self.file_reader.read_exact(&mut temp_buffer) { 
286          //match self.file_reader.seek(SeekFrom::Current(size as i64)) {
287          //match self.file_reader.seek_relative(size as i64) {
288            Err(err) => {
289              incomplete += 1;
290              warn!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
291              match self.prime_next_file() {
292                None    => break,
293                Some(_) => {
294                  bar.set_position(self.file_idx as u64);
295                  continue;
296                }
297              }
298            }
299            Ok(_) => {
300              self.cursor += size as usize;
301              nframes += 1;
302            }
303          }
304        }
305      } // if no 0xAA found
306    } // end loop
307    bar.finish_with_message("Done!");
308    let _ = self.rewind();
309    (nframes, incomplete, index)
310  } // end fn
311
312  /// Return the next TelemetryPacket in the stream
313  ///
314  /// Will return none if the file has been exhausted.
315  /// Use ::rewind to start reading from the beginning
316  /// again.
317  pub fn read_next_item(&mut self) -> Option<TelemetryPacket> {
318    // filter::Unknown corresponds to allowing any
319    let mut buffer = [0];
320    loop {
321      match self.file_reader.read_exact(&mut buffer) {
322        Err(err) => {
323          debug!("Unable to read from file! {err}");
324          self.prime_next_file()?;
325          return self.read_next_item();
326        }
327        Ok(_) => {
328          self.cursor += 1;
329        }
330      }
331      if buffer[0] != 0xeb {
332        continue;
333      } else {
334        match self.file_reader.read_exact(&mut buffer) {
335          Err(err) => {
336            debug!("Unable to read from file! {err}");
337            self.prime_next_file()?;
338            return self.read_next_item();
339          }
340          Ok(_) => {
341            self.cursor += 1;
342          }
343        }
344
345        if buffer[0] != 0x90 { 
346          continue;
347        } else {
348          // FIXME - use TofPacketHeader::from_bytestream here
349          // the 3rd byte is the packet type
350          match self.file_reader.read_exact(&mut buffer) {
351             Err(err) => {
352              debug!("Unable to read from file! {err}");
353              self.prime_next_file()?;
354              return self.read_next_item();
355            }
356            Ok(_) => {
357              self.cursor += 1;
358            }
359          }
360          let mut thead     = TelemetryPacketHeader::new();
361          thead.sync        = 0x90eb;
362          thead.packet_type = TelemetryPacketType::from(buffer[0]);
363          //let ptype    = TelemetryPacketType::from(buffer[0]);
364          let mut buffer_ts = [0,0,0,0];
365          match self.file_reader.read_exact(&mut buffer_ts) {
366            Err(err) => {
367              debug!("Unable to read from file! {err}");
368              self.prime_next_file()?;
369              return self.read_next_item();
370            }
371            Ok(_) => {
372              self.cursor += 4;
373              thead.timestamp = u32::from_le_bytes(buffer_ts);
374            }
375          }
376          let mut buffer_counter = [0,0];
377          match self.file_reader.read_exact(&mut buffer_counter) {
378            Err(err) => {
379              debug!("Unable to read from file! {err}");
380              self.prime_next_file()?;
381              return self.read_next_item();
382            }
383            Ok(_) => {
384              self.cursor += 2;
385              thead.counter   = u16::from_le_bytes(buffer_counter);
386            }
387          }
388          let mut buffer_length = [0,0];
389          match self.file_reader.read_exact(&mut buffer_length) {
390            Err(err) => {
391              debug!("Unable to read from file! {err}");
392              return None;
393            }
394            Ok(_) => {
395              self.cursor += 2;
396              thead.length    = u16::from_le_bytes(buffer_length);
397            }
398          }
399          let mut buffer_checksum = [0,0];
400          match self.file_reader.read_exact(&mut buffer_checksum) {
401            Err(err) => {
402              debug!("Unable to read from file! {err}");
403              self.prime_next_file()?;
404              return self.read_next_item();
405            }
406            Ok(_) => {
407              self.cursor += 2;
408              thead.checksum    = u16::from_le_bytes(buffer_checksum);
409            }
410          }
411          
412          let mut size     = thead.length;
413          // This size includes the header
414          if (size as usize) < TelemetryPacketHeader::SIZE {
415            error!("This packet might be empty or corrupt!");
416            return None;
417          }
418          size -= TelemetryPacketHeader::SIZE as u16;
419          if thead.packet_type != self.filter && self.filter != TelemetryPacketType::Unknown {
420            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
421              Err(err) => {
422                debug!("Unable to read more data! {err}");
423                self.prime_next_file()?;
424                return self.read_next_item();
425              }
426              Ok(_) => {
427                self.cursor += size as usize;
428              }
429            }
430            continue; // this is just not the packet we want
431          }
432          // now at this point, we want the packet!
433          // except we skip ahead or stop earlier
434          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
435            // we don't want it
436            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
437              Err(err) => {
438                debug!("Unable to read more data! {err}");
439                self.prime_next_file()?;
440                return self.read_next_item();
441              }
442              Ok(_) => {
443                self.n_packs_skipped += 1;
444                self.cursor += size as usize;
445              }
446            }
447            continue; // this is just not the packet we want
448          }
449          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
450            // we don't want it
451            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
452              Err(err) => {
453                debug!("Unable to read more data! {err}");
454                self.prime_next_file()?;
455                return self.read_next_item();
456              }
457              Ok(_) => {
458                self.cursor += size as usize;
459              }
460            }
461            continue; // this is just not the packet we want
462          }
463          
464
465          let mut tp = TelemetryPacket::new();
466          tp.header  = thead;
467          
468          //tp.packet_type = ptype;
469          //let mut payload = vec![0u8;TelemetryPacketHeader::SIZE];
470          //match self.file_reader.read_exact(&mut payload) {
471          //  Err(err) => {
472          //    debug!("Unable to read from file! {err}");
473          //    return None;
474          //  }
475          //  Ok(_) => {
476          //    self.cursor += size as usize;
477          //  }
478          //}
479
480          let mut payload = vec![0u8;size as usize];
481          match self.file_reader.read_exact(&mut payload) {
482            Err(err) => {
483              debug!("Unable to read from file! {err}");
484              self.prime_next_file()?;
485              return self.read_next_item();
486            }
487            Ok(_) => {
488              self.cursor += tp.header.length as usize;
489            }
490          }
491
492          tp.payload = payload;
493          self.n_packs_read += 1;
494          // check if the packet has been seen already
495          if self.dedup {
496            let mut will_send       : bool;
497            if self.dedup_cache[&tp.header.counter].len() == 0 {
498              will_send = true;
499            } else {
500              if !self.dedup_cache.contains_key(&tp.header.counter) {
501                panic!("The dedup cache does not contain {}", tp.header.counter);
502              }
503              will_send = true;
504              for checksum in &self.dedup_cache[&tp.header.counter] {
505                if checksum == &tp.header.checksum {
506                  will_send = false;
507                } 
508              }
509            }
510            // this happens when we have seen the packet counter, but not the actual 
511            // checksum
512            if will_send {
513              self.dedup_cache.get_mut(&tp.header.counter).unwrap().push_back(tp.header.checksum);  
514              return Some(tp);
515            } else {
516              // make sure the caches won't get too long, limit 
517              // our selves to 4 times the packet counter rollover 
518              self.n_duplicates += 1;
519              if self.dedup_cache[&tp.header.counter].len() > 4 {
520                self.dedup_cache.get_mut(&tp.header.counter).unwrap().pop_front();
521              }
522
523              return self.read_next_item();
524            }
525          }
526          if self.start_time.is_some() {
527            if tp.header.get_gcutime() < self.start_time.unwrap() {
528              return self.read_next_item(); 
529            }
530          }
531          if self.end_time.is_some() {
532            if tp.header.get_gcutime() > self.end_time.unwrap() {
533              return self.read_next_item();
534            }
535          }
536          return Some(tp);
537        }
538      } // if no 0xAA found
539    } // end loop
540  } // end fn
541}
542
543impl Default for TelemetryPacketReader {
544  fn default() -> Self {
545    TelemetryPacketReader::new(String::from(""), false, None, None)
546  }
547}
548
549impl fmt::Display for TelemetryPacketReader {
550  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
551    let mut range_repr = String::from("");
552    if self.skip_ahead > 0 {
553      range_repr += &(format!("({}", self.skip_ahead));
554    } else {
555      range_repr += "(";
556    }
557    if self.stop_after > 0 {
558      range_repr += &(format!("..{})", self.stop_after));
559    } else {
560      range_repr += "..)";
561    }
562    let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
563    write!(f, "{}", repr)
564  }
565}
566
// NOTE(review): `reader!` is defined elsewhere in the crate. This file calls
// `rewind`, `prime_next_file`, `get_current_filename` and `next` on
// TelemetryPacketReader without defining them, so presumably this macro
// generates them (including the Iterator implementation) - confirm against
// the macro definition.
reader!(TelemetryPacketReader, TelemetryPacket);
568
#[cfg(feature="pybindings")]
#[pymethods]
impl TelemetryPacketReader {

  /// Create a reader from a single file or directory (str or os.PathLike)
  /// or from a non-empty list of filenames.
  ///
  /// For a list, the reader is initialized with its first entry and then
  /// reads all listed files in the given order.
  ///
  /// Raises ValueError if `filenames_or_directory` is none of the above.
  #[new]
  #[pyo3(signature = (filenames_or_directory, dedup = false, start_time = None, end_time = None))]
  fn new_py(filenames_or_directory : &Bound<'_,PyAny>, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> PyResult<Self> {
    // an explicit list of filenames takes precedence
    let fnames = filenames_or_directory.extract::<Vec<String>>().unwrap_or_default();
    if let Some(first) = fnames.first() {
      let mut reader = Self::new(first.clone(), dedup, start_time, end_time);
      reader.filenames = fnames;
      return Ok(reader);
    }
    // a single path, either as str or as os.PathLike (e.g. pathlib.Path)
    let mut path = filenames_or_directory.extract::<String>().ok();
    if let Ok(fspath_method) = filenames_or_directory.getattr("__fspath__") {
      if let Ok(fspath_result) = fspath_method.call0() {
        if let Ok(py_string) = fspath_result.extract::<String>() {
          path = Some(py_string);
        }
      }
    }
    match path {
      Some(path) => Ok(Self::new(path, dedup, start_time, end_time)),
      None       => Err(PyValueError::new_err("Expected a filename or directory (str or os.PathLike) or a non-empty list of filenames!")),
    }
  }

  /// Number of duplicates suppressed so far (only counted with dedup)
  #[getter]
  fn get_n_duplicates(&self) -> usize {
    self.n_duplicates
  }

  /// Count the packets in all files, see the rust `count_packets`.
  /// Returns (complete packets, truncated packets, packets per type)
  #[pyo3(name = "count_packets")]
  fn count_packets_py(&mut self) -> (usize,usize,HashMap<TelemetryPacketType,usize>) {
    self.count_packets()
  }

  /// Read all remaining packets into the (sorted) internal packet cache
  #[pyo3(name = "cache_all_packets")]
  fn cache_all_packets_py(&mut self) {
    self.cache_all_packets();
  }

  /// Retrieve a copy of the internal packet cache.
  /// This will only yield a meaningful result after 
  /// a call to .cache_all_packets(). Since the entire
  /// cache is copied in the process, this is slow 
  /// and might only be helpful for debugging. 
  #[pyo3(name = "copy_packet_cache")]
  fn copy_packet_cache(&self) -> Vec<TelemetryPacket> {
    self.packet_cache.clone()
  }

  /// The files this reader emits packets from
  #[getter]
  fn filenames(&self) -> Vec<String> {
    self.filenames.clone()
  }

  /// This is the filename we are currently 
  /// extracting frames from 
  #[getter]
  #[pyo3(name="current_filename")]
  fn get_current_filename_py(&self) -> Option<&str> {
    self.get_current_filename()
  }

  /// Start the reader from the beginning
  /// This is equivalent to a re-initialization
  /// of that reader.
  #[pyo3(name="rewind")]
  fn rewind_py(&mut self) -> PyResult<()> {
    // HOTFIX to avoid python segfault. However, this has to go 
    // into the rust rewind as well!
    self.clear_dedup_cache();
    self.rewind()
      .map(|_| ())
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }

  fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
    slf 
  }
  
  fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TelemetryPacket> {
    slf.next()
  }
}
715
// Make the Display implementation available to Python (presumably as
// __str__/__repr__ - see the `pythonize_display!` macro definition)
#[cfg(feature="pybindings")]
pythonize_display!(TelemetryPacketReader);
718
719