gondola_core/io/
telemetry_reader.rs

//! The TelemetryPacketReader reads a (file) stream of serialized
//! TelemetryPackets, typically the .bin files generated
//! by the GCU.
4// This file is part of gaps-online-software and published 
5// under the GPLv3 license
6
7use std::io::{
8  BufReader,
9  Read,
10  Seek,
11  SeekFrom,
12};
13use std::cmp::Ordering;
14
15use crate::prelude::*;
16
/// Read serialized TelemetryPackets from one or more existing files
///
/// Reads GAPS binary files ("Berkeley binaries"). Packets are located
/// in the byte stream by their sync marker and emitted one at a time.
#[derive(Debug)]
#[cfg_attr(feature="pybindings", pyclass)]
pub struct TelemetryPacketReader {
  /// Reader will emit packets from these files.
  /// If one file is exhausted, it moves on to
  /// the next file automatically
  pub filenames        : Vec<String>,
  /// The index (into `filenames`) of the file the
  /// reader is currently reading
  pub file_idx         : usize,
  /// Depending on the source of the telemetry files,
  /// there might be duplicates, because we get them
  /// over different streams.
  /// If set, these repeated packets are suppressed
  pub dedup           : bool,
  /// Ignore packets that have a gcu time earlier than start_time
  pub start_time      : Option<f64>,
  /// Ignore packets that have a gcu time later than end_time
  pub end_time        : Option<f64>,
  /// Buffered handle on the file currently being read
  file_reader         : BufReader<File>,
  /// Current (byte) position in the file
  cursor              : usize,
  /// Emit only packets of this type.
  /// `TelemetryPacketType::Unknown` lets every packet type pass
  pub filter          : TelemetryPacketType,
  /// Number of read packets
  n_packs_read        : usize,
  /// Number of skipped packets
  n_packs_skipped     : usize,
  /// Skip the first n packets (0 disables skipping)
  pub skip_ahead      : usize,
  /// Stop reading after n packets (0 means no limit)
  pub stop_after      : usize,
  /// Number of encountered duplicates
  pub n_duplicates    : usize,
  /// A cache which allows duplicates to be suppressed:
  /// pkt counter -> checksums of the packets already
  /// seen with that counter
  dedup_cache         : HashMap<u16, VecDeque<u16>>,
  /// If ::cache_all_packets is called, this will hold
  /// all TelemetryPackets sorted by timestamp and
  /// packet counter
  pub packet_cache    : Vec<TelemetryPacket>,
  /// Geometry of each TOF paddle, keyed by a u8 id
  /// (presumably the paddle id - verify against TofPaddle)
  pub tof_paddles     : Arc<HashMap<u8,TofPaddle>>,
  /// Geometry of each tracker strip, keyed by a u32 id
  /// (presumably the strip id - verify against TrackerStrip)
  pub trk_strips      : Arc<HashMap<u32, TrackerStrip>>,
}
67
68
69impl TelemetryPacketReader {
70  pub fn new(filename_or_directory : String, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> Self {
71    #[cfg(feature="database")]
72    let mut paddles  = HashMap::<u8, TofPaddle>::new();
73    #[cfg(not(feature="database"))]
74    let paddles = HashMap::<u8, TofPaddle>::new();
75    #[cfg(feature="database")]
76    let mut strips  = HashMap::<u32, TrackerStrip>::with_capacity(11520);
77    #[cfg(not(feature="database"))]
78    let strips  = HashMap::<u32, TrackerStrip>::with_capacity(11520);
79    #[cfg(feature="database")]
80    match TofPaddle::all_as_dict() {
81      Err(err) => {
82        error!("Unable to retrieve paddle information from DB! {err}");
83      }
84      Ok(pdls) => {
85        paddles   = pdls;         
86      }
87    }
88    #[cfg(feature="database")]
89    match TrackerStrip::all_as_dict() {
90      Err(err) => {
91        error!("Unable to retrieve tracker strip information from DB! {err}");
92        // if strips and paddles do not work, something is utterly fisy
93        //db_loaded = false;
94      }
95      Ok(strips_) => {
96        strips   = strips_;         
97      }
98    }
99    let firstfile : String;
100    let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
101    match list_path_contents_sorted(&filename_or_directory, Some(re)) {
102      Err(err) => {
103        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
104        panic!("Unable to open files!");
105      }
106      Ok(mut files) => {
107        
108        if let Some(start) = start_time {
109          info!("Removing files earlier than {}", start);
110          files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 >= start} );
111        }
112        if let Some(end)   = end_time {
113          info!("Removing files later than {}", end);
114          files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 <= end } ); 
115        }
116        firstfile = files[0].clone();
117        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
118          Err(err) => {
119            error!("Unable to open file {firstfile}! {err}");
120            panic!("Unable to create reader from {filename_or_directory}!");
121          }
122          Ok(file) => {
123            // prime the cache 
124            let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);  
125            for k in 0..u16::MAX as usize + 1 {
126              dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
127            }
128            let packet_reader = Self { 
129              filenames         : files,
130              file_idx          : 0,
131              dedup             : dedup,
132              start_time        : start_time,
133              end_time          : end_time,
134              file_reader       : BufReader::new(file),
135              cursor            : 0,
136              filter            : TelemetryPacketType::Unknown,
137              n_packs_read      : 0,
138              skip_ahead        : 0,
139              stop_after        : 0,
140              n_packs_skipped   : 0,
141              n_duplicates      : 0,
142              dedup_cache       : dedup_cache,
143              packet_cache      : Vec::<TelemetryPacket>::new(),
144              tof_paddles       : Arc::new(paddles),
145              trk_strips        : Arc::new(strips),
146            };
147            packet_reader
148          }
149        }
150      }
151    }
152  } 
153 
154  /// Instead of reading packets one at a time, we can read the entire input 
155  /// at once and keep it in memory. This allows to sort the packeges.
156  ///
157  /// This comes with a performance cost and extended memory needs, however, 
158  /// it might be helpful for debugging
159  pub fn cache_all_packets(&mut self) {
160    loop {
161      match self.read_next_item() {
162        None => {
163          info!("Read all packets!");
164          break;
165        }
166        Some(pack) => {
167          self.packet_cache.push(pack);
168        }
169      }
170    }
171    // sort the packet cache by timestamp and counter of 
172    // the header 
173    self.packet_cache.sort_by(|a,b|{
174      b.header.get_gcutime().partial_cmp(&a.header.get_gcutime()).unwrap_or(Ordering::Equal)  
175      .then(b.header.counter.cmp(&a.header.counter))
176    });
177    // reverse the vector, so that the first packet gets 
178    // returned first 
179    self.packet_cache.reverse();
180  }
181
182  pub fn clear_dedup_cache(&mut self) {
183    let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);  
184    for k in 0..u16::MAX as usize + 1 {
185      dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
186    }
187    self.dedup_cache = dedup_cache;
188  }
189
190  /// Preview the number of frames in this reader
191  pub fn count_packets(&mut self) -> (usize, usize, HashMap<TelemetryPacketType,usize>) {
192    let _ = self.rewind();
193    self.clear_dedup_cache();
194    let mut nframes = 0usize;
195    let mut buffer  = [0];
196    let mut incomplete  = 0usize;
197    let mut index   = HashMap::<TelemetryPacketType,usize>::new();
198    for k in TelemetryPacketType::iter() {
199      index.insert(k, 0);
200    }
201    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
202    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
203    let bar = ProgressBar::new(self.filenames.len() as u64);
204    bar.set_position(0);
205    bar.set_message (String::from("Counting packets.."));
206    bar.set_prefix  ("\u{2728}");
207    bar.set_style   (bar_style);
208    bar.set_position(self.file_idx as u64);
209    loop {
210      match self.file_reader.read_exact(&mut buffer) {
211        Err(err) => {
212          debug!("Unable to read from file! {err}");
213          match self.prime_next_file() {
214            None    => break,
215            Some(_) => {
216              bar.set_position(self.file_idx as u64);
217              continue;
218            }
219          };
220        }
221        Ok(_) => {
222          self.cursor += 1;
223        }
224      }
225      
226      //thead.sync      = 0x90eb;
227
228      if buffer[0] != 0xeb {
229        continue;
230      } else {
231        match self.file_reader.read_exact(&mut buffer) {
232          Err(err) => {
233            debug!("Unable to read from file! {err}");
234            match self.prime_next_file() {
235              None    => break,
236              Some(_) => {
237                bar.set_position(self.file_idx as u64);
238                continue;
239              }
240            };
241          }
242          Ok(_) => {
243            self.cursor += 1;
244          }
245        }
246        // check if the second byte of the header
247        if buffer[0] != 0x90 { 
248          continue;
249        } else {
250          // read packet type for index
251          match self.file_reader.read_exact(&mut buffer) {
252            Err(err) => {
253              debug!("Unable to read from file! {err}");
254              match self.prime_next_file() {
255                None    => break,
256                Some(_) => {
257                  bar.set_position(self.file_idx as u64);
258                  continue;
259                }
260              };
261            }
262            Ok(_) => {
263              *index.get_mut(&TelemetryPacketType::from(buffer[0])).unwrap() += 1;
264              self.cursor += 1;
265            }
266          }
267          // read the the size of the packet
268          // first we have to skip 6 bytes
269          let mut buffer_skip = [0,0,0,0,0,0];
270          match self.file_reader.read_exact(&mut buffer_skip) {
271            Err(err) => {
272              debug!("Unable to read from file! {err}");
273              match self.prime_next_file() {
274                None    => break,
275                Some(_) => {
276                  bar.set_position(self.file_idx as u64);
277                  continue;
278                }
279              };
280            }
281            Ok(_) => {
282              self.cursor += 6;
283            }
284          }
285          let mut buffer_psize = [0,0];
286          match self.file_reader.read_exact(&mut buffer_psize) {
287            Err(_err) => {
288              match self.prime_next_file() {
289                None    => break,
290                Some(_) => {
291                  bar.set_position(self.file_idx as u64);
292                  continue;
293                }
294              }
295            }
296            Ok(_) => {
297              self.cursor += 2;
298            }
299          }
300          let vec_data = buffer_psize.to_vec();
301          // packet size is the size including the header, so for the 
302          // payload only we have to subtract that.
303          let size     = parse_u16(&vec_data, &mut 0) - 13;
304          let mut temp_buffer = vec![0; size as usize];
305          // skip 2 more bytes for the header checksum
306          match self.file_reader.read_exact(&mut buffer_psize) {
307            Err(_err) => {
308              match self.prime_next_file() {
309                None    => break,
310                Some(_) => {
311                  bar.set_position(self.file_idx as u64);
312                  continue;
313                }
314              }
315            }
316            Ok(_) => {
317              self.cursor += 2;
318            }
319          }
320          match self.file_reader.read_exact(&mut temp_buffer) { 
321          //match self.file_reader.seek(SeekFrom::Current(size as i64)) {
322          //match self.file_reader.seek_relative(size as i64) {
323            Err(err) => {
324              incomplete += 1;
325              warn!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
326              match self.prime_next_file() {
327                None    => break,
328                Some(_) => {
329                  bar.set_position(self.file_idx as u64);
330                  continue;
331                }
332              }
333            }
334            Ok(_) => {
335              self.cursor += size as usize;
336              nframes += 1;
337            }
338          }
339        }
340      } // if no 0xAA found
341    } // end loop
342    bar.finish_with_message("Done!");
343    let _ = self.rewind();
344    (nframes, incomplete, index)
345  } // end fn
346
347  /// Return the next TelemetryPacket in the stream
348  ///
349  /// Will return none if the file has been exhausted.
350  /// Use ::rewind to start reading from the beginning
351  /// again.
352  pub fn read_next_item(&mut self) -> Option<TelemetryPacket> {
353    // filter::Unknown corresponds to allowing any
354    let mut buffer = [0];
355    loop {
356      match self.file_reader.read_exact(&mut buffer) {
357        Err(err) => {
358          debug!("Unable to read from file! {err}");
359          self.prime_next_file()?;
360          return self.read_next_item();
361        }
362        Ok(_) => {
363          self.cursor += 1;
364        }
365      }
366      if buffer[0] != 0xeb {
367        continue;
368      } else {
369        match self.file_reader.read_exact(&mut buffer) {
370          Err(err) => {
371            debug!("Unable to read from file! {err}");
372            self.prime_next_file()?;
373            return self.read_next_item();
374          }
375          Ok(_) => {
376            self.cursor += 1;
377          }
378        }
379
380        if buffer[0] != 0x90 { 
381          continue;
382        } else {
383          // FIXME - use TofPacketHeader::from_bytestream here
384          // the 3rd byte is the packet type
385          match self.file_reader.read_exact(&mut buffer) {
386             Err(err) => {
387              debug!("Unable to read from file! {err}");
388              self.prime_next_file()?;
389              return self.read_next_item();
390            }
391            Ok(_) => {
392              self.cursor += 1;
393            }
394          }
395          let mut thead     = TelemetryPacketHeader::new();
396          thead.sync        = 0x90eb;
397          thead.packet_type = TelemetryPacketType::from(buffer[0]);
398          //let ptype    = TelemetryPacketType::from(buffer[0]);
399          let mut buffer_ts = [0,0,0,0];
400          match self.file_reader.read_exact(&mut buffer_ts) {
401            Err(err) => {
402              debug!("Unable to read from file! {err}");
403              self.prime_next_file()?;
404              return self.read_next_item();
405            }
406            Ok(_) => {
407              self.cursor += 4;
408              thead.timestamp = u32::from_le_bytes(buffer_ts);
409            }
410          }
411          let mut buffer_counter = [0,0];
412          match self.file_reader.read_exact(&mut buffer_counter) {
413            Err(err) => {
414              debug!("Unable to read from file! {err}");
415              self.prime_next_file()?;
416              return self.read_next_item();
417            }
418            Ok(_) => {
419              self.cursor += 2;
420              thead.counter   = u16::from_le_bytes(buffer_counter);
421            }
422          }
423          let mut buffer_length = [0,0];
424          match self.file_reader.read_exact(&mut buffer_length) {
425            Err(err) => {
426              debug!("Unable to read from file! {err}");
427              return None;
428            }
429            Ok(_) => {
430              self.cursor += 2;
431              thead.length    = u16::from_le_bytes(buffer_length);
432            }
433          }
434          let mut buffer_checksum = [0,0];
435          match self.file_reader.read_exact(&mut buffer_checksum) {
436            Err(err) => {
437              debug!("Unable to read from file! {err}");
438              self.prime_next_file()?;
439              return self.read_next_item();
440            }
441            Ok(_) => {
442              self.cursor += 2;
443              thead.checksum    = u16::from_le_bytes(buffer_checksum);
444            }
445          }
446          
447          let mut size     = thead.length;
448          // This size includes the header
449          if (size as usize) < TelemetryPacketHeader::SIZE {
450            error!("This packet might be empty or corrupt!");
451            return None;
452          }
453          size -= TelemetryPacketHeader::SIZE as u16;
454          if thead.packet_type != self.filter && self.filter != TelemetryPacketType::Unknown {
455            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
456              Err(err) => {
457                debug!("Unable to read more data! {err}");
458                self.prime_next_file()?;
459                return self.read_next_item();
460              }
461              Ok(_) => {
462                self.cursor += size as usize;
463              }
464            }
465            continue; // this is just not the packet we want
466          }
467          // now at this point, we want the packet!
468          // except we skip ahead or stop earlier
469          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
470            // we don't want it
471            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
472              Err(err) => {
473                debug!("Unable to read more data! {err}");
474                self.prime_next_file()?;
475                return self.read_next_item();
476              }
477              Ok(_) => {
478                self.n_packs_skipped += 1;
479                self.cursor += size as usize;
480              }
481            }
482            continue; // this is just not the packet we want
483          }
484          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
485            // we don't want it
486            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
487              Err(err) => {
488                debug!("Unable to read more data! {err}");
489                self.prime_next_file()?;
490                return self.read_next_item();
491              }
492              Ok(_) => {
493                self.cursor += size as usize;
494              }
495            }
496            continue; // this is just not the packet we want
497          }
498          
499
500          let mut tp = TelemetryPacket::new();
501          tp.header  = thead;
502          
503          //tp.packet_type = ptype;
504          //let mut payload = vec![0u8;TelemetryPacketHeader::SIZE];
505          //match self.file_reader.read_exact(&mut payload) {
506          //  Err(err) => {
507          //    debug!("Unable to read from file! {err}");
508          //    return None;
509          //  }
510          //  Ok(_) => {
511          //    self.cursor += size as usize;
512          //  }
513          //}
514
515          let mut payload = vec![0u8;size as usize];
516          match self.file_reader.read_exact(&mut payload) {
517            Err(err) => {
518              debug!("Unable to read from file! {err}");
519              self.prime_next_file()?;
520              return self.read_next_item();
521            }
522            Ok(_) => {
523              self.cursor += tp.header.length as usize;
524            }
525          }
526
527          tp.payload = payload;
528          if tp.header.packet_type == TelemetryPacketType::InterestingEvent 
529          || tp.header.packet_type == TelemetryPacketType::BoringEvent 
530          || tp.header.packet_type == TelemetryPacketType::NoGapsTriggerEvent {
531            tp.tof_paddles = self.tof_paddles.clone();
532            tp.trk_strips  = self.trk_strips.clone();
533          }
534          self.n_packs_read += 1;
535          // check if the packet has been seen already
536          if self.dedup {
537            let mut will_send       : bool;
538            if self.dedup_cache[&tp.header.counter].len() == 0 {
539              will_send = true;
540            } else {
541              if !self.dedup_cache.contains_key(&tp.header.counter) {
542                panic!("The dedup cache does not contain {}", tp.header.counter);
543              }
544              will_send = true;
545              for checksum in &self.dedup_cache[&tp.header.counter] {
546                if checksum == &tp.header.checksum {
547                  will_send = false;
548                } 
549              }
550            }
551            // this happens when we have seen the packet counter, but not the actual 
552            // checksum
553            if will_send {
554              self.dedup_cache.get_mut(&tp.header.counter).unwrap().push_back(tp.header.checksum);  
555              return Some(tp);
556            } else {
557              // make sure the caches won't get too long, limit 
558              // our selves to 4 times the packet counter rollover 
559              self.n_duplicates += 1;
560              if self.dedup_cache[&tp.header.counter].len() > 4 {
561                self.dedup_cache.get_mut(&tp.header.counter).unwrap().pop_front();
562              }
563
564              return self.read_next_item();
565            }
566          }
567          if self.start_time.is_some() {
568            if tp.header.get_gcutime() < self.start_time.unwrap() {
569              return self.read_next_item(); 
570            }
571          }
572          if self.end_time.is_some() {
573            if tp.header.get_gcutime() > self.end_time.unwrap() {
574              return self.read_next_item();
575            }
576          }
577          return Some(tp);
578        }
579      } // if no 0xAA found
580    } // end loop
581  } // end fn
582}
583
584impl Default for TelemetryPacketReader {
585  fn default() -> Self {
586    TelemetryPacketReader::new(String::from(""), false, None, None)
587  }
588}
589
590impl fmt::Display for TelemetryPacketReader {
591  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
592    let mut range_repr = String::from("");
593    if self.skip_ahead > 0 {
594      range_repr += &(format!("({}", self.skip_ahead));
595    } else {
596      range_repr += "(";
597    }
598    if self.stop_after > 0 {
599      range_repr += &(format!("..{})", self.stop_after));
600    } else {
601      range_repr += "..)";
602    }
603    let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
604    write!(f, "{}", repr)
605  }
606}
607
// NOTE(review): this file calls rewind, prime_next_file,
// get_current_filename and next on the reader without defining them -
// presumably they are generated by this macro. Verify against the
// definition of reader!
reader!(TelemetryPacketReader, TelemetryPacket);
609
610#[cfg(feature="pybindings")]
611#[pymethods]
612impl TelemetryPacketReader {
613
614  #[new]
615  #[pyo3(signature = (filenames_or_directory, dedup = false, start_time = None, end_time = None))]
616  fn new_py(filenames_or_directory : &Bound<'_,PyAny>, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> PyResult<Self> {
617    
618    let mut string_value = String::from("foo");
619    let mut fnames       = Vec::<String>::new();
620    if let Ok(s) = filenames_or_directory.extract::<String>() {
621       string_value = s;
622    } //else if let Ok(p) = filename_or_directory.extract::<&Path>() {
623    if let Ok(fspath_method) = filenames_or_directory.getattr("__fspath__") {
624      if let Ok(fspath_result) = fspath_method.call0() {
625        if let Ok(py_string) = fspath_result.extract::<String>() {
626          string_value = py_string;
627        }
628      }
629    }
630    if let Ok(list) = filenames_or_directory.extract::<Vec<String>>() {
631      for k in list {
632          fnames.push(k);
633      } //else if let Ok(p) = filename_or_directory.extract::<&Path>() {
634      //  if let Ok(py_pth_mth) = k.getattr("__fspath__") {
635      //    if let Ok(py_pth_rs) = py_pth_mth.call0(py_pth_mth) {
636      //      if let Ok(py_string) = py_pth_rs.extract::<String>() {
637      //        fnames.push(py_string);
638      //      }
639      //    }
640      //  }
641      //}
642    }
643    let mut reader : Self;
644    if fnames.len() > 0 {
645      string_value = fnames[0].clone();
646      reader = Self::new(string_value, dedup, start_time, end_time);
647      reader.filenames = fnames;
648    } else {
649      reader = Self::new(string_value, dedup, start_time, end_time);
650    }
651    Ok(reader)
652    //match Self::new(&string_value) {
653    //  Err(err) => {
654    //    return Err(PyValueError::new_err(err.to_string()));
655    //  }
656    //  Ok(reader) => {
657    //    return Ok(reader);
658    //  }
659    //}
660  }
661
  /// The number of duplicate packets the reader
  /// has encountered so far
  #[getter]
  fn get_n_duplicates(&self) -> usize {
    self.n_duplicates
  }
666
  /// Preview the number of packets in this reader.
  ///
  /// Returns a tuple of (number of complete packets,
  /// number of incomplete packets, number of packets
  /// per packet type)
  #[pyo3(name = "count_packets")]
  fn count_packets_py(&mut self) -> (usize,usize,HashMap<TelemetryPacketType,usize>) {
    self.count_packets()
  }
671
  /// Read all remaining packets into the internal
  /// packet cache and sort them by gcu time and
  /// packet counter
  #[pyo3(name = "cache_all_packets")]
  fn cache_all_packets_py(&mut self) {
    self.cache_all_packets();
  }
676
  /// Retrieve a copy of the internal packet cache.
  /// This will only yield a meaningful result after
  /// a call to .cache_all_packets(). Since the entire
  /// cache is copied in the process, this is slow
  /// and might only be helpful for debugging.
  #[pyo3(name = "copy_packet_cache")]
  fn copy_packet_cache(&self) -> Vec<TelemetryPacket> {
    self.packet_cache.clone()
  }
686
687  //#[getter]
688  //fn first(&mut self) -> Option<TofPacket> {
689  //  self.first_packet()
690  //}
691
692  //#[getter]
693  //fn last(&mut self) -> Option<TofPacket> {
694  //  self.last_packet()
695  //}
696 
  /// A copy of the list of files this reader
  /// emits packets from
  #[getter]
  fn filenames(&self) -> Vec<String> {
    self.filenames.clone()
  }
701  //#[pyo3(name="set_tracker_calibrations_from_fnames")]
702  //#[pyo3(signature = (mask = None, pedestal = None, transfer_fn = None, cmn_noise = None))]
703  //fn set_tracker_calibrations_from_fnames_py(&mut self,
704  //                                            mask        : Option<String>,
705  //                                            pedestal    : Option<String>,
706  //                                            transfer_fn : Option<String>,
707  //                                            cmn_noise   : Option<String>) {
708  //  self.set_tracker_calibrations_from_fnames(mask, pedestal, transfer_fn, cmn_noise);
709  //}
710
  /// This is the filename we are currently
  /// extracting packets from, or None if there
  /// is no such file
  #[getter]
  #[pyo3(name="current_filename")]
  fn get_current_filename_py(&self) -> Option<&str> {
    self.get_current_filename()
  }
718
719  /// Start the reader from the beginning
720  /// This is equivalent to a re-initialization
721  /// of that reader.
722  #[pyo3(name="rewind")]
723  fn rewind_py(&mut self) -> PyResult<()> {
724    // HOTFIX to avoid python segfault. However, this has to go 
725    // into the rust rewind as well!
726    self.clear_dedup_cache();
727    match self.rewind() {
728      Err(err) => {
729        return Err(PyValueError::new_err(err.to_string()));
730      }
731      Ok(_) => Ok(())
732    }
733  }
734
735  //#[pyo3(name="count_packets")]
736  //fn count_packets_py(&mut self) -> usize {
737  //  self.count_packets()
738  //}
739
  /// The reader is its own iterator
  fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
    slf 
  }
743  
  /// Hand out the next packet, whatever the reader's
  /// `next` returns is passed on unchanged. 
  /// None signals the end of the iteration.
  fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TelemetryPacket> {
    slf.next()
  }
755}
756
// NOTE(review): presumably exposes the Display implementation to
// python (e.g. as __repr__) - verify against the macro definition
#[cfg(feature="pybindings")]
pythonize_display!(TelemetryPacketReader);
759
760