caraspace/
reader.rs

//! This file is part of gaps-online-software and is published
//! under the GPLv3 license.
//!
//! Specifically, this file is part of the i/o system, more precisely
//! the caraspace library. Caraspace provides a system to read files
//! from different sources for the GAPS experiment, in particular
//! files written to disk by the TOF system as well as telemetry files.
//!
//! While written for the GAPS experiment, the caraspace library is
//! designed so that it should be easily adaptable for other
//! purposes.
//!
//! This file contains the source for CRReader, a device to read a number
//! of "caraspace" files from a given source.
15
16use std::fmt;
17
18use std::fs::{
19  self,
20  File,
21  OpenOptions
22};
23
24use std::path::Path;
25use std::io::{
26  self,
27  BufReader,
28  Seek,
29  SeekFrom,
30  Read,
31  ErrorKind
32};
33use regex::Regex;
34
35use indicatif::{
36  ProgressBar,
37  ProgressStyle
38};
39
40use crate::frame::CRFrame;
41use crate::serialization::CRSerializeable;
42use crate::parsers::*;
43
/// Read binaries written through the caraspace system
///
/// The files need to contain subsequent CRFrames. Several files can be
/// given; they are read one after another in the order of `filenames`.
// Deliberately no `Default` implementation - a reader without any input
// files can not do anything meaningful, so construction has to go
// through `CRReader::new`, which fails in that case.
#[derive(Debug)]
pub struct CRReader {
  /// Read from these files (in this order)
  pub filenames        : Vec<String>,
  /// The position of the currently worked on file
  /// in the filenames vector
  pub file_index       : usize,
  /// Buffered reader for the file currently worked on
  file_reader          : BufReader<File>,
  /// Current (byte) position in the current file.
  /// This gets reset when we switch to a new file
  cursor               : usize,
  /// Number of read packets
  n_packs_read         : usize,
  /// Number of packets skipped because of `skip_ahead`
  n_packs_skipped      : usize,
  /// Number of deserialization errors encountered while reading
  pub n_errors         : usize,
  /// Skip the first n packets (0 means: don't skip anything)
  pub skip_ahead       : usize,
  /// Stop reading after n packets (0 means: no limit)
  pub stop_after       : usize,
}
77
78impl fmt::Display for CRReader {
79  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80    let mut range_repr = String::from("");
81    if self.skip_ahead > 0 {
82      range_repr += &(format!("({}", self.skip_ahead));
83    } else {
84      range_repr += "(";
85    }
86    if self.stop_after > 0 {
87      range_repr += &(format!("..{})", self.stop_after));
88    } else {
89      range_repr += "..)";
90    }
91    let mut repr = String::from("<CRReader :");
92    repr += "\n -- files:";
93    for k in &self.filenames {
94      repr += &format!("\n     -- {k}");
95    }
96    if self.filenames.len() > 0 {
97      repr += &format!("\n  current : {}", self.get_current_filename().unwrap());
98    }
99    repr += &String::from("\n -- -- -- -- -- -- -- -- -- -- -- --");
100    repr += &format!("\n  read {} packets, {} errors, range {}>", self.n_packs_read, self.n_errors, range_repr);
101    write!(f, "{}", repr)
102  }
103}
104
105impl CRReader {
106 
107  /// Create a new CRReader
108  ///
109  /// # Arguments:
110  ///   * filename_or_directory : Can be either the name of a single file, or a directory with 
111  ///                             caraspace files in it.
112  ///   
113  pub fn new(filename_or_directory : String) -> Result<Self, io::Error> {
114    //let mut paddles   = HashMap::<u8, Paddle>::new();
115    //let db_path       = env::var("DATABASE_URL").unwrap_or_else(|_| "".to_string());
116    //let mut db_loaded = false;
117    //match connect_to_db(db_path) {
118    //  Err(_err) => {
119    //    error!("Database can not be found! Did you load the setup-env.sh shell?");
120    //  }
121    //  Ok(mut conn) => {
122    //    match Paddle::all(&mut conn) {
123    //      None => {
124    //        error!("Unable to retrieve paddle information from DB!");
125    //      }
126    //      Some(pdls) => {
127    //        db_loaded = true;
128    //        for p in pdls {
129    //          paddles.insert(p.paddle_id as u8, p.clone());
130    //        }
131    //      }
132    //    }
133    //  }
134    //}
135    // check the input argument and get the filelist
136    let infiles   = Self::list_path_contents_sorted(&filename_or_directory, None)?;
137    if infiles.len() == 0 {
138      error!("Unable to read files from {filename_or_directory}. Is this a valid path?");
139      return Err(io::Error::new(ErrorKind::NotFound, "Unable to find given path!"))
140    }
141    let firstfile = infiles[0].clone(); 
142    let file = OpenOptions::new().create(false).append(false).read(true).open(&firstfile).expect("Unable to open file {filename}");
143    let packet_reader = Self { 
144      filenames        : infiles,
145      file_index       : 0,
146      file_reader      : BufReader::new(file),
147      cursor           : 0,
148      n_packs_read     : 0,
149      n_errors         : 0,
150      skip_ahead       : 0,
151      stop_after       : 0,
152      n_packs_skipped  : 0,
153      //paddles         : paddles,
154      //db_loaded       : db_loaded
155    };
156    Ok(packet_reader)
157  } 
158  
159  /// This is the file the current cursor is located 
160  /// in and frames are currently read out from 
161  pub fn get_current_filename(&self) -> Option<String> {
162    // should only happen when it is empty
163    if self.filenames.len() <= self.file_index {
164      return None;
165    }
166    Some(self.filenames[self.file_index].clone())
167  }
168  
169  /// Get all filenames in the current path sorted by timestamp if available
170  /// If the given path is a file and not a directory, return only that 
171  /// file instead
172  ///
173  /// # Arguments:
174  ///
175  ///    * input   : name of the target directory
176  ///    * pattern : the regex pattern to look for. That the sorting works,
177  ///                the pattern needs to return a date for the first
178  ///                captured argument and a time for the second captured argument
179  fn list_path_contents_sorted(input: &str, pattern: Option<Regex>) -> Result<Vec<String>, io::Error> {
180    let path = Path::new(input);
181    match fs::metadata(path) {
182      Ok(metadata) => {
183        if metadata.is_file() {
184          let fname = String::from(input);
185          return Ok(vec![fname]);
186        } 
187        if metadata.is_dir() {
188          let re : Regex;
189          match pattern {
190            None => {
191              re = Regex::new(r"Run\d+_\d+\.(\d{6})_(\d{6})UTC(\.tof)?\.gaps$").unwrap();
192            }
193            Some(_re) => {
194              re = _re;
195            }
196          }
197          let mut entries: Vec<(u32, u32, String)> = fs::read_dir(path)?
198            .filter_map(Result::ok) // Ignore unreadable entries
199            .filter_map(|entry| {
200              let filename = format!("{}/{}", path.display(), entry.file_name().into_string().ok()?);
201              re.captures(&filename.clone()).map(|caps| {
202                let date = caps.get(1)?.as_str().parse::<u32>().ok()?;
203                let time = caps.get(2)?.as_str().parse::<u32>().ok()?;
204                Some((date, time, filename))
205              })?
206            })
207            .collect();
208
209          // Sort by (date, time)
210          entries.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
211          // Return only filenames
212          return Ok(entries.into_iter().map(|(_, _, name)| name).collect());
213        } 
214        Err(io::Error::new(ErrorKind::Other, "Path exists but is neither a file nor a directory"))
215      }
216      Err(e) => Err(e),
217    }
218  }
219  
220  ///// Use the associated database to enrich paddle information
221  //fn add_paddleinfo(&self, event : &mut TofEventSummary) {
222  //  event.set_paddles(&self.paddles);
223  //}
224  
225  /// Get the very first frame in all avaialbe files
226  pub fn first_frame(&mut self) -> Option<CRFrame> {
227    match self.rewind() {
228      Err(err) => {
229        error!("Error when rewinding files! {err}");
230      }
231      Ok(_) => ()
232    }
233    let frame = self.get_next_frame();
234    match self.rewind() {
235      Err(err) => {
236        error!("Error when rewinding files! {err}");
237      }
238      Ok(_) => ()
239    }
240    return frame;
241  }
242
243  /// Get the very last frame of all infiles
244  pub fn last_frame(&mut self) -> Option<CRFrame> { 
245    self.file_index = self.filenames.len() - 1;
246    let lastfilename = self.filenames[self.file_index].clone();
247    let lastfile     = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
248    self.file_reader = BufReader::new(lastfile);
249    self.cursor      = 0;
250    let mut frame = CRFrame::new();
251    let mut idx = 0;
252    loop {
253      match self.get_next_frame() {
254        None => {
255          match self.rewind() {
256            Err(err) => {
257              error!("Error when rewinding files! {err}");
258            }
259            Ok(_) => ()
260          }
261          if idx == 0 {
262            return None;
263          } else {
264            return Some(frame);
265          }
266        }
267        Some(_fr) => {
268          idx += 1;
269          frame = _fr;
270          continue;
271        }
272      }
273    }
274  }
275
276  /// Preview the number of frames in this reader
277  pub fn get_n_frames(&mut self) -> usize {
278    let _ = self.rewind();
279    let mut nframes = 0usize;
280    let mut buffer  = [0];
281    let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
282    let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
283    let bar = ProgressBar::new(self.filenames.len() as u64);
284    bar.set_position(0);
285    bar.set_message (String::from("Counting frames.."));
286    bar.set_prefix  ("\u{2728}");
287    bar.set_style   (bar_style);
288    bar.set_position(self.file_index as u64);
289    loop {
290      match self.file_reader.read_exact(&mut buffer) {
291        Err(err) => {
292          debug!("Unable to read from file! {err}");
293          match self.progress_file() {
294            None    => break,
295            Some(_) => {
296              bar.set_position(self.file_index as u64);
297              continue;
298            }
299          };
300        }
301        Ok(_) => {
302          self.cursor += 1;
303        }
304      }
305      if buffer[0] != 0xAA {
306        continue;
307      } else {
308        match self.file_reader.read_exact(&mut buffer) {
309          Err(err) => {
310            debug!("Unable to read from file! {err}");
311            match self.progress_file() {
312              None    => break,
313              Some(_) => {
314                bar.set_position(self.file_index as u64);
315                continue;
316              }
317            };
318          }
319          Ok(_) => {
320            self.cursor += 1;
321          }
322        }
323        // check if the second byte of the header
324        if buffer[0] != 0xAA { 
325          continue;
326        } else {
327          // read the the size of the packet
328          let mut buffer_psize = [0,0,0,0,0,0,0,0];
329          match self.file_reader.read_exact(&mut buffer_psize) {
330            Err(_err) => {
331              match self.progress_file() {
332                None    => break,
333                Some(_) => {
334                  bar.set_position(self.file_index as u64);
335                  continue;
336                }
337              }
338            }
339            Ok(_) => {
340              self.cursor += 8;
341            }
342          }
343          let vec_data = buffer_psize.to_vec();
344          let size     = parse_u64(&vec_data, &mut 0);
345          match self.file_reader.seek(SeekFrom::Current(size as i64)) {
346            Err(err) => {
347              error!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
348              match self.progress_file() {
349                None    => break,
350                Some(_) => {
351                  bar.set_position(self.file_index as u64);
352                  continue;
353                }
354              }
355            }
356            Ok(_) => {
357              self.cursor += size as usize;
358              nframes += 1;
359            }
360          }
361        }
362      } // if no 0xAA found
363    } // end loop
364    bar.finish_with_message("Done!");
365    let _ = self.rewind();
366    nframes
367  } // end fn
368
369  /// Move on to the next file, in case the current one 
370  /// is exhausted
371  /// 
372  /// Return true if there are still files lef
373  fn progress_file(&mut self) -> Option<()> {
374    if self.file_index == self.filenames.len() -1 {
375      return None;
376    } else {
377      self.file_index += 1;
378      let nextfilename = self.filenames[self.file_index].clone();
379      let nextfile     = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
380      self.file_reader = BufReader::new(nextfile);
381      self.cursor      = 0;
382      return Some(());
383    }
384  }
385
386
387  /// Reset the current state of the reader and make 
388  /// the next frame return to be the first frame
389  pub fn rewind(&mut self) -> io::Result<()> {
390    let firstfile = &self.filenames[0];
391    let file      = OpenOptions::new().create(false).append(false).read(true).open(&firstfile)?; 
392    self.file_reader  = BufReader::new(file);
393    self.file_index = 0;
394    self.cursor     = 0;
395    Ok(())
396  }
397
398  /// Return the next frame for the current files
399  ///
400  /// Will return none if the file has been exhausted.
401  /// Use ::rewind to start reading from the beginning
402  /// again.
403  pub fn get_next_frame(&mut self) -> Option<CRFrame> {
404    // filter::Unknown corresponds to allowing any
405
406    let mut buffer = [0];
407    loop {
408      match self.file_reader.read_exact(&mut buffer) {
409        Err(err) => {
410          debug!("Unable to read from file! {err}");
411          // this is ok in case we are out of files
412          self.progress_file()?;
413          return self.get_next_frame();
414        }
415        Ok(_) => {
416          self.cursor += 1;
417        }
418      }
419      if buffer[0] != 0xAA {
420        continue;
421      } else {
422        match self.file_reader.read_exact(&mut buffer) {
423          Err(err) => {
424            debug!("Unable to read from file! {err}");
425            self.progress_file()?;
426            return self.get_next_frame();
427          }
428          Ok(_) => {
429            self.cursor += 1;
430          }
431        }
432
433        if buffer[0] != 0xAA { 
434          continue;
435        } else {
436          // read the the size of the packet
437          let mut buffer_psize = [0,0,0,0,0,0,0,0];
438          match self.file_reader.read_exact(&mut buffer_psize) {
439            Err(err) => {
440              debug!("Unable to read from file! {err}");
441              self.progress_file()?;
442              return self.get_next_frame();
443            }
444            Ok(_) => {
445              self.cursor += 8;
446            }
447          }
448          
449          let vec_data = buffer_psize.to_vec();
450          //println!("vec_data {:?}", vec_data);
451          let size     = parse_u64(&vec_data, &mut 0);
452          //println!("Will read {size} bytes for payload!");
453          // now at this point, we want the packet!
454          // except we skip ahead or stop earlier
455          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
456            // we don't want it
457            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
458              Err(err) => {
459                debug!("Unable to read more data! {err}");
460                self.progress_file()?;
461                return self.get_next_frame();
462              }
463              Ok(_) => {
464                self.n_packs_skipped += 1;
465                self.cursor += size as usize;
466              }
467            }
468            continue; // this is just not the packet we want
469          }
470          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
471            // we don't want it
472            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
473              Err(err) => {
474                debug!("Unable to read more data! {err}");
475                self.progress_file()?;
476                return self.get_next_frame();
477              }
478              Ok(_) => {
479                self.cursor += size as usize;
480              }
481            }
482            continue; // this is just not the packet we want
483          }
484
485          let mut frame = CRFrame::new();
486          let mut payload = vec![0u8;size as usize];
487
488          match self.file_reader.read_exact(&mut payload) {
489            Err(err) => {
490              debug!("Unable to read from file! {err}");
491              self.progress_file()?;
492              return self.get_next_frame();
493            }
494            Ok(_) => {
495              self.cursor += size as usize;
496            }
497          }
498          let mut in_frame_pos = 0usize;
499          frame.index = CRFrame::parse_index(&payload, &mut in_frame_pos);
500          frame.bytestorage = payload[in_frame_pos..].to_vec();
501
502          //tp.payload = payload;
503          // we don't filter, so we like this packet
504          let mut tail = vec![0u8; 2];
505          match self.file_reader.read_exact(&mut tail) {
506            Err(err) => {
507              debug!("Unable to read from file! {err}");
508              self.progress_file()?;
509              return self.get_next_frame();
510            }
511            Ok(_) => {
512              self.cursor += 2;
513            }
514          }
515          let tail = parse_u16(&tail,&mut 0);
516          if tail != CRFrame::CRTAIL {
517            debug!("CRFrame TAIL signature wrong!");
518            return None;
519          }
520          self.n_packs_read += 1;
521          return Some(frame);
522        }
523      } // if no 0xAA found
524    } // end loop
525  } // end fn
526}
527
528impl Iterator for CRReader {
529  type Item = CRFrame;
530
531  fn next(&mut self) -> Option<Self::Item> {
532    self.get_next_frame()
533  }
534}
535