gondola_core/io/caraspace/
reader.rs

//! Caraspace provides a system to read files 
//! for the GAPS experiment comprising different sources, specifically 
//! files from the TOF system written to disk as well as telemetry files.
4//!
5//! While written for the GAPS experiment, the caraspace library is 
6//! designed in a form that it should be easily adaptable for other 
7//! purposes.
8//!
9//! This file contains the source for CRReader, a device to read a number
10//! of "caraspace" files from a given source.
11//
12// This file is part of gaps-online-software and published 
13// under the GPLv3 license
14
15use crate::prelude::*;
16
/// Read binaries written through the caraspace i/o system
///
/// The file needs to contain subsequent CRFrames.
/// A reader works through a list of files, one after the other, and
/// hands the database lookup tables it holds to every frame it emits.
#[derive(Debug)] // deliberately no default() method, a reader without input files should fail
#[cfg_attr(feature="pybindings", pyclass)]
pub struct CRReader {
  /// The files this reader reads from
  pub filenames        : Vec<String>,
  /// The position of the file currently worked on
  /// in the filenames vector
  pub file_idx       : usize,
  /// A simple BufReader for reading generic binary
  /// files
  file_reader          : BufReader<File>,
  /// Current (byte) position in the current file
  /// This gets reset when we switch to a new file
  cursor               : usize,
  /// Number of read packets
  n_packs_read         : usize,
  /// Number of skipped packets
  n_packs_skipped      : usize,
  /// Number of deserialization errors occurred
  /// since the beginning of the file
  pub n_errors         : usize,
  /// Skip the first n packets (0 disables skipping)
  pub skip_ahead       : usize,
  /// Stop reading after n packets (0 means no limit)
  pub stop_after       : usize,
  /// Geometry of each TOF paddle, keyed by a u8
  // NOTE(review): the key is presumably the paddle id - confirm
  pub tof_paddles      : Arc<HashMap<u8,TofPaddle>>,
  /// Geometry of each tracker strip
  pub trk_strips       : Arc<HashMap<u32, TrackerStrip>>,
  /// Mask tracker strips 
  pub trk_masks        : Arc<HashMap<u32, TrackerStripMask>>,
  /// Tracker pedestal values
  pub trk_ped          : Arc<HashMap<u32, TrackerStripPedestal>>,
  /// Transfer functions for tracker (adc -> energy)
  pub trk_tf           : Arc<HashMap<u32, TrackerStripTransferFunction>>,
  /// Common noise data for tracker
  pub trk_cmn          : Arc<HashMap<u32, TrackerStripCmnNoise>>, 
  /// True if the paddle and strip information could be loaded from the
  /// database. Set back to false whenever a later database query fails.
  pub db_loaded        : bool,
  /// TRK calibration - convert to energy
  pub do_trk_calib     : bool,
  /// TRK subtract CMN 
  pub do_trk_cmn_noise : bool,
}
65
66
67impl CRReader {
68
69  /// Create a new CRReader
70  ///
71  /// # Arguments:
72  ///   * filename_or_directory : Can be either the name of a single file, or a directory with 
73  ///                             caraspace files in it.
74  ///   
75  pub fn new(filename_or_directory : String) -> Result<Self, io::Error> {
76    #[cfg(feature="database")]
77    let mut paddles = HashMap::<u8, TofPaddle>::new();
78    #[cfg(not(feature="database"))]
79    let paddles = HashMap::<u8, TofPaddle>::new();
80    #[cfg(feature="database")]
81    let mut strips  = HashMap::<u32, TrackerStrip>::with_capacity(11520);
82    #[cfg(not(feature="database"))]
83    let strips  = HashMap::<u32, TrackerStrip>::with_capacity(11520);
84    let trk_mask    = HashMap::<u32, TrackerStripMask>::with_capacity(11520);
85    let trk_ped     = HashMap::<u32, TrackerStripPedestal>::with_capacity(11520);
86    let trk_tf      = HashMap::<u32, TrackerStripTransferFunction>::with_capacity(11520);
87    let trk_cmn     = HashMap::<u32, TrackerStripCmnNoise>::with_capacity(11520);
88    //let db_path       = env::var("DATABASE_URL").unwrap_or_else(|_| "".to_string());
89    #[cfg(feature="database")]
90    let mut db_loaded = false;
91    #[cfg(not(feature="database"))]
92    let db_loaded = false;
93    #[cfg(feature="database")]
94    match TofPaddle::all_as_dict() {
95      Err(err) => {
96        error!("Unable to retrieve paddle information from DB! {err}");
97      }
98      Ok(pdls) => {
99        db_loaded = true;
100        paddles   = pdls;         
101      }
102    }
103    #[cfg(feature="database")]
104    match TrackerStrip::all_as_dict() {
105      Err(err) => {
106        error!("Unable to retrieve tracker strip information from DB! {err}");
107        // if strips and paddles do not work, something is utterly fisy
108        db_loaded = false;
109      }
110      Ok(strips_) => {
111        strips   = strips_;         
112      }
113    }
114    // check the input argument and get the filelist
115    let infiles   = list_path_contents_sorted(&filename_or_directory, None)?;
116    if infiles.len() == 0 {
117      error!("Unable to read files from {filename_or_directory}. Is this a valid path?");
118      return Err(io::Error::new(io::ErrorKind::NotFound, "Unable to find given path!"))
119    }
120    let firstfile = infiles[0].clone(); 
121    let file = OpenOptions::new().create(false).append(false).read(true).open(&firstfile).expect("Unable to open file {filename}");
122    let packet_reader = Self { 
123      filenames        : infiles,
124      file_idx         : 0,
125      //file_reader      : BufReader::new(file),
126      // we exploit the fact here that a file is typically ~500Mb
127      // (tof file only is 420MB)
128      file_reader      : BufReader::with_capacity(500*1024*1024,file),
129      cursor           : 0,
130      n_packs_read     : 0,
131      n_errors         : 0,
132      skip_ahead       : 0,
133      stop_after       : 0,
134      n_packs_skipped  : 0,
135      tof_paddles      : Arc::new(paddles),
136      trk_strips       : Arc::new(strips),
137      trk_masks        : Arc::new(trk_mask),
138      trk_ped          : Arc::new(trk_ped),
139      trk_tf           : Arc::new(trk_tf),
140      trk_cmn          : Arc::new(trk_cmn),
141      db_loaded        : db_loaded,
142      do_trk_calib     : false,
143      do_trk_cmn_noise : false,
144    };
145    Ok(packet_reader)
146  } 
147    
148  #[cfg(feature="database")]
149  pub fn set_tracker_calibrations_from_fnames(&mut self,
150                                              mask        : Option<String>,
151                                              pedestal    : Option<String>,
152                                              transfer_fn : Option<String>,
153                                              cmn_noise   : Option<String>) {
154    // Tracker calibration parameters
155    if let Some(maskname) = mask {
156      match TrackerStripMask::as_dict_by_name(&maskname) {
157        Err(err) => {
158          error!("Unable to retrieve Trk strip mask information from DB! {err}");
159          // if strips and paddles do not work, something is utterly fisy
160          self.db_loaded = false;
161        }
162        Ok(strips_) => {
163          self.trk_masks = Arc::new(strips_);
164        }
165      }
166    }
167    if let Some(pedname) = pedestal {
168      match TrackerStripPedestal::as_dict_by_name(&pedname) {
169        Err(err) => {
170          error!("Unable to retrieve TRK pedestal information from DB! {err}");
171          // if strips and paddles do not work, something is utterly fisy
172          self.db_loaded = false;
173        }
174        Ok(strips_) => {
175          self.trk_ped = Arc::new(strips_);
176        }
177      }
178    }
179    if let Some(trafoname) = transfer_fn { 
180      match TrackerStripTransferFunction::as_dict_by_name(&trafoname) {
181        Err(err) => {
182          error!("Unable to retrieve TRK Transfer fn information from DB! {err}");
183          // if strips and paddles do not work, something is utterly fisy
184          self.db_loaded = false;
185        }
186        Ok(trafo_fns_) => {
187          self.trk_tf = Arc::new(trafo_fns_);
188          self.do_trk_calib = true;
189        }
190      }
191    }
192    if let Some(cmnname) = cmn_noise { 
193      match TrackerStripCmnNoise::as_dict_by_name(&cmnname) {
194        Err(err) => {
195          error!("Unable to retrieve TRK common noise from DB! {err}");
196          // if strips and paddles do not work, something is utterly fisy
197          self.db_loaded = false;
198        }
199        Ok(cmn_) => {
200          self.trk_cmn = Arc::new(cmn_);
201          self.do_trk_cmn_noise = true;
202        }
203      }
204    }
205  }
206
207  //  
208  /// This is the file the current cursor is located 
209  /// in and frames are currently read out from 
210  pub fn get_current_filename(&self) -> Option<String> {
211    // should only happen when it is empty
212    if self.filenames.len() <= self.file_idx {
213      return None;
214    }
215    Some(self.filenames[self.file_idx].clone())
216  }
217//  
218//  
219//  ///// Use the associated database to enrich paddle information
220//  //fn add_paddleinfo(&self, event : &mut TofEventSummary) {
221//  //  event.set_paddles(&self.paddles);
222//  //}
223//  
224//
225  /// Preview the number of frames in this reader
226  pub fn count_frames(&mut self) -> usize {
227    let _ = self.rewind();
228    let mut nframes = 0usize;
229    let mut buffer  = [0];
230    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
231    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
232    let bar = ProgressBar::new(self.filenames.len() as u64);
233    bar.set_position(0);
234    bar.set_message (String::from("Counting frames.."));
235    bar.set_prefix  ("\u{2728}");
236    bar.set_style   (bar_style);
237    bar.set_position(self.file_idx as u64);
238    loop {
239      match self.file_reader.read_exact(&mut buffer) {
240        Err(err) => {
241          debug!("Unable to read from file! {err}");
242          match self.prime_next_file() {
243            None    => break,
244            Some(_) => {
245              bar.set_position(self.file_idx as u64);
246              continue;
247            }
248          };
249        }
250        Ok(_) => {
251          self.cursor += 1;
252        }
253      }
254      if buffer[0] != 0xAA {
255        continue;
256      } else {
257        match self.file_reader.read_exact(&mut buffer) {
258          Err(err) => {
259            debug!("Unable to read from file! {err}");
260            match self.prime_next_file() {
261              None    => break,
262              Some(_) => {
263                bar.set_position(self.file_idx as u64);
264                continue;
265              }
266            };
267          }
268          Ok(_) => {
269            self.cursor += 1;
270          }
271        }
272        // check if the second byte of the header
273        if buffer[0] != 0xAA { 
274          continue;
275        } else {
276          // read the the size of the packet
277          let mut buffer_psize = [0,0,0,0,0,0,0,0];
278          match self.file_reader.read_exact(&mut buffer_psize) {
279            Err(_err) => {
280              match self.prime_next_file() {
281                None    => break,
282                Some(_) => {
283                  bar.set_position(self.file_idx as u64);
284                  continue;
285                }
286              }
287            }
288            Ok(_) => {
289              self.cursor += 8;
290            }
291          }
292          let vec_data = buffer_psize.to_vec();
293          let size     = parse_u64(&vec_data, &mut 0);
294          let mut temp_buffer = vec![0; size as usize];
295          match self.file_reader.read_exact(&mut temp_buffer) { 
296          //match self.file_reader.seek(SeekFrom::Current(size as i64)) {
297          //match self.file_reader.seek_relative(size as i64) {
298            Err(err) => {
299              error!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
300              match self.prime_next_file() {
301                None    => break,
302                Some(_) => {
303                  bar.set_position(self.file_idx as u64);
304                  continue;
305                }
306              }
307            }
308            Ok(_) => {
309              self.cursor += size as usize;
310              nframes += 1;
311            }
312          }
313        }
314      } // if no 0xAA found
315    } // end loop
316    bar.finish_with_message("Done!");
317    let _ = self.rewind();
318    nframes
319  } // end fn
320
321  /// Return the next frame for the current files
322  ///
323  /// Will return none if the file has been exhausted.
324  /// Use ::rewind to start reading from the beginning
325  /// again.
326  pub fn read_next_item(&mut self) -> Option<CRFrame> {
327    // filter::Unknown corresponds to allowing any
328  
329    let mut buffer = [0];
330    loop {
331      match self.file_reader.read_exact(&mut buffer) {
332        Err(err) => {
333          debug!("Unable to read from file! {err}");
334          // this is ok in case we are out of files
335          self.prime_next_file()?;
336          return self.read_next_item();
337        }
338        Ok(_) => {
339          self.cursor += 1;
340        }
341      }
342      if buffer[0] != 0xAA {
343        continue;
344      } else {
345        match self.file_reader.read_exact(&mut buffer) {
346          Err(err) => {
347            debug!("Unable to read from file! {err}");
348            self.prime_next_file()?;
349            return self.read_next_item();
350          }
351          Ok(_) => {
352            self.cursor += 1;
353          }
354        }
355  
356        if buffer[0] != 0xAA { 
357          continue;
358        } else {
359          // read the the size of the packet
360          let mut buffer_psize = [0,0,0,0,0,0,0,0];
361          match self.file_reader.read_exact(&mut buffer_psize) {
362            Err(err) => {
363              debug!("Unable to read from file! {err}");
364              self.prime_next_file()?;
365              return self.read_next_item();
366            }
367            Ok(_) => {
368              self.cursor += 8;
369            }
370          }
371          
372          let vec_data = buffer_psize.to_vec();
373          //println!("vec_data {:?}", vec_data);
374          let size     = parse_u64(&vec_data, &mut 0);
375          //println!("Will read {size} bytes for payload!");
376          // now at this point, we want the packet!
377          // except we skip ahead or stop earlier
378          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
379            // we don't want it
380            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
381              Err(err) => {
382                debug!("Unable to read more data! {err}");
383                self.prime_next_file()?;
384                return self.read_next_item();
385              }
386              Ok(_) => {
387                self.n_packs_skipped += 1;
388                self.cursor += size as usize;
389              }
390            }
391            continue; // this is just not the packet we want
392          }
393          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
394            // we don't want it
395            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
396              Err(err) => {
397                debug!("Unable to read more data! {err}");
398                self.prime_next_file()?;
399                return self.read_next_item();
400              }
401              Ok(_) => {
402                self.cursor += size as usize;
403              }
404            }
405            continue; // this is just not the packet we want
406          }
407  
408          let mut frame = CRFrame::new();
409          let mut payload = vec![0u8;size as usize];
410  
411          match self.file_reader.read_exact(&mut payload) {
412            Err(err) => {
413              debug!("Unable to read from file! {err}");
414              self.prime_next_file()?;
415              return self.read_next_item();
416            }
417            Ok(_) => {
418              self.cursor += size as usize;
419            }
420          }
421          let mut in_frame_pos = 0usize;
422          frame.index = CRFrame::parse_index(&payload, &mut in_frame_pos);
423          frame.bytestorage = payload[in_frame_pos..].to_vec();
424  
425          //tp.payload = payload;
426          // we don't filter, so we like this packet
427          let mut tail = vec![0u8; 2];
428          match self.file_reader.read_exact(&mut tail) {
429            Err(err) => {
430              debug!("Unable to read from file! {err}");
431              self.prime_next_file()?;
432              return self.read_next_item();
433            }
434            Ok(_) => {
435              self.cursor += 2;
436            }
437          }
438          let tail = parse_u16(&tail,&mut 0);
439          if tail != CRFrame::TAIL {
440            debug!("CRFrame TAIL signature wrong!");
441            return None;
442          }
443          self.n_packs_read += 1;
444          // hand the database poitners over to the frame 
445          frame.tof_paddles  = Arc::clone(&self.tof_paddles);
446          frame.trk_strips   = Arc::clone(&self.trk_strips);
447          frame.trk_masks    = Arc::clone(&self.trk_masks);
448          frame.trk_ped      = Arc::clone(&self.trk_ped);
449          frame.trk_tf       = Arc::clone(&self.trk_tf);
450          frame.trk_cmn      = Arc::clone(&self.trk_cmn);
451          frame.do_trk_calib = self.do_trk_calib;
452          frame.subtract_trk_cmn = self.do_trk_cmn_noise;
453          return Some(frame);
454        }
455      } // if no 0xAA found
456    } // end loop
457  } // end fn
458}
459
460impl fmt::Display for CRReader {
461  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
462    let mut range_repr = String::from("");
463    if self.skip_ahead > 0 {
464      range_repr += &(format!("({}", self.skip_ahead));
465    } else {
466      range_repr += "(";
467    }
468    if self.stop_after > 0 {
469      range_repr += &(format!("..{})", self.stop_after));
470    } else {
471      range_repr += "..)";
472    }
473    let mut repr = String::from("<CRReader :");
474    repr += "\n -- files:";
475    for k in &self.filenames {
476      repr += &format!("\n     -- {k}");
477    }
478    if self.filenames.len() > 0 {
479      repr += &format!("\n  current : {}", self.get_current_filename().unwrap());
480    }
481    repr += &String::from("\n -- -- -- -- -- -- -- -- -- -- -- --");
482    repr += &format!("\n  read {} packets, {} errors, range {}>", self.n_packs_read, self.n_errors, range_repr);
483    write!(f, "{}", repr)
484  }
485}
486
// NOTE(review): presumably this macro generates the shared reader 
// functionality used in this file but not defined here (rewind, 
// prime_next_file, the Iterator over CRFrame) - confirm against the 
// macro definition.
reader!(CRReader,CRFrame);
488
489//--------------------------------------
490
491#[cfg(feature="pybindings")]
492#[pymethods]
493impl CRReader {
494
495  #[new]
496  fn new_py(filename_or_directory : &Bound<'_,PyAny>) -> PyResult<Self> {
497    let mut string_value = String::from("foo");
498    if let Ok(s) = filename_or_directory.extract::<String>() {
499       string_value = s;
500    } //else if let Ok(p) = filename_or_directory.extract::<&Path>() {
501    if let Ok(fspath_method) = filename_or_directory.getattr("__fspath__") {
502      if let Ok(fspath_result) = fspath_method.call0() {
503        if let Ok(py_string) = fspath_result.extract::<String>() {
504          string_value = py_string;
505        }
506      }
507    }
508    match Self::new(string_value) {
509      Err(err) => {
510        return Err(PyValueError::new_err(err.to_string()));
511      }
512      Ok(reader) => {
513        return Ok(reader);
514      }
515    }
516  }
517
  /// Python binding for set_tracker_calibrations_from_fnames
  ///
  /// All arguments are optional on the Python side (default None) and 
  /// are forwarded unchanged.
  // NOTE(review): the wrapped method is only compiled with the "database" 
  // feature - confirm that "pybindings" is never enabled without it.
  #[pyo3(name="set_tracker_calibrations_from_fnames")]
  #[pyo3(signature = (mask = None, pedestal = None, transfer_fn = None, cmn_noise = None))]
  fn set_tracker_calibrations_from_fnames_py(&mut self,
                                              mask        : Option<String>,
                                              pedestal    : Option<String>,
                                              transfer_fn : Option<String>,
                                              cmn_noise   : Option<String>) {
    self.set_tracker_calibrations_from_fnames(mask, pedestal, transfer_fn, cmn_noise);
  }
527
  /// This is the filename we are currently 
  /// extracting frames from.
  ///
  /// Exposed to Python as the read-only property `current_filename`,
  /// which is None if there is no current file.
  #[getter]
  #[pyo3(name="current_filename")]
  fn get_current_filename_py(&self) -> Option<String> {
    self.get_current_filename()
  }
535
536  /// Start the reader from the beginning
537  /// This is equivalent to a re-initialization
538  /// of that reader.
539  #[pyo3(name="rewind")]
540  fn rewind_py(&mut self) -> PyResult<()> {
541    match self.rewind() {
542      Err(err) => {
543        return Err(PyValueError::new_err(err.to_string()));
544      }
545      Ok(_) => Ok(())
546    }
547  }
548
  /// Preview the number of frames in this reader.
  ///
  /// Python binding for count_frames, which rewinds the reader
  /// before and after counting.
  #[pyo3(name="count_frames")]
  fn count_frames_py(&mut self) -> usize {
    self.count_frames()
  }
553
  /// Python iterator protocol: the reader is its own iterator
  fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
    slf 
  }
557  
558  fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<CRFrame> {
559    match slf.next() { 
560      Some(frame) => {
561        return Some(frame)
562      }   
563      None => {
564        return None;
565      }   
566    }   
567  }
568}
569
// NOTE(review): presumably this exposes the Display implementation of 
// CRReader as its string representation in Python - confirm against 
// the macro definition.
#[cfg(feature="pybindings")]
pythonize_display!(CRReader);