telemetry_dataclasses/
io.rs

1use std::fmt;
2use std::fs::{
3  self,
4  File,
5};
6use std::io;
7use std::io::SeekFrom;
8use std::io::Seek;
9use std::io::BufReader;
10use std::path::Path;
11use std::collections::HashMap;
12use std::fs::OpenOptions;
13use std::io::Read;
14use std::io::ErrorKind;
15use regex::Regex;
16
17use log::{
18  debug,
19  error
20};
21
22use tof_dataclasses::io::read_file;
23use tof_dataclasses::serialization::{
24  search_for_u16,
25  Serialization,
26  //parse_u16,
27  parse_u32,
28};
29
30use crate::packets::{
31  TelemetryHeader,
32  TelemetryPacket,
33  MergedEvent,
34  TrackerPacket,
35  GapsEvent,
36};
37use crate::packets::TelemetryPacketType;
38
39/// Extract all merged events from a file and ignore all others
40pub fn get_gaps_events(filename : String) -> Vec<GapsEvent> {
41  let mut events = Vec::<GapsEvent>::new();
42  let stream = read_file(Path::new(&filename)).expect("Unable to open input file!");
43  let mut pos : usize = 0;
44  //let mut npackets : usize = 0;
45  let mut packet_types = Vec::<u8>::new();
46  loop {
47    match TelemetryHeader::from_bytestream(&stream, &mut pos) {
48      Err(err) => {
49        println!("Can not decode telemtry header! {err}");
50        //for k in pos - 5 .. pos + 5 {
51        //  println!("{}",stream[k]);
52        //}
53        match search_for_u16(0x90eb, &stream, pos) {
54          Err(err) => {
55            println!("Unable to find next header! {err}");
56            break;
57          }
58          Ok(head_pos) => {
59            pos = head_pos;
60          }
61        }
62      }
63      Ok(header) => {
64        println!("HEADER {}", header);
65        //for k in pos - 10 .. pos + 10 {
66        //  println!("{}",stream[k]);
67        //}
68        if header.ptype == 80 {
69          match TrackerPacket::from_bytestream(&stream, &mut pos) {
70            Err(err) => {
71              //for k in pos - 5 .. pos + 5 {
72              //  println!("{}",stream[k]);
73              //}
74              println!("Unable to decode TrackerPacket! {err}");
75            }
76            Ok(mut tp) => {
77              tp.telemetry_header = header;
78              println!("{}", tp);
79            }
80          }
81        }
82        if header.ptype == 90 {
83          match MergedEvent::from_bytestream(&stream, &mut pos) {
84            Err(err) => {
85              println!("Unable to decode MergedEvent! {err}");
86            }
87            Ok(mut me) => {
88              me.header  = header;
89              let mut g_event = GapsEvent::new();
90              //println!("Event ID  : {}", me.event_id);
91              //println!("Tof bytes : {:?}", me.tof_data);
92              //println!("len tof bytes : {}", me.tof_data.len());
93              g_event.tof     = me.tof_event;
94              g_event.tracker = me.tracker_events;
95              events.push(g_event)
96            }
97          }
98        }
99        //npackets += 1;
100        packet_types.push(header.ptype);
101        match search_for_u16(0x90eb, &stream, pos) {
102          Err(err) => {
103            println!("Unable to find next header! {err}");
104            break;
105          }
106          Ok(head_pos) => {
107            pos = head_pos;
108          }
109        }
110      }
111    }
112  }
113  events
114}
115
116
117/// Read serialized TelemetryPackets from an existing file
118///
119/// Read GAPS binary files ("Berkeley binaries)
120#[derive(Debug)]
121pub struct TelemetryPacketReader {
122  /// Reader will emit packets from these files,
123  /// if one file is exhausted, it moves on to 
124  /// the next file automatically
125  pub filenames       : Vec<String>,
126  /// The index of the file the reader is 
127  /// currently reading
128  pub file_index      : usize,
129  file_reader         : BufReader<File>,
130  /// Current (byte) position in the file
131  cursor              : usize,
132  /// Read only packets of type == PacketType
133  pub filter          : TelemetryPacketType,
134  /// Number of read packets
135  n_packs_read        : usize,
136  /// Number of skipped packets
137  n_packs_skipped     : usize,
138  /// Skip the first n packets
139  pub skip_ahead      : usize,
140  /// Stop reading after n packets
141  pub stop_after      : usize,
142}
143
144impl fmt::Display for TelemetryPacketReader {
145  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
146    let mut range_repr = String::from("");
147    if self.skip_ahead > 0 {
148      range_repr += &(format!("({}", self.skip_ahead));
149    } else {
150      range_repr += "(";
151    }
152    if self.stop_after > 0 {
153      range_repr += &(format!("..{})", self.stop_after));
154    } else {
155      range_repr += "..)";
156    }
157    let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
158    write!(f, "{}", repr)
159  }
160}
161
162impl TelemetryPacketReader {
163  
164  fn list_path_contents_sorted(input: &str) -> Result<Vec<String>, io::Error> {
165    let path = Path::new(input);
166    match fs::metadata(path) {
167      Ok(metadata) => {
168        if metadata.is_file() {
169          let fname = String::from(input);
170          return Ok(vec![fname]);
171        } 
172        if metadata.is_dir() {
173          let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
174
175          let mut entries: Vec<(u32, u32, String)> = fs::read_dir(path)?
176            .filter_map(Result::ok) // Ignore unreadable entries
177            .filter_map(|entry| {
178              let filename = format!("{}/{}", path.display(), entry.file_name().into_string().ok()?);
179              re.captures(&filename.clone()).map(|caps| {
180                let date = caps.get(1)?.as_str().parse::<u32>().ok()?;
181                let time = caps.get(2)?.as_str().parse::<u32>().ok()?;
182                Some((date, time, filename))
183              })?
184            })
185            .collect();
186
187          // Sort by (date, time)
188          entries.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
189          // Return only filenames
190          return Ok(entries.into_iter().map(|(_, _, name)| name).collect());
191        } 
192        Err(io::Error::new(ErrorKind::Other, "Path exists but is neither a file nor a directory"))
193      }
194      Err(e) => Err(e),
195    }
196  }
197
198  pub fn new(filename_or_directory : String) -> Self {
199    let firstfile : String;
200    match Self::list_path_contents_sorted(&filename_or_directory) {
201      Err(err) => {
202        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
203        panic!("Unable to open files!");
204      }
205      Ok(files) => {
206        firstfile = files[0].clone();
207        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
208          Err(err) => {
209            error!("Unable to open file {firstfile}! {err}");
210            panic!("Unable to create reader from {filename_or_directory}!");
211          }
212          Ok(file) => {
213            let packet_reader = Self { 
214              filenames       : files,
215              file_index      : 0,
216              file_reader     : BufReader::new(file),
217              cursor          : 0,
218              filter          : TelemetryPacketType::Unknown,
219              n_packs_read    : 0,
220              skip_ahead      : 0,
221              stop_after      : 0,
222              n_packs_skipped : 0,
223            };
224            packet_reader
225          }
226        }
227      }
228    }
229  } 
230
231  /// Get an index of the file - count number of packets
232  ///
233  /// Returns the number of all PacketTypes in the file
234  pub fn get_packet_index(&mut self) -> io::Result<HashMap<u8, usize>> {
235    error!("The packet index function is currently broken and will only show the packet index for one file, not for all!");
236    error!("FIXME!");
237    let mut index  = HashMap::<u8, usize>::new();
238    let mut buffer = [0];
239    loop {
240      match self.file_reader.read_exact(&mut buffer) {
241        Err(err) => {
242          debug!("Unable to read from file! {err}");
243          //return None;
244          break;
245        }
246        Ok(_) => {
247          self.cursor += 1;
248        }
249      }
250      if buffer[0] != 0xeb {
251        continue;
252      } else {
253        match self.file_reader.read_exact(&mut buffer) {
254          Err(err) => {
255            debug!("Unable to read from file! {err}");
256            //return None;
257            break;
258          }
259          Ok(_) => {
260            self.cursor += 1;
261          }
262        }
263
264        if buffer[0] != 0x90 { 
265          continue;
266        } else {
267          // the 3rd byte is the packet type
268          match self.file_reader.read_exact(&mut buffer) {
269             Err(err) => {
270              debug!("Unable to read from file! {err}");
271              break;
272            }
273            Ok(_) => {
274              self.cursor += 1;
275            }
276          }
277          let ptype    = TelemetryPacketType::from(buffer[0]);
278          let mut padding = [0,0,0,0,0,0];
279          match self.file_reader.read_exact(&mut padding) {
280            Err(err) => {
281              error!("Unable to read from file! {err}");
282              break;
283            }
284            Ok(_) => {
285              self.cursor += 6;
286            }
287          }
288          // read the the size of the packet
289
290          let mut buffer_psize = [0,0,0,0];
291          match self.file_reader.read_exact(&mut buffer_psize) {
292            Err(err) => {
293              error!("Unable to read from file! {err}");
294              break;
295            }
296            Ok(_) => {
297              self.cursor += 4;
298            }
299          }
300          let vec_data = buffer_psize.to_vec();
301          let mut size = parse_u32(&vec_data, &mut 0);
302          // This size includes the header
303          if (size as usize) < TelemetryHeader::SIZE {
304            error!("This packet might be empty or corrupt!");
305            break;
306          }
307          size -= TelemetryHeader::SIZE as u32;
308
309          match self.file_reader.seek(SeekFrom::Current(size as i64)) {
310            Err(err) => {
311              debug!("Unable to read more data! {err}");
312              break; 
313            }
314            Ok(_) => {
315              self.cursor += size as usize;
316              // and then we add the packet type to the 
317              // hashmap
318              let ptype_key = ptype as u8;
319              if index.contains_key(&ptype_key) {
320                *index.get_mut(&ptype_key).unwrap() += 1;
321              } else {
322                index.insert(ptype_key, 1usize);
323              }
324            }
325          }
326        }
327      } // if no 0xAA found
328    } // end loop
329    self.rewind()?;
330    Ok(index)
331  } // end fn
332
333  pub fn rewind(&mut self) -> io::Result<()> {
334    let firstfile = &self.filenames[0];
335    match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
336      Err(err) => {
337        error!("Unable to open file {firstfile}! {err}");
338        panic!("Unable to create reader from {firstfile}!");
339      }
340      Ok(file) => {
341        self.file_reader  = BufReader::new(file);
342      }
343    }   
344    self.file_index = 0;
345    self.cursor = 0;
346    Ok(())
347  }
348
349  /// Get the next file ready
350  fn prime_next_file(&mut self) -> Option<usize> {
351    if self.file_index == self.filenames.len() -1 {
352      return None;
353    } else {
354      self.file_index += 1;
355      let nextfilename = self.filenames[self.file_index].clone();
356      let nextfile     = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
357      self.file_reader = BufReader::new(nextfile);
358      self.cursor      = 0;
359      return Some(self.file_index);
360    }
361  }
362
363  /// Return the next tofpacket in the stream
364  ///
365  /// Will return none if the file has been exhausted.
366  /// Use ::rewind to start reading from the beginning
367  /// again.
368  pub fn get_next_packet(&mut self) -> Option<TelemetryPacket> {
369    // filter::Unknown corresponds to allowing any
370    let mut buffer = [0];
371    loop {
372      match self.file_reader.read_exact(&mut buffer) {
373        Err(err) => {
374          debug!("Unable to read from file! {err}");
375          self.prime_next_file()?;
376          return self.get_next_packet();
377        }
378        Ok(_) => {
379          self.cursor += 1;
380        }
381      }
382      if buffer[0] != 0xeb {
383        continue;
384      } else {
385        match self.file_reader.read_exact(&mut buffer) {
386          Err(err) => {
387            debug!("Unable to read from file! {err}");
388            self.prime_next_file()?;
389            return self.get_next_packet();
390          }
391          Ok(_) => {
392            self.cursor += 1;
393          }
394        }
395
396        if buffer[0] != 0x90 { 
397          continue;
398        } else {
399          // the 3rd byte is the packet type
400          match self.file_reader.read_exact(&mut buffer) {
401             Err(err) => {
402              debug!("Unable to read from file! {err}");
403              self.prime_next_file()?;
404              return self.get_next_packet();
405            }
406            Ok(_) => {
407              self.cursor += 1;
408            }
409          }
410          let mut thead = TelemetryHeader::new();
411          thead.sync      = 0x90eb;
412          thead.ptype     = buffer[0];
413          let ptype    = TelemetryPacketType::from(buffer[0]);
414          // read the the size of the packet
415          let mut buffer_ts = [0,0,0,0];
416          match self.file_reader.read_exact(&mut buffer_ts) {
417            Err(err) => {
418              debug!("Unable to read from file! {err}");
419              self.prime_next_file()?;
420              return self.get_next_packet();
421            }
422            Ok(_) => {
423              self.cursor += 4;
424              thead.timestamp = u32::from_le_bytes(buffer_ts);
425            }
426          }
427          let mut buffer_counter = [0,0];
428          match self.file_reader.read_exact(&mut buffer_counter) {
429            Err(err) => {
430              debug!("Unable to read from file! {err}");
431              self.prime_next_file()?;
432              return self.get_next_packet();
433            }
434            Ok(_) => {
435              self.cursor += 2;
436              thead.counter   = u16::from_le_bytes(buffer_counter);
437            }
438          }
439          let mut buffer_length = [0,0];
440          match self.file_reader.read_exact(&mut buffer_length) {
441            Err(err) => {
442              debug!("Unable to read from file! {err}");
443              return None;
444            }
445            Ok(_) => {
446              self.cursor += 2;
447              thead.length    = u16::from_le_bytes(buffer_length);
448            }
449          }
450          let mut buffer_checksum = [0,0];
451          match self.file_reader.read_exact(&mut buffer_checksum) {
452            Err(err) => {
453              debug!("Unable to read from file! {err}");
454              self.prime_next_file()?;
455              return self.get_next_packet();
456            }
457            Ok(_) => {
458              self.cursor += 2;
459              thead.checksum    = u16::from_le_bytes(buffer_checksum);
460            }
461          }
462          
463          let mut size     = thead.length;
464          // This size includes the header
465          if (size as usize) < TelemetryHeader::SIZE {
466            error!("This packet might be empty or corrupt!");
467            return None;
468          }
469          size -= TelemetryHeader::SIZE as u16;
470          if ptype != self.filter && self.filter != TelemetryPacketType::Unknown {
471            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
472              Err(err) => {
473                debug!("Unable to read more data! {err}");
474                self.prime_next_file()?;
475                return self.get_next_packet();
476              }
477              Ok(_) => {
478                self.cursor += size as usize;
479              }
480            }
481            continue; // this is just not the packet we want
482          }
483          // now at this point, we want the packet!
484          // except we skip ahead or stop earlier
485          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
486            // we don't want it
487            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
488              Err(err) => {
489                debug!("Unable to read more data! {err}");
490                self.prime_next_file()?;
491                return self.get_next_packet();
492              }
493              Ok(_) => {
494                self.n_packs_skipped += 1;
495                self.cursor += size as usize;
496              }
497            }
498            continue; // this is just not the packet we want
499          }
500          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
501            // we don't want it
502            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
503              Err(err) => {
504                debug!("Unable to read more data! {err}");
505                self.prime_next_file()?;
506                return self.get_next_packet();
507              }
508              Ok(_) => {
509                self.cursor += size as usize;
510              }
511            }
512            continue; // this is just not the packet we want
513          }
514          
515
516          let mut tp = TelemetryPacket::new();
517          tp.header  = thead;
518          
519          //tp.packet_type = ptype;
520          //let mut payload = vec![0u8;TelemetryHeader::SIZE];
521          //match self.file_reader.read_exact(&mut payload) {
522          //  Err(err) => {
523          //    debug!("Unable to read from file! {err}");
524          //    return None;
525          //  }
526          //  Ok(_) => {
527          //    self.cursor += size as usize;
528          //  }
529          //}
530
531          let mut payload = vec![0u8;size as usize];
532          match self.file_reader.read_exact(&mut payload) {
533            Err(err) => {
534              debug!("Unable to read from file! {err}");
535              self.prime_next_file()?;
536              return self.get_next_packet();
537            }
538            Ok(_) => {
539              self.cursor += tp.header.length as usize;
540            }
541          }
542
543          tp.payload = payload;
544          self.n_packs_read += 1;
545          return Some(tp);
546        }
547      } // if no 0xAA found
548    } // end loop
549  } // end fn
550}
551
552impl Default for TelemetryPacketReader {
553  fn default() -> Self {
554    TelemetryPacketReader::new(String::from(""))
555  }
556}
557
558impl Iterator for TelemetryPacketReader {
559  type Item = TelemetryPacket;
560  
561  fn next(&mut self) -> Option<Self::Item> {
562    self.get_next_packet()
563  }
564}
565
566