gondola_core/io/
tof_reader.rs

//! The TofPacketReader reads a (file) stream of serialized
//! TofPackets
3//!
4//!
5// The following file is part of gaps-online-software and published 
6// under the GPLv3 license
7
8use crate::prelude::*;
9
/// Read serialized TofPackets from an existing file or directory
///
/// This can read the "TOF stream" files, typically suffixed with .tof.gaps
/// These files are typically written by the corresponding packet writer, e.g.
/// on the TOF flight computer
///
/// NOTE(review): the previous version of this comment named `TofPacketReader`
/// itself as the writer of these files, presumably a typo - confirm the name
/// of the writer type.
#[derive(Debug)]
#[cfg_attr(feature="pybindings", pyclass)]
pub struct TofPacketReader {
  /// All files this reader works on, in the order they are read
  pub filenames       : Vec<String>,
  /// Buffered handle on the file which is currently read
  file_reader         : BufReader<File>,
  /// Current (byte) position in the file
  cursor              : usize,
  /// Read only packets of type == PacketType.
  /// `TofPacketType::Unknown` disables the filter, so that any
  /// packet gets accepted
  pub filter          : TofPacketType,
  /// Number of read packets
  n_packs_read        : usize,
  /// Number of packets skipped because of `skip_ahead`
  n_packs_skipped     : usize,
  /// Skip the first n packets (0 means don't skip any)
  pub skip_ahead      : usize,
  /// Stop reading after n packets (0 means no limit)
  pub stop_after      : usize,
  /// The index of the current file in the internal "filenames" vector.
  pub file_idx        : usize,
  /// Geometry of each TOF paddle. A shared handle on this map gets
  /// attached to each packet of type TofEvent.
  /// (presumably the key is the paddle id - TODO confirm)
  pub tof_paddles     : Arc<HashMap<u8,TofPaddle>>,
}
39
40impl TofPacketReader {
41  
42  /// Setup a new Reader, allowing the argument to be either the name of a single file or 
43  /// the name of a directory
44  /// FIXME - make this return Result, like the Caraspace reader
45  pub fn new(filename_or_directory : &str) -> Self {
46    let firstfile : String;
47    #[cfg(feature="database")]
48    let mut paddles  = HashMap::<u8, TofPaddle>::new();
49    #[cfg(not(feature="database"))]
50    let paddles = HashMap::<u8, TofPaddle>::new();
51    #[cfg(feature="database")]
52    match TofPaddle::all_as_dict() {
53      Err(err) => {
54        error!("Unable to retrieve paddle information from DB! {err}");
55      }
56      Ok(pdls) => {
57        paddles   = pdls;         
58      }
59    }
60    match list_path_contents_sorted(&filename_or_directory, None) {
61      Err(err) => {
62        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
63        panic!("Unable to open files!");
64      }
65      Ok(files) => {
66        firstfile = files[0].clone();
67        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
68          Err(err) => {
69            error!("Unable to open file {firstfile}! {err}");
70            panic!("Unable to create reader from {filename_or_directory}!");
71          }
72          Ok(file) => {
73            let packet_reader = Self { 
74              filenames       : files,
75              file_reader     : BufReader::new(file),
76              cursor          : 0,
77              filter          : TofPacketType::Unknown,
78              n_packs_read    : 0,
79              skip_ahead      : 0,
80              stop_after      : 0,
81              n_packs_skipped : 0,
82              file_idx        : 0,
83              tof_paddles     : Arc::new(paddles),
84            };
85            packet_reader
86          }
87        }
88      }
89    } 
90  }
91
92
93  /// Return the next tofpacket in the stream
94  ///
95  /// Will return none if the file has been exhausted.
96  /// Use ::rewind to start reading from the beginning
97  /// again.
98  ///
99  /// If a filter is set, only packets of type as set 
100  /// in the filter will be read, all others will be 
101  /// ignored
102  pub fn read_next_item(&mut self) -> Option<TofPacket> {
103    // filter::Unknown corresponds to allowing any
104    let mut buffer = [0];
105    loop {
106      match self.file_reader.read_exact(&mut buffer) {
107        Err(err) => {
108          debug!("Unable to read from file! {err}");
109          self.prime_next_file()?;
110          return self.read_next_item();
111        }
112        Ok(_) => {
113          self.cursor += 1;
114        }
115      }
116      if buffer[0] != 0xAA {
117        continue;
118      } else {
119        match self.file_reader.read_exact(&mut buffer) {
120          Err(err) => {
121            debug!("Unable to read from file! {err}");
122            self.prime_next_file()?;
123            return self.read_next_item();
124          }
125          Ok(_) => {
126            self.cursor += 1;
127          }
128        }
129
130        if buffer[0] != 0xAA { 
131          continue;
132        } else {
133          // the 3rd byte is the packet type
134          match self.file_reader.read_exact(&mut buffer) {
135             Err(err) => {
136              debug!("Unable to read from file! {err}");
137              self.prime_next_file()?;
138              return self.read_next_item();
139            }
140            Ok(_) => {
141              self.cursor += 1;
142            }
143          }
144          let ptype    = TofPacketType::from(buffer[0]);
145          // read the the size of the packet
146          let mut buffer_psize = [0,0,0,0];
147          match self.file_reader.read_exact(&mut buffer_psize) {
148            Err(err) => {
149              debug!("Unable to read from file! {err}");
150              self.prime_next_file()?;
151              return self.read_next_item();
152            } 
153            Ok(_) => {
154              self.cursor += 4;
155            }
156          }
157          let vec_data = buffer_psize.to_vec();
158          let size     = parse_u32(&vec_data, &mut 0);
159          if ptype != self.filter && self.filter != TofPacketType::Unknown {
160            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
161              Err(err) => {
162                debug!("Unable to read more data! {err}");
163                self.prime_next_file()?;
164                return self.read_next_item(); 
165              }
166              Ok(_) => {
167                self.cursor += size as usize;
168              }
169            }
170            continue; // this is just not the packet we want
171          }
172          // now at this point, we want the packet!
173          // except we skip ahead or stop earlier
174          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
175            // we don't want it
176            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
177              Err(err) => {
178                debug!("Unable to read more data! {err}");
179                self.prime_next_file()?;
180                return self.read_next_item(); 
181              }
182              Ok(_) => {
183                self.n_packs_skipped += 1;
184                self.cursor += size as usize;
185              }
186            }
187            continue; // this is just not the packet we want
188          }
189          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
190            // we don't want it
191            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
192              Err(err) => {
193                debug!("Unable to read more data! {err}");
194                self.prime_next_file()?;
195                return self.read_next_item(); 
196              }
197              Ok(_) => {
198                self.cursor += size as usize;
199              }
200            }
201            continue; // this is just not the packet we want
202          }
203
204          let mut tp = TofPacket::new();
205          tp.packet_type = ptype;
206          let mut payload = vec![0u8;size as usize];
207
208          match self.file_reader.read_exact(&mut payload) {
209            Err(err) => {
210              debug!("Unable to read from file! {err}");
211              self.prime_next_file()?;
212              return self.read_next_item(); 
213            }
214            Ok(_) => {
215              self.cursor += size as usize;
216            }
217          }
218          tp.payload = payload;
219          // we don't filter, so we like this packet
220          let mut tail = vec![0u8; 2];
221          match self.file_reader.read_exact(&mut tail) {
222            Err(err) => {
223              debug!("Unable to read from file! {err}");
224              self.prime_next_file()?;
225              return self.read_next_item(); 
226            }
227            Ok(_) => {
228              self.cursor += 2;
229            }
230          }
231          let tail = parse_u16(&tail,&mut 0);
232          if tail != TofPacket::TAIL {
233            debug!("TofPacket TAIL signature wrong!");
234            return None;
235          }
236          if tp.packet_type == TofPacketType::TofEvent {
237            tp.tof_paddles = self.tof_paddles.clone();
238          }
239          self.n_packs_read += 1;
240          return Some(tp);
241        }
242      } // if no 0xAA found
243    } // end loop
244  } // end fn
245
246  /// This is the file the current cursor is located 
247  /// in and frames are currently read out from 
248  pub fn get_current_filename(&self) -> Option<String> {
249    // should only happen when it is empty
250    if self.filenames.len() <= self.file_idx {
251      return None;
252    }
253    Some(self.filenames[self.file_idx].clone())
254  }
255  
256  /// Run once over the entire file, skipping most of its content 
257  /// but retrieve the number of packets available. 
258  ///
259  /// After a succesful count, the reader is rewound automatically
260  ///
261  /// # Returns:
262  ///   number of packets in the current file or, if multiple files given, 
263  ///   all of them.
264  pub fn count_packets(&mut self) -> usize {
265    let _ = self.rewind();
266    let mut nframes = 0usize;
267    let mut buffer  = [0];
268    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
269    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
270    let bar = ProgressBar::new(self.filenames.len() as u64);
271    bar.set_position(0);
272    bar.set_message (String::from("Counting packets.."));
273    bar.set_prefix  ("\u{2728}");
274    bar.set_style   (bar_style);
275    bar.set_position(self.file_idx as u64);
276    loop {
277      match self.file_reader.read_exact(&mut buffer) {
278        Err(err) => {
279          debug!("Unable to read from file! {err}");
280          match self.prime_next_file() {
281            None    => break,
282            Some(_) => {
283              bar.set_position(self.file_idx as u64);
284              continue;
285            }
286          };
287        }
288        Ok(_) => {
289          self.cursor += 1;
290        }
291      }
292      if buffer[0] != 0xAA {
293        continue;
294      } else {
295        match self.file_reader.read_exact(&mut buffer) {
296          Err(err) => {
297            debug!("Unable to read from file! {err}");
298            match self.prime_next_file() {
299              None    => break,
300              Some(_) => {
301                bar.set_position(self.file_idx as u64);
302                continue;
303              }
304            };
305          }
306          Ok(_) => {
307            self.cursor += 1;
308          }
309        }
310        // check if the second byte of the header
311        if buffer[0] != 0xAA { 
312          continue;
313        } else {
314          // read the the size of the packet
315          // first we have to skip one byte for the packet type
316          match self.file_reader.read_exact(&mut buffer) {
317            Err(err) => {
318              debug!("Unable to read from file! {err}");
319              match self.prime_next_file() {
320                None    => break,
321                Some(_) => {
322                  bar.set_position(self.file_idx as u64);
323                  continue;
324                }
325              };
326            }
327            Ok(_) => {
328              self.cursor += 1;
329            }
330          }
331          let mut buffer_psize = [0,0,0,0];
332          match self.file_reader.read_exact(&mut buffer_psize) {
333            Err(_err) => {
334              match self.prime_next_file() {
335                None    => break,
336                Some(_) => {
337                  bar.set_position(self.file_idx as u64);
338                  continue;
339                }
340              }
341            }
342            Ok(_) => {
343              self.cursor += 4;
344            }
345          }
346          let vec_data = buffer_psize.to_vec();
347          let size     = parse_u32(&vec_data, &mut 0);
348          let mut temp_buffer = vec![0; size as usize];
349          match self.file_reader.read_exact(&mut temp_buffer) { 
350          //match self.file_reader.seek(SeekFrom::Current(size as i64)) {
351          //match self.file_reader.seek_relative(size as i64) {
352            Err(err) => {
353              error!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
354              match self.prime_next_file() {
355                None    => break,
356                Some(_) => {
357                  bar.set_position(self.file_idx as u64);
358                  continue;
359                }
360              }
361            }
362            Ok(_) => {
363              self.cursor += size as usize;
364              nframes += 1;
365            }
366          }
367        }
368      } // if no 0xAA found
369    } // end loop
370    bar.finish_with_message("Done!");
371    let _ = self.rewind();
372    nframes
373  } // end fn
374
375  /// The very first TofPacket for a reader
376  pub fn first_packet(&mut self) -> Option<TofPacket> {
377    match self.rewind() {
378      Err(err) => {
379        error!("Error when rewinding files! {err}");
380      }
381      Ok(_) => ()
382    }
383    let pack = self.read_next_item();
384    match self.rewind() {
385      Err(err) => {
386        error!("Error when rewinding files! {err}");
387      }
388      Ok(_) => ()
389    }
390    return pack;
391  }
392
393  /// The very last TofPacket for a reader
394  pub fn last_packet(&mut self) -> Option<TofPacket> { 
395    self.file_idx    = self.filenames.len() - 1;
396    let lastfilename = self.filenames[self.file_idx].clone();
397    let lastfile     = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
398    self.file_reader = BufReader::new(lastfile);
399    self.cursor      = 0;
400    let mut tp = TofPacket::new();
401    let mut idx = 0;
402    loop {
403      match self.read_next_item() {
404        None => {
405          match self.rewind() {
406            Err(err) => {
407              error!("Error when rewinding files! {err}");
408            }
409            Ok(_) => ()
410          }
411          if idx == 0 {
412            return None;
413          } else {
414            return Some(tp);
415          }
416        }
417        Some(pack) => {
418          idx += 1;
419          tp = pack;
420          continue;
421        }
422      }
423    }
424  }
425
426
427}
428
429impl fmt::Display for TofPacketReader {
430  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
431    let mut range_repr = String::from("");
432    if self.skip_ahead > 0 {
433      range_repr += &(format!("({}", self.skip_ahead));
434    } else {
435      range_repr += "(";
436    }
437    if self.stop_after > 0 {
438      range_repr += &(format!("..{})", self.stop_after));
439    } else {
440      range_repr += "..)";
441    }
442    let repr = format!("<TofPacketReader :read {} packets, filter {}, range {}\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
443    write!(f, "{}", repr)
444  }
445}
446
// NOTE(review): this macro is defined elsewhere. It presumably generates 
// the `rewind`, `prime_next_file` and iterator (`next`) functionality 
// which is used throughout this file - confirm against the macro definition
reader!(TofPacketReader,TofPacket);
448
#[cfg(feature="pybindings")]
#[pymethods]
impl TofPacketReader {

  /// Create a new reader from python
  ///
  /// # Arguments:
  ///   * filename_or_directory : Either a str or a path-like object
  ///                             (anything providing __fspath__)
  ///   * filter                : If given, only packets of this type 
  ///                             will be read
  ///
  /// Raises a ValueError if filename_or_directory is neither a str nor
  /// a path-like object.
  /// NOTE(review): Self::new panics if the files can not be opened, that
  /// panic is not converted into a python exception here.
  #[new]
  #[pyo3(signature = (filename_or_directory, filter = None))] 
  fn new_py(filename_or_directory : &Bound<'_,PyAny>, filter : Option<TofPacketType>) -> PyResult<Self> {
    // a plain string is taken as is, for anything else we ask for the 
    // path representation
    let mut path_repr : Option<String> = filename_or_directory.extract::<String>().ok();
    if let Ok(fspath_method) = filename_or_directory.getattr("__fspath__") {
      if let Ok(fspath_result) = fspath_method.call0() {
        if let Ok(py_string) = fspath_result.extract::<String>() {
          path_repr = Some(py_string);
        }
      }
    }
    // Without a usable path there is no point in trying to open anything
    let string_value = match path_repr {
      None => {
        return Err(PyValueError::new_err("filename_or_directory has to be either a str or a path-like object!"));
      }
      Some(value) => value,
    };
    let mut reader = Self::new(&string_value);
    if let Some(ftr) = filter {
      reader.filter = ftr;
    }
    Ok(reader)
  }

  /// The very first packet of the reader. 
  /// The reader is rewound afterwards.
  #[getter]
  fn first(&mut self) -> Option<TofPacket> {
    self.first_packet()
  }

  /// The very last packet of the reader.
  /// The reader is rewound afterwards.
  #[getter]
  fn last(&mut self) -> Option<TofPacket> {
    self.last_packet()
  }
 
  /// All the files the reader works on
  #[getter]
  fn filenames(&self) -> Vec<String> {
    self.filenames.clone()
  }

  /// This is the filename we are currently 
  /// extracting frames from 
  #[getter]
  #[pyo3(name="current_filename")]
  fn get_current_filename_py(&self) -> Option<String> {
    self.get_current_filename()
  }

  /// Start the reader from the beginning
  /// This is equivalent to a re-initialization
  /// of that reader.
  #[pyo3(name="rewind")]
  fn rewind_py(&mut self) -> PyResult<()> {
    match self.rewind() {
      Err(err) => Err(PyValueError::new_err(err.to_string())),
      Ok(_)    => Ok(())
    }
  }
  
  /// Run once over the entire file, skipping most of its content 
  /// but retrieve the number of packets available. 
  ///
  /// After a successful count, the reader is rewound automatically
  ///
  /// # Returns:
  ///   number of packets in the current file or, if multiple files given, 
  ///   all of them.
  #[pyo3(name="count_packets")]
  fn count_packets_py(&mut self) -> usize {
    self.count_packets()
  }

  /// The reader is its own iterator
  fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
    slf 
  }
  
  /// Provide the next packet. Returning None ends the iteration
  /// on the python side.
  fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TofPacket> {
    slf.next()
  }
}
559
// NOTE(review): this macro is defined elsewhere. It presumably makes the 
// Display representation available to python (e.g. as __repr__) - confirm
#[cfg(feature="pybindings")]
pythonize_display!(TofPacketReader);
562
563