gondola_core/io/
tof_reader.rs

//! The TofPacketReader reads a (file) stream of serialized
//! TofPackets
//!
//!
5// The following file is part of gaps-online-software and published 
6// under the GPLv3 license
7
8use crate::prelude::*;
9
/// Read serialized TofPackets from an existing file or directory
///
/// This can read the "TOF stream" files, typically suffixed with .tof.gaps
/// These files are typically written by the corresponding packet writer,
/// e.g. as on the TOF flight computer
///
/// The reader works through the entries of `filenames` one after another
/// and keeps exactly one of them open at a time.
#[derive(Debug)]
#[cfg_attr(feature="pybindings", pyclass)]
pub struct TofPacketReader {
  /// All files this reader reads from, as returned by
  /// `list_path_contents_sorted` for the path given to `new`
  pub filenames       : Vec<String>,
  /// Buffered handle on the file which is currently being read
  file_reader         : BufReader<File>,
  /// Current (byte) position in the file
  cursor              : usize,
  /// Read only packets of this type. `TofPacketType::Unknown` 
  /// disables the filter, so that every packet is accepted
  pub filter          : TofPacketType,
  /// Number of packets which have been returned to the caller
  n_packs_read        : usize,
  /// Number of packets which have been skipped because of `skip_ahead`
  n_packs_skipped     : usize,
  /// Skip the first n packets (0 - don't skip anything)
  pub skip_ahead      : usize,
  /// Stop reading after n packets (0 - no limit)
  pub stop_after      : usize,
  /// The index of the current file in the internal "filenames" vector.
  pub file_idx        : usize,
}
36
37impl TofPacketReader {
38  
39  /// Setup a new Reader, allowing the argument to be either the name of a single file or 
40  /// the name of a directory
41  /// FIXME - make this return Result, like the Caraspace reader
42  pub fn new(filename_or_directory : &str) -> Self {
43    let firstfile : String;
44    match list_path_contents_sorted(&filename_or_directory, None) {
45      Err(err) => {
46        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
47        panic!("Unable to open files!");
48      }
49      Ok(files) => {
50        firstfile = files[0].clone();
51        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
52          Err(err) => {
53            error!("Unable to open file {firstfile}! {err}");
54            panic!("Unable to create reader from {filename_or_directory}!");
55          }
56          Ok(file) => {
57            let packet_reader = Self { 
58              filenames       : files,
59              file_reader     : BufReader::new(file),
60              cursor          : 0,
61              filter          : TofPacketType::Unknown,
62              n_packs_read    : 0,
63              skip_ahead      : 0,
64              stop_after      : 0,
65              n_packs_skipped : 0,
66              file_idx        : 0,
67            };
68            packet_reader
69          }
70        }
71      }
72    } 
73  }
74
75
76  /// Return the next tofpacket in the stream
77  ///
78  /// Will return none if the file has been exhausted.
79  /// Use ::rewind to start reading from the beginning
80  /// again.
81  ///
82  /// If a filter is set, only packets of type as set 
83  /// in the filter will be read, all others will be 
84  /// ignored
85  pub fn read_next_item(&mut self) -> Option<TofPacket> {
86    // filter::Unknown corresponds to allowing any
87    let mut buffer = [0];
88    loop {
89      match self.file_reader.read_exact(&mut buffer) {
90        Err(err) => {
91          debug!("Unable to read from file! {err}");
92          self.prime_next_file()?;
93          return self.read_next_item();
94        }
95        Ok(_) => {
96          self.cursor += 1;
97        }
98      }
99      if buffer[0] != 0xAA {
100        continue;
101      } else {
102        match self.file_reader.read_exact(&mut buffer) {
103          Err(err) => {
104            debug!("Unable to read from file! {err}");
105            self.prime_next_file()?;
106            return self.read_next_item();
107          }
108          Ok(_) => {
109            self.cursor += 1;
110          }
111        }
112
113        if buffer[0] != 0xAA { 
114          continue;
115        } else {
116          // the 3rd byte is the packet type
117          match self.file_reader.read_exact(&mut buffer) {
118             Err(err) => {
119              debug!("Unable to read from file! {err}");
120              self.prime_next_file()?;
121              return self.read_next_item();
122            }
123            Ok(_) => {
124              self.cursor += 1;
125            }
126          }
127          let ptype    = TofPacketType::from(buffer[0]);
128          // read the the size of the packet
129          let mut buffer_psize = [0,0,0,0];
130          match self.file_reader.read_exact(&mut buffer_psize) {
131            Err(err) => {
132              debug!("Unable to read from file! {err}");
133              self.prime_next_file()?;
134              return self.read_next_item();
135            } 
136            Ok(_) => {
137              self.cursor += 4;
138            }
139          }
140          let vec_data = buffer_psize.to_vec();
141          let size     = parse_u32(&vec_data, &mut 0);
142          if ptype != self.filter && self.filter != TofPacketType::Unknown {
143            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
144              Err(err) => {
145                debug!("Unable to read more data! {err}");
146                self.prime_next_file()?;
147                return self.read_next_item(); 
148              }
149              Ok(_) => {
150                self.cursor += size as usize;
151              }
152            }
153            continue; // this is just not the packet we want
154          }
155          // now at this point, we want the packet!
156          // except we skip ahead or stop earlier
157          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
158            // we don't want it
159            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
160              Err(err) => {
161                debug!("Unable to read more data! {err}");
162                self.prime_next_file()?;
163                return self.read_next_item(); 
164              }
165              Ok(_) => {
166                self.n_packs_skipped += 1;
167                self.cursor += size as usize;
168              }
169            }
170            continue; // this is just not the packet we want
171          }
172          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
173            // we don't want it
174            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
175              Err(err) => {
176                debug!("Unable to read more data! {err}");
177                self.prime_next_file()?;
178                return self.read_next_item(); 
179              }
180              Ok(_) => {
181                self.cursor += size as usize;
182              }
183            }
184            continue; // this is just not the packet we want
185          }
186
187          let mut tp = TofPacket::new();
188          tp.packet_type = ptype;
189          let mut payload = vec![0u8;size as usize];
190
191          match self.file_reader.read_exact(&mut payload) {
192            Err(err) => {
193              debug!("Unable to read from file! {err}");
194              self.prime_next_file()?;
195              return self.read_next_item(); 
196            }
197            Ok(_) => {
198              self.cursor += size as usize;
199            }
200          }
201          tp.payload = payload;
202          // we don't filter, so we like this packet
203          let mut tail = vec![0u8; 2];
204          match self.file_reader.read_exact(&mut tail) {
205            Err(err) => {
206              debug!("Unable to read from file! {err}");
207              self.prime_next_file()?;
208              return self.read_next_item(); 
209            }
210            Ok(_) => {
211              self.cursor += 2;
212            }
213          }
214          let tail = parse_u16(&tail,&mut 0);
215          if tail != TofPacket::TAIL {
216            debug!("TofPacket TAIL signature wrong!");
217            return None;
218          }
219          self.n_packs_read += 1;
220          return Some(tp);
221        }
222      } // if no 0xAA found
223    } // end loop
224  } // end fn
225
226  /// This is the file the current cursor is located 
227  /// in and frames are currently read out from 
228  pub fn get_current_filename(&self) -> Option<String> {
229    // should only happen when it is empty
230    if self.filenames.len() <= self.file_idx {
231      return None;
232    }
233    Some(self.filenames[self.file_idx].clone())
234  }
235  
236  /// Preview the number of frames in this reader
237  pub fn count_packets(&mut self) -> usize {
238    let _ = self.rewind();
239    let mut nframes = 0usize;
240    let mut buffer  = [0];
241    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
242    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
243    let bar = ProgressBar::new(self.filenames.len() as u64);
244    bar.set_position(0);
245    bar.set_message (String::from("Counting packets.."));
246    bar.set_prefix  ("\u{2728}");
247    bar.set_style   (bar_style);
248    bar.set_position(self.file_idx as u64);
249    loop {
250      match self.file_reader.read_exact(&mut buffer) {
251        Err(err) => {
252          debug!("Unable to read from file! {err}");
253          match self.prime_next_file() {
254            None    => break,
255            Some(_) => {
256              bar.set_position(self.file_idx as u64);
257              continue;
258            }
259          };
260        }
261        Ok(_) => {
262          self.cursor += 1;
263        }
264      }
265      if buffer[0] != 0xAA {
266        continue;
267      } else {
268        match self.file_reader.read_exact(&mut buffer) {
269          Err(err) => {
270            debug!("Unable to read from file! {err}");
271            match self.prime_next_file() {
272              None    => break,
273              Some(_) => {
274                bar.set_position(self.file_idx as u64);
275                continue;
276              }
277            };
278          }
279          Ok(_) => {
280            self.cursor += 1;
281          }
282        }
283        // check if the second byte of the header
284        if buffer[0] != 0xAA { 
285          continue;
286        } else {
287          // read the the size of the packet
288          // first we have to skip one byte for the packet type
289          match self.file_reader.read_exact(&mut buffer) {
290            Err(err) => {
291              debug!("Unable to read from file! {err}");
292              match self.prime_next_file() {
293                None    => break,
294                Some(_) => {
295                  bar.set_position(self.file_idx as u64);
296                  continue;
297                }
298              };
299            }
300            Ok(_) => {
301              self.cursor += 1;
302            }
303          }
304          let mut buffer_psize = [0,0,0,0];
305          match self.file_reader.read_exact(&mut buffer_psize) {
306            Err(_err) => {
307              match self.prime_next_file() {
308                None    => break,
309                Some(_) => {
310                  bar.set_position(self.file_idx as u64);
311                  continue;
312                }
313              }
314            }
315            Ok(_) => {
316              self.cursor += 8;
317            }
318          }
319          let vec_data = buffer_psize.to_vec();
320          let size     = parse_u64(&vec_data, &mut 0);
321          let mut temp_buffer = vec![0; size as usize];
322          match self.file_reader.read_exact(&mut temp_buffer) { 
323          //match self.file_reader.seek(SeekFrom::Current(size as i64)) {
324          //match self.file_reader.seek_relative(size as i64) {
325            Err(err) => {
326              error!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
327              match self.prime_next_file() {
328                None    => break,
329                Some(_) => {
330                  bar.set_position(self.file_idx as u64);
331                  continue;
332                }
333              }
334            }
335            Ok(_) => {
336              self.cursor += size as usize;
337              nframes += 1;
338            }
339          }
340        }
341      } // if no 0xAA found
342    } // end loop
343    bar.finish_with_message("Done!");
344    let _ = self.rewind();
345    nframes
346  } // end fn
347
348  /// The very first TofPacket for a reader
349  pub fn first_packet(&mut self) -> Option<TofPacket> {
350    match self.rewind() {
351      Err(err) => {
352        error!("Error when rewinding files! {err}");
353      }
354      Ok(_) => ()
355    }
356    let pack = self.read_next_item();
357    match self.rewind() {
358      Err(err) => {
359        error!("Error when rewinding files! {err}");
360      }
361      Ok(_) => ()
362    }
363    return pack;
364  }
365
366  /// The very last TofPacket for a reader
367  pub fn last_packet(&mut self) -> Option<TofPacket> { 
368    self.file_idx    = self.filenames.len() - 1;
369    let lastfilename = self.filenames[self.file_idx].clone();
370    let lastfile     = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
371    self.file_reader = BufReader::new(lastfile);
372    self.cursor      = 0;
373    let mut tp = TofPacket::new();
374    let mut idx = 0;
375    loop {
376      match self.read_next_item() {
377        None => {
378          match self.rewind() {
379            Err(err) => {
380              error!("Error when rewinding files! {err}");
381            }
382            Ok(_) => ()
383          }
384          if idx == 0 {
385            return None;
386          } else {
387            return Some(tp);
388          }
389        }
390        Some(pack) => {
391          idx += 1;
392          tp = pack;
393          continue;
394        }
395      }
396    }
397  }
398
399
400}
401
402impl fmt::Display for TofPacketReader {
403  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
404    let mut range_repr = String::from("");
405    if self.skip_ahead > 0 {
406      range_repr += &(format!("({}", self.skip_ahead));
407    } else {
408      range_repr += "(";
409    }
410    if self.stop_after > 0 {
411      range_repr += &(format!("..{})", self.stop_after));
412    } else {
413      range_repr += "..)";
414    }
415    let repr = format!("<TofPacketReader :read {} packets, filter {}, range {}\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
416    write!(f, "{}", repr)
417  }
418}
419
// NOTE(review): `rewind`, `prime_next_file` and `next` are called on 
// TofPacketReader in this file, but are not defined here. Presumably 
// this macro generates them (together with an Iterator implementation 
// yielding TofPacket) - confirm against the macro definition.
reader!(TofPacketReader,TofPacket);
421
422#[cfg(feature="pybindings")]
423#[pymethods]
424impl TofPacketReader {
425
426  #[new]
427  #[pyo3(signature = (filename_or_directory, filter = None))] 
428  fn new_py(filename_or_directory : &Bound<'_,PyAny>, filter : Option<TofPacketType>) -> PyResult<Self> {
429    let mut string_value = String::from("foo");
430    if let Ok(s) = filename_or_directory.extract::<String>() {
431       string_value = s;
432    } //else if let Ok(p) = filename_or_directory.extract::<&Path>() {
433    if let Ok(fspath_method) = filename_or_directory.getattr("__fspath__") {
434      if let Ok(fspath_result) = fspath_method.call0() {
435        if let Ok(py_string) = fspath_result.extract::<String>() {
436          string_value = py_string;
437        }
438      }
439    }
440    let mut reader = Self::new(&string_value);
441    match filter {
442      None => (),
443      Some(ftr) => {
444        reader.filter = ftr;
445      }
446    }
447    Ok(reader)
448    //match Self::new(&string_value) {
449    //  Err(err) => {
450    //    return Err(PyValueError::new_err(err.to_string()));
451    //  }
452    //  Ok(reader) => {
453    //    return Ok(reader);
454    //  }
455    //}
456  }
457
  /// The first packet of this reader, see `first_packet`. 
  /// The reader gets rewound before and after the packet 
  /// is read, so the current read position is lost.
  #[getter]
  fn first(&mut self) -> Option<TofPacket> {
    self.first_packet()
  }
462
  /// The last packet of this reader, see `last_packet`. 
  /// This reads through the last file of the reader and 
  /// rewinds the reader afterwards, so the current read 
  /// position is lost.
  #[getter]
  fn last(&mut self) -> Option<TofPacket> {
    self.last_packet()
  }
467 
  /// A copy of the list of files this reader 
  /// reads from
  #[getter]
  fn filenames(&self) -> Vec<String> {
    self.filenames.clone()
  }
472  //#[pyo3(name="set_tracker_calibrations_from_fnames")]
473  //#[pyo3(signature = (mask = None, pedestal = None, transfer_fn = None, cmn_noise = None))]
474  //fn set_tracker_calibrations_from_fnames_py(&mut self,
475  //                                            mask        : Option<String>,
476  //                                            pedestal    : Option<String>,
477  //                                            transfer_fn : Option<String>,
478  //                                            cmn_noise   : Option<String>) {
479  //  self.set_tracker_calibrations_from_fnames(mask, pedestal, transfer_fn, cmn_noise);
480  //}
481
  /// This is the filename we are currently 
  /// extracting packets from. None if the 
  /// index of the current file is not within 
  /// the list of files (e.g. the list is empty)
  #[getter]
  #[pyo3(name="current_filename")]
  fn get_current_filename_py(&self) -> Option<String> {
    self.get_current_filename()
  }
489
490  /// Start the reader from the beginning
491  /// This is equivalent to a re-initialization
492  /// of that reader.
493  #[pyo3(name="rewind")]
494  fn rewind_py(&mut self) -> PyResult<()> {
495    match self.rewind() {
496      Err(err) => {
497        return Err(PyValueError::new_err(err.to_string()));
498      }
499      Ok(_) => Ok(())
500    }
501  }
502
  /// Count the packets in all files of this reader, 
  /// see `count_packets`. The reader gets rewound 
  /// before and after counting, so the current read 
  /// position is lost.
  #[pyo3(name="count_packets")]
  fn count_packets_py(&mut self) -> usize {
    self.count_packets()
  }
507
  /// The reader is its own iterator, so it 
  /// just returns itself here
  fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
    slf 
  }
511  
  /// The next packet for the python iteration. 
  /// Returning None ends the iteration on the python side.
  fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TofPacket> {
    // NOTE(review): `next` is not defined in this file, presumably it 
    // is provided by the reader! macro and wraps read_next_item - confirm
    slf.next()
  }
523}
524
// NOTE(review): presumably this makes the Display representation 
// available in python (e.g. as __repr__/__str__) - confirm against 
// the macro definition.
#[cfg(feature="pybindings")]
pythonize_display!(TofPacketReader);
527
528