tof_dataclasses/
io.rs

//! Dataio - reading/writing of different types
2//!
3//! of TOF data products.
4//!
5//! * TofPacketReader/Writer - sequentially read/write
6//!   TofPackets from/to a file on disk.
7//!   -> upcoming: Will connect to a network socket
8//!
9//! * RobinReader: Read (old) RB data files, where 
10//!   the file is simply a dump of the internal 
11//!   buffers ("RBEventMemoryView").
12//!
13//! * RBEventMemoryStreamer: Walk over "raw" RBEvents
14//!   representations ("RBEventMemoryView") and extract
15//!   RBEvents
16//!
17
// Word order switch for fields which are assembled from several
// 16bit words (event id, timestamp, crc32). When true, the word
// which comes first in the stream ends up in the upper bits.
//
// change if we switch to a firmware
// where the byteorder of u32 and larger 
// is correct.
const REVERSE_WORDS : bool = true;
// Parameters for the crc32 calculator which is held by the
// RBEventMemoryStreamer (see RBEventMemoryStreamer::new)
//
// NOTE(review): width/poly/init/refin/refout/xorout look like the
// common CRC-32 (ISO-HDLC) parameter set. check and residue are 
// left at 0 here - confirm that nothing relies on them.
const ALGO : crc::Algorithm<u32> = crc::Algorithm {
      width   : 32u8,
      init    : 0xFFFFFFFF,
      // the bit-reversed notation of the polynomial, kept for reference
      //poly    : 0xEDB88320,
      poly    : 0x04C11DB7,
      refin   : true,
      refout  : true,
      xorout  : 0xFFFFFFFF,
      check   : 0,
      residue : 0,
    };
33
34use std::fmt;
35
36use crc::Crc;
37
38use std::path::Path;
39use std::fs::{
40    self,
41    File,
42    OpenOptions
43};
44
45use std::io;
46use std::io::{
47  ErrorKind,
48  BufReader,
49  Seek,
50  SeekFrom,
51  Read,
52  Write,
53};
54
55use std::collections::{
56  VecDeque,
57  HashMap
58};
59
60
61
62
63extern crate chrono;
64use chrono::{DateTime, Utc};
65
66use indicatif::{ProgressBar, ProgressStyle};
67use crossbeam_channel::Sender;
68use regex::Regex;
69
70use crate::events::{
71    RBEvent,
72    RBEventHeader,
73    EventStatus,
74};
75use crate::packets::{
76    TofPacket,
77    PacketType,
78};
79use crate::constants::{
80    NWORDS,
81    HUMAN_TIMESTAMP_FORMAT
82};
83use crate::serialization::{
84    Serialization,
85    Packable,
86    //SerializationError,
87    u8_to_u16_14bit,
88    u8_to_u16_err_check,
89    search_for_u16,
90    parse_u8,
91    parse_u16,
92    parse_u32,
93};
94
95use crate::events::TofEvent;
96
/// Types of files handled by the readers/writers in this module
///
/// Each variant (except Unknown) carries a value which 
/// further identifies the file.
#[derive(Debug, Clone)]
pub enum FileType {
  /// The type of the file is not known/not specified
  Unknown,
  /// Calibration file for specific RB with id
  CalibrationFile(u8),
  /// A regular run file with TofEvents
  ///
  /// NOTE(review): the u32 is presumably the run id - confirm
  RunFile(u32),
  /// A file created from a file with TofEvents which 
  /// contains only TofEventSummary
  ///
  /// The String is the name of the file the summary has 
  /// been created from (see summarize_toffile)
  SummaryFile(String),
}
109
110/// Get a human readable timestamp
111pub fn get_utc_timestamp() -> String {
112  let now: DateTime<Utc> = Utc::now();
113  //let timestamp_str = now.format("%Y_%m_%d-%H_%M_%S").to_string();
114  let timestamp_str = now.format(HUMAN_TIMESTAMP_FORMAT).to_string();
115  timestamp_str
116}
117
118/// Create date string in YYMMDD format
119pub fn get_utc_date() -> String {
120  let now: DateTime<Utc> = Utc::now();
121  //let timestamp_str = now.format("%Y_%m_%d-%H_%M_%S").to_string();
122  let timestamp_str = now.format("%y%m%d").to_string();
123  timestamp_str
124}
125
126/// A standardized name for calibration files saved by 
127/// the liftof suite
128///
129/// # Arguments
130///
131/// * rb_id   : unique identfier for the 
132///             Readoutboard (1-50)
133/// * default : if default, just add 
134///             "latest" instead of 
135///             a timestamp
136pub fn get_califilename(rb_id : u8, latest : bool) -> String {
137  let ts = get_utc_timestamp();
138  if latest {
139    format!("RB{rb_id:02}_latest.cali.tof.gaps")
140  } else {
141    format!("RB{rb_id:02}_{ts}.cali.tof.gaps")
142  }
143}
144
145/// A standardized name for regular run files saved by
146/// the liftof suite
147///
148/// # Arguments
149///
150/// * run    : run id (identifier)
151/// * subrun : subrun id (identifier of file # within
152///            the run
153/// * rb_id  : in case this should be used on the rb, 
154///            a rb id can be specified as well
155pub fn get_runfilename(run : u32, subrun : u64, rb_id : Option<u8>) -> String {
156  let ts = get_utc_timestamp();
157  let fname : String;
158  match rb_id {
159    None => {
160      fname = format!("Run{run}_{subrun}.{ts}.tof.gaps");
161    }
162    Some(rbid) => {
163      fname = format!("Run{run}_{subrun}.{ts}.RB{rbid:02}.tof.gaps");
164    }
165  }
166  fname
167}
168
169/// Read an entire file into memory
170///
171/// Represents the contents of a file 
172/// as a byte vector
173/// 
174/// # Arguments:
175///
176/// * filename : full path to the file to be read
177pub fn read_file(filename: &Path) -> io::Result<Vec<u8>> {
178  info!("Reading file {}", filename.display());
179  let mut f = File::open(&filename)?;
180  let metadata = fs::metadata(&filename)?;
181  let mut buffer = vec![0; metadata.len() as usize];
182  info!("Read {} bytes from {}", buffer.len(), filename.display());
183  // read_exact if the amount is not specified
184  f.read_exact(&mut buffer)?;
185  Ok(buffer)
186}
187
188/// Take a .tof.gaps file with TofEvents and keep all packets, 
189/// but reduce the TofEvents to TofEventSuammry to conserve space.
190pub fn summarize_toffile(fname : String) {
191  let mut reader    = TofPacketReader::new(fname.clone());
192  let outfile       = fname.replace(".tof.", ".tofsum."); 
193  let outfile_type  = FileType::SummaryFile(fname.clone());
194  let mut writer    = TofPacketWriter::new(outfile,outfile_type); 
195  let mut n_errors  = 0u32;
196  let npack : usize = reader.get_packet_index().unwrap_or(HashMap::<PacketType,usize>::new()).values().cloned().collect::<Vec<usize>>().iter().sum();
197  let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
198  let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
199  let bar_label  = String::from("Reading events");
200  let bar = ProgressBar::new(npack as u64);
201  bar.set_position(0);
202  bar.set_message (bar_label);
203  bar.set_prefix  ("\u{2728}");
204  bar.set_style   (bar_style);
205  let mut npack = 0u64;
206  for pack in reader {
207    npack += 1;
208    bar.set_position(npack);
209    match pack.packet_type {
210      PacketType::TofEvent => {
211        match pack.unpack::<TofEvent>() {
212          Err(err) => {
213            debug!("Can't unpack TofEvent! {err}");
214            n_errors += 1;
215          }
216          Ok(te) => {
217            let ts = te.get_summary();
218            let tp = ts.pack();
219            writer.add_tof_packet(&tp); 
220          }
221        }
222      }
223      _ => {
224        writer.add_tof_packet(&pack);
225      }
226    }
227  }
228  bar.finish_with_message("Done!");
229  if n_errors > 0 {
230    error!("Unpacking TofEvents from {} failed {} times!", n_errors, fname);
231  }
232}
233
/// Emit RBEvents from a stream of bytes
/// from RBMemory
///
/// The layout of the stream has to have
/// the fpga fw memory layout.
///
/// This provides a next() method to act
/// as a generator for RBEvents
///
/// Bytes get fed in through add()/consume(), events 
/// are extracted either sequentially (next(), 
/// next_tofpacket()) or by event id after 
/// create_event_index() has been called 
/// (get_event_at_id()).
pub struct RBEventMemoryStreamer {
  /// Raw stream read out from the RB buffers.
  pub stream               : Vec<u8>,
  /// Error checking mode - check error bits for 
  /// channels/cells
  pub check_channel_errors : bool,
  /// Ignore channels in this list
  pub mask                 : Vec<u8>,

  /// Current position in the stream
  pos                      : usize,
  /// The current posion marker points to a header 
  /// signature in the stream.
  pos_at_head              : bool,
  /// An optional crossbeam::channel Sender, which 
  /// will allow to send TofPackets
  pub tp_sender            : Option<Sender<TofPacket>>,
  /// number of extracted events from stream
  /// this manages how we are draining the stream
  n_events_ext             : usize,
  /// Set when the extraction of events ran out of usable
  /// data (empty stream, no further header or a truncated
  /// event). Gets reset by add()/consume()
  pub is_depleted          : bool,
  /// Calculate the crc32 checksum for the channels 
  /// everytime next() is called
  pub calc_crc32           : bool,
  /// placeholder for checksum calculator
  crc32_sum                : Crc::<u32>,
  /// NOTE(review): not used by the methods visible in this 
  /// part of the file - presumably switches to serving 
  /// events by request. Confirm.
  pub request_mode         : bool,
  /// NOTE(review): not used by the methods visible in this 
  /// part of the file - presumably pending requests, 
  /// the meaning of the (u32,u8) pair needs to be confirmed.
  pub request_cache        : VecDeque<(u32,u8)>,
  /// an index for the events in the stream
  /// this links eventid and start position
  /// in the stream together
  ///
  /// The value is (start position, packet length in bytes),
  /// filled by create_event_index()
  pub event_map            : HashMap<u32,(usize,usize)>,
  /// The first event id found by create_event_index(). 
  /// 0 is treated as "not set yet".
  pub first_evid           : u32,
  /// The event id which has been parsed last by 
  /// create_event_index()
  pub last_evid            : u32,
  /// Set by create_event_index()
  pub last_event_complete  : bool,
  /// NOTE(review): never written by the methods visible 
  /// in this part of the file.
  pub last_event_pos       : usize,
  /// When in request mode, number of events the last event in the stream is behind the
  /// first request
  pub is_behind_by         : usize,
  /// When in request mode, number of events the last event in the stream is ahead the
  /// last request
  pub is_ahead_by          : usize,
}
285
286impl RBEventMemoryStreamer {
287
288  pub fn new() -> Self {
289    Self {
290      stream               : Vec::<u8>::new(),
291      check_channel_errors : false,
292      mask                 : Vec::<u8>::new(),
293      pos                  : 0,
294      pos_at_head          : false,
295      tp_sender            : None,
296      n_events_ext         : 0,
297      is_depleted          : false,
298      calc_crc32           : false,
299      crc32_sum            : Crc::<u32>::new(&ALGO),
300      request_mode         : false,
301      request_cache        : VecDeque::<(u32,u8)>::new(),
302      event_map            : HashMap::<u32,(usize,usize)>::new(),
303      first_evid           : 0,
304      last_evid            : 0,
305      last_event_complete  : false,
306      last_event_pos       : 0,
307      is_behind_by         : 0,
308      is_ahead_by          : 0,
309    }
310  }
311 
312  /// Create the event index, which is
313  /// a map of event ids and position 
314  /// + length in the stream
315  pub fn create_event_index(&mut self) { //-> Result<Ok, SerializationError> {
316    let begin_pos = self.pos;
317    let mut event_id = 0u32;
318    // we are now at head, 
319    // read packet len and event id
320    loop {
321      let mut result = (0usize, 0usize);
322      if !self.seek_next_header(0xaaaa) {
323        debug!("Could not find another header...");
324        self.pos = begin_pos;
325        self.last_evid = event_id;
326        if result.0 + result.1 > self.stream.len() - 1 {
327          self.last_event_complete = false;
328        } else {
329          self.last_event_complete = true;
330        }
331        info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
332        return;
333      }
334      result.0 = self.pos;
335      self.pos += 4;//header, status
336      let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
337      if self.stream.len() < self.pos -6 + packet_len {
338        //self.is_depleted = true;
339        self.pos = begin_pos;
340        self.last_evid = event_id;
341        info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
342        return;
343        //return Err(SerializationError::StreamTooShort);
344      }
345      result.1 = packet_len;
346      if packet_len < 6 {
347        self.pos = begin_pos;
348        self.last_evid = event_id;
349        info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
350        return;
351        //return Err(SerializationError::StreamTooShort);
352      }
353      // rewind
354      self.pos -= 6;
355      // event id is at pos 22
356      self.pos += 22;
357      let event_id0    = parse_u16(&self.stream, &mut self.pos);
358      let event_id1    = parse_u16(&self.stream, &mut self.pos);
359      if REVERSE_WORDS {
360        event_id = u32::from(event_id0) << 16 | u32::from(event_id1);
361      } else {
362        event_id = u32::from(event_id1) << 16 | u32::from(event_id0);
363      }
364      if self.first_evid == 0 {
365        self.first_evid = event_id;
366      }
367      self.pos += packet_len - 26;
368      self.event_map.insert(event_id,result);
369    }
370  }
371
372  pub fn print_event_map(&self) {
373    for k in self.event_map.keys() {
374      let pos = self.event_map[&k];
375      println!("-- --> {} -> {},{}", k, pos.0, pos.1);
376    }
377  }
378
379  // EXPERIMENTAL
380  pub fn init_sender(&mut self, tp_sender : Sender<TofPacket>) {
381    self.tp_sender = Some(tp_sender);
382  }
383
384  // EXPERIMENTAL
385  pub fn send_all(&mut self) {
386    loop {
387      match self.next() {
388        None => {
389          info!("Streamer drained!");
390          break;
391        },
392        Some(ev) => {
393          let tp = TofPacket::from(&ev);
394          match self.tp_sender.as_ref().expect("Sender needs to be initialized first!").send(tp) {
395            Ok(_) => (),
396            Err(err) => {
397              error!("Unable to send TofPacket! {err}");
398            }
399          }
400        }
401      }
402    }
403  }
404
405
406  // FIXME - performance. Don't extend it. It would be
407  // better if we'd consume the stream without 
408  // reallocating memory.
409  pub fn add(&mut self, stream : &Vec<u8>, nbytes : usize) {
410    //self.stream.extend(stream.iter().copied());
411    //println!("self.pos {}", self.pos);
412    //println!("Stream before {}",self.stream.len());
413    self.is_depleted = false;
414    self.stream.extend_from_slice(&stream[0..nbytes]);
415    //self.create_event_index();
416    //println!("Stream after {}",self.stream.len());
417  }
418
419  /// Take in a stream by consuming it, that means moving
420  /// This will avoid clones.
421  pub fn consume(&mut self, stream : &mut Vec<u8>) {
422    self.is_depleted = false;
423    // FIXME: append can panic
424    // we use it here, since it does not clone
425    //println!("[io.rs] consuming {} bytes", stream.len());
426    self.stream.append(stream);
427    //println!("[io.rs] stream has now {} bytes", self.stream.len());
428    //self.create_event_index();
429  }
430
431  /// Headers are expected to be a 2byte signature, 
432  /// e.g. 0xaaaa. 
433  ///
434  /// # Arguments:
435  ///   header : 2byte header.
436  ///
437  /// # Returns
438  /// 
439  ///   * success   : header found
440  pub fn seek_next_header(&mut self, header : u16) -> bool{
441    match search_for_u16(header, &self.stream, self.pos) {
442      Err(_) => {
443        return false;
444      }
445      Ok(head_pos) => {
446        self.pos = head_pos;
447        self.pos_at_head = true;
448        return true;
449      }
450    }
451  }
452
453  pub fn next_tofpacket(&mut self) -> Option<TofPacket> {
454    let begin_pos = self.pos; // in case we need
455                              // to reset the position
456    let foot_pos : usize;
457    let head_pos : usize;
458    if self.stream.len() == 0 {
459      trace!("Stream empty!");
460      return None;
461    }
462    if !self.pos_at_head {
463      if !self.seek_next_header(0xaaaa) {
464        debug!("Could not find another header...");
465        self.pos = begin_pos;
466        return None;
467      }
468    }
469    head_pos  = self.pos;
470    //let mut foot_pos  = self.pos;
471    //head_pos = self.pos;
472    if !self.seek_next_header(0x5555) {
473      debug!("Could not find another footer...");
474      self.pos = begin_pos;
475      return None;
476    }
477    //println!("{} {} {}", self.stream.len(), head_pos, foot_pos);
478    foot_pos = self.pos;
479    self.n_events_ext += 1;
480    let mut tp = TofPacket::new();
481    tp.packet_type = PacketType::RBEventMemoryView;
482    //let mut payload = Vec::<u8>::with_capacity(18530);
483    tp.payload.extend_from_slice(&self.stream[head_pos..foot_pos+2]);
484    //tp.payload = payload;
485    //self.pos += 2;
486    self.pos_at_head = false;
487    //self.stream.drain(0..foot_pos);
488    //self.pos = 0;
489    if self.n_events_ext % 200 == 0 {
490      self.stream.drain(0..foot_pos+3);
491      self.pos = 0;
492    }
493    Some(tp)
494  }
495
496
497  /// Retrive an RBEvent from a certain position
498  pub fn get_event_at_pos_unchecked(&mut self,
499                                    replace_channel_mask : Option<u16>)
500      -> Option<RBEvent> {
501    let mut header       = RBEventHeader::new();
502    let mut event        = RBEvent::new();
503    let mut event_status = EventStatus::Unknown;
504    //let begin_pos = self.pos;
505    if self.calc_crc32 && self.check_channel_errors {
506      event_status = EventStatus::Perfect;
507    }
508    if !self.calc_crc32 && !self.check_channel_errors {
509      event_status = EventStatus::GoodNoCRCOrErrBitCheck;
510    }
511    if !self.calc_crc32 && self.check_channel_errors {
512      event_status = EventStatus::GoodNoCRCCheck;
513    }
514    if self.calc_crc32 && !self.check_channel_errors {
515      event_status = EventStatus::GoodNoErrBitCheck;
516    }
517    // start parsing
518    //let first_pos = self.pos;
519    let head   = parse_u16(&self.stream, &mut self.pos);
520    if head != RBEventHeader::HEAD {
521      error!("Event does not start with {}", RBEventHeader::HEAD);
522      return None;
523    }
524
525    let status = parse_u16(&self.stream, &mut self.pos);
526    // At this state, this can be a header or a full event. Check here and
527    // proceed depending on the options
528    header.parse_status(status);
529    let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
530    let nwords     = parse_u16(&self.stream, &mut self.pos) as usize + 1; // the field will tell you the 
531    if self.pos - 8 + packet_len > self.stream.len() { // -1?
532      error!("Stream is too short! Stream len is {}, packet len is {}. We are at pos {}", self.stream.len(), packet_len, self.pos);
533      self.is_depleted = true;
534      self.pos -= 8;
535      return None;
536    }
537    // now we skip the next 10 bytes, 
538    // they are dna, rsv, rsv, rsv, fw_hash
539    self.pos += 10;
540    self.pos += 1; // rb id first byte is rsvd
541    header.rb_id        = parse_u8(&self.stream, &mut self.pos);
542    header.set_channel_mask(parse_u16(&self.stream, &mut self.pos)); 
543    match replace_channel_mask {
544      None => (),
545      Some(mask) => {
546        println!("==> Replacing ch mask {} with {}", header.get_channel_mask(), mask);
547        header.set_channel_mask(mask); 
548      }
549    }
550    let event_id0       = parse_u16(&self.stream, &mut self.pos);
551    let event_id1       = parse_u16(&self.stream, &mut self.pos);
552    let event_id : u32;
553    if REVERSE_WORDS {
554      event_id = u32::from(event_id0) << 16 | u32::from(event_id1);
555    } else {
556      event_id = u32::from(event_id1) << 16 | u32::from(event_id0);
557    }
558    
559    header.event_id  = event_id;
560    // we are currently not using these
561    //let _dtap0       = parse_u16(&self.stream, &mut self.pos);
562    //let _drs4_temp   = parse_u16(&self.stream, &mut self.pos);
563    self.pos += 4;
564    let timestamp0   = parse_u16(&self.stream, &mut self.pos);
565    let timestamp1   = parse_u16(&self.stream, &mut self.pos);
566    let timestamp2   = parse_u16(&self.stream, &mut self.pos);
567    //println!("TIMESTAMPS {} {} {}", timestamp0, timestamp1, timestamp2);
568    let timestamp16 : u16;
569    let timestamp32 : u32;
570    if REVERSE_WORDS {
571      timestamp16 = timestamp0;
572      timestamp32 = u32::from(timestamp1) << 16 | u32::from(timestamp2);
573    } else {
574      timestamp16 = timestamp2;
575      timestamp32 = u32::from(timestamp0) << 16 | u32::from(timestamp1);
576    }
577    header.timestamp16 = timestamp16;
578    header.timestamp32 = timestamp32;
579    // now the payload
580    //println!("{}", header);
581    //println!("{}", nwords);
582    if header.drs_lost_trigger() {
583      event.status = EventStatus::IncompleteReadout;
584      event.header = header;
585      //self.pos_at_head = false;
586      return Some(event);
587    }
588    // make sure we can read them!
589    //let expected_packet_size =   header.get_channels().len()*nwords*2 
590    //                           + header.get_channels().len()*2 
591    //                           + header.get_channels().len()*4;
592    let mut any_cell_error = false;
593    let mut header_channels = header.get_channels().clone();
594    for k in &self.mask {
595      header_channels.retain(|x| x != k);
596    }
597
598    for ch in header_channels.iter() {
599      let ch_id = parse_u16(&self.stream, &mut self.pos);
600      if ch_id != *ch as u16 {
601        // check where is the next header
602        let search_pos = self.pos;
603        match search_for_u16(TofPacket::HEAD, &self.stream, search_pos) {
604          Err(_) => (),
605          Ok(result) => {
606            info!("The channel data is corrupt, but we found a header at {} for remaining stream len {}", result, self.stream.len()); 
607          }
608        }
609        let mut stream_view = Vec::<u8>::new();
610        let foo_pos = self.pos;
611        for k in foo_pos -3..foo_pos + 3 {
612          stream_view.push(self.stream[k]);
613        }
614        error!("We got {ch_id} but expected {ch} for event {}. The parsed ch id is not in the channel mask! We will fill this channel with u16::MAX .... Stream view +- 3 around the ch id {:?}", header.event_id, stream_view);
615        event_status = EventStatus::ChannelIDWrong;
616        // we fill the channel with MAX values:
617        event.adc[*ch as usize] = vec![u16::MAX;NWORDS];
618        self.pos += 2*nwords + 4;
619        continue;
620      } else {
621      //if ch_id == *ch as u16 {
622        //println!("Got ch id {}", ch_id);
623        //let header = parse_u16(&self.stream, &mut self.pos);
624        // noice!!
625        //let data : Vec<u8> = self.stream.iter().skip(self.pos).take(2*nwords).map(|&x| x).collect();
626         
627        let mut dig = self.crc32_sum.digest();
628        if self.calc_crc32 {
629          let mut this_ch_adc = Vec::<u16>::with_capacity(nwords);
630          for _ in 0..nwords {
631            let this_field = parse_u16(&self.stream, &mut self.pos);
632            dig.update(&this_field.to_le_bytes());
633            if self.check_channel_errors {
634              if ((0x8000 & this_field) >> 15) == 0x1 {
635                error!("Ch error bit set for ch {}!", ch);
636                event_status = EventStatus::ChnSyncErrors;
637              }
638              if ((0x4000 & this_field) >> 14) == 0x1 {
639                error!("Cell error bit set for ch {}!", ch);
640                event_status = EventStatus::CellSyncErrors;
641                any_cell_error = true;
642              }
643            }
644            this_ch_adc.push(0x3fff & this_field)
645          }
646          event.adc[*ch as usize] = this_ch_adc;
647        } else {
648          if self.check_channel_errors {
649            let adc_w_errs = u8_to_u16_err_check(&self.stream[self.pos..self.pos + 2*nwords]);
650            if adc_w_errs.1 {
651              error!("Ch error bit set for ch {}!", ch);
652              event_status = EventStatus::ChnSyncErrors;
653              any_cell_error = true;
654            } else if adc_w_errs.2 {
655              error!("Cell error bit set for ch {}!", ch);
656              event_status = EventStatus::CellSyncErrors;
657            }
658            event.adc[*ch as usize] = adc_w_errs.0;
659          } else {
660            event.adc[*ch as usize] = u8_to_u16_14bit(&self.stream[self.pos..self.pos + 2*nwords]);
661          }
662          self.pos += 2*nwords;
663        } 
664        //let data = &self.stream[self.pos..self.pos+2*nwords];
665        //self.pos += 2*nwords;
666        let crc320 = parse_u16(&self.stream, &mut self.pos);
667        let crc321 = parse_u16(&self.stream, &mut self.pos);
668        //let checksum = self.crc32_sum.clone().finalize();
669        if self.calc_crc32 {
670          let crc32 : u32;
671          if REVERSE_WORDS {
672            crc32 = u32::from(crc320) << 16 | u32::from(crc321);
673          } else {
674            crc32 = u32::from(crc321) << 16 | u32::from(crc320);
675          }
676          let checksum = dig.finalize();
677          if checksum != crc32 {
678            event_status = EventStatus::CRC32Wrong;
679          }
680          println!("== ==> Checksum {}, channel checksum {}!", checksum, crc32); 
681        }
682      }
683    }
684    if any_cell_error {
685      if event_status == EventStatus::ChnSyncErrors {
686        event_status = EventStatus::CellAndChnSyncErrors;
687      }
688    }
689    
690    if !header.drs_lost_trigger() {
691      header.stop_cell = parse_u16(&self.stream, &mut self.pos);
692    }
693    // CRC32 checksum - next 4 bytes
694    // FIXME
695    // skip crc32 checksum
696    self.pos += 4;
697
698    // in principle there is a checksum for the whole event, whcih
699    // we are currently not using (it is easy to spot wrong bytes
700    // in the header)
701    //let crc320         = parse_u16(&self.stream, &mut self.pos);
702    //let crc321         = parse_u16(&self.stream, &mut self.pos);
703    //if self.calc_crc32 {
704    //  let crc32 : u32;
705    //  if REVERSE_WORDS {
706    //    crc32 = u32::from(crc320) << 16 | u32::from(crc321);
707    //  } else {
708    //    crc32 = u32::from(crc321) << 16 | u32::from(crc320);
709    //  }
710    //  warn!("Checksum test for the whole event is not yet implemented!");
711    //  //if event.header.crc32 != crc32 {
712    //  //  trace!("Checksum test for the whole event is not yet implemented!");
713    //  //}
714    //}
715    
716    let tail         = parse_u16(&self.stream, &mut self.pos);
717    if tail != 0x5555 {
718      error!("Tail signature {} for event {} is invalid!", tail, header.event_id);
719      event_status = EventStatus::TailWrong;
720    } 
721    //self.stream.drain(0..self.pos);
722    self.pos_at_head = false;
723    event.header = header;
724    event.status = event_status;
725    if event_status == EventStatus::TailWrong {
726      info!("{}", event);
727    }
728    Some(event)
729  }
730
731  pub fn get_event_at_id(&mut self, event_id : u32, replace_channel_mask : Option<u16>) -> Option<RBEvent> {
732    let begin_pos = self.pos; // in case we need
733                              // to reset the position
734    //println!("--> Requested {}", event_id);
735    //if self.event_map.contains_key(&event_id) {
736    //  //println!("-- We have it!");
737    //} else {
738    //  //println!("-- We DON'T have it, event_map len {}", self.event_map.len());
739    //  //self.print_event_map();
740    //  //println!("-- last event id {}", self.last_evid);
741    //  //println!("-- first event id {}", self.first_evid);
742    //}
743    let pos = self.event_map.remove(&event_id)?;
744    if self.stream.len() < pos.0 + pos.1 {
745      trace!("Stream is too short!");
746      self.is_depleted = true;
747      self.pos = begin_pos;
748      return None;
749    }
750    self.pos = pos.0;
751    self.get_event_at_pos_unchecked(replace_channel_mask)
752  }
753}
754
755impl Iterator for RBEventMemoryStreamer {
756  type Item = RBEvent;
757
758  fn next(&mut self) -> Option<Self::Item> {
759    // FIXME - we should init this only once
760    // event id from stream
761    //let event_id  = 0u32;
762    let begin_pos : usize; // in case we need
763                           // to rewind
764     
765    self.pos_at_head = false;
766    begin_pos = self.pos; // in case we need
767                                // to reset the position
768    if self.stream.len() == 0 {
769      trace!("Stream empty!");
770      self.is_depleted = true;
771      self.pos = 0;
772      return None;
773    }
774    if !self.pos_at_head {
775      if !self.seek_next_header(0xaaaa) {
776        debug!("Could not find another header...");
777        self.pos = begin_pos;
778        self.is_depleted = true;
779        return None;
780      }
781    }
782    
783    let event          = self.get_event_at_pos_unchecked(None)?;
784    self.n_events_ext += 1;
785    self.stream.drain(0..self.pos);
786    self.pos           = 0;
787    self.pos_at_head   = false;
788    Some(event)
789  }
790}
791
792
/// Generics for packet reading (TofPacket, Telemetry packet,...)
/// FIXME - not implemented yet
///
/// Implementors have to provide set_cursor only. The 
/// header bytes default to 0 and can be overridden.
pub trait PacketReader {
  /// First header byte, e.g. 0xAA for TofPackets
  /// (the header signature of a TofPacket is 0xAAAA)
  const HEADER0 : u8 = 0;
  /// Second header byte
  const HEADER1 : u8 = 0;

  /// Manage the internal cursor attribute
  fn set_cursor(&mut self, pos : usize);

  /// Rewind the file, so it can be read again from the 
  /// beginning
  ///
  /// The default implementation only resets the cursor 
  /// to 0 and never fails. It does not touch any 
  /// underlying file handle.
  fn rewind(&mut self) -> io::Result<()> {
    // rewinding the underlying reader is not part of the default
    //self.file_reader.rewind()?;
    self.set_cursor(0);
    Ok(())
  }
}
811
812
813
814
/// Read serialized TofPackets from an existing file or directory
///
/// This can read the "TOF stream" files, typically suffixed with .tof.gaps
/// These files are typically written by a TofPacketWriter instance, e.g. as 
/// on the TOF flight computer
///
/// When created from a directory, the files in it which follow the 
/// run file naming scheme are ordered by the timestamp in their name.
#[derive(Debug)]
pub struct TofPacketReader {
  /// Read from these files. For a single file, this has 
  /// exactly one entry
  pub filenames       : Vec<String>,
  /// Buffered reader for the file which is currently open
  file_reader         : BufReader<File>,
  /// Current (byte) position in the file
  cursor              : usize,
  /// Read only packets of type == PacketType
  ///
  /// NOTE(review): a new reader starts with PacketType::Unknown,
  /// presumably this means "no filter" - confirm
  pub filter          : PacketType,
  /// Number of read packets
  n_packs_read        : usize,
  /// Number of skipped packets
  n_packs_skipped     : usize,
  /// Skip the first n packets
  pub skip_ahead      : usize,
  /// Stop reading after n packets
  ///
  /// A value of 0 is shown as an open range by the 
  /// Display implementation
  pub stop_after      : usize,
  /// The index of the current file in the internal "filenames" vector.
  pub file_index      : usize,
}
840
841impl fmt::Display for TofPacketReader {
842  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
843    let mut range_repr = String::from("");
844    if self.skip_ahead > 0 {
845      range_repr += &(format!("({}", self.skip_ahead));
846    } else {
847      range_repr += "(";
848    }
849    if self.stop_after > 0 {
850      range_repr += &(format!("..{})", self.stop_after));
851    } else {
852      range_repr += "..)";
853    }
854    let repr = format!("<TofPacketReader :read {} packets, filter {}, range {}\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
855    write!(f, "{}", repr)
856  }
857}
858
859impl TofPacketReader {
860
861  pub fn get_current_filename(&self) -> Option<String> {
862    // should only happen when it is empty
863    if self.filenames.len() <= self.file_index {
864      return None;
865    }
866    Some(self.filenames[self.file_index].clone())
867  }
868
869  fn list_path_contents_sorted(input: &str) -> Result<Vec<String>, io::Error> {
870    let path = Path::new(input);
871    match fs::metadata(path) {
872      Ok(metadata) => {
873        if metadata.is_file() {
874          //return Ok(vec![path.file_name()
875          let fname = String::from(input);
876          return Ok(vec![fname]);
877          //return Ok(vec![path
878          //  .and_then(|name| name.to_str())
879          //  .map(String::from)
880          //  .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "Invalid filename"))?]);
881        } 
882        if metadata.is_dir() {
883          let re = Regex::new(r"Run\d+_\d+\.(\d{6})_(\d{6})UTC\.tof\.gaps$").unwrap();
884
885          let mut entries: Vec<(u32, u32, String)> = fs::read_dir(path)?
886            .filter_map(Result::ok) // Ignore unreadable entries
887            .filter_map(|entry| {
888              //let filename = String::from(entry.file_name().into_string().ok()?); // Convert to String
889              let filename = format!("{}/{}", path.display(), entry.file_name().into_string().ok()?);
890              re.captures(&filename.clone()).map(|caps| {
891                let date = caps.get(1)?.as_str().parse::<u32>().ok()?;
892                let time = caps.get(2)?.as_str().parse::<u32>().ok()?;
893                Some((date, time, filename))
894              })?
895            })
896            .collect();
897
898          // Sort by (date, time)
899          entries.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
900          // Return only filenames
901          return Ok(entries.into_iter().map(|(_, _, name)| name).collect());
902        } 
903        Err(io::Error::new(ErrorKind::Other, "Path exists but is neither a file nor a directory"))
904      }
905      Err(e) => Err(e),
906    }
907  }
908
909  /// Setup a new Reader, allowing the argument to be either the name of a single file or 
910  /// the name of a directory
911  pub fn new(filename_or_directory : String) -> TofPacketReader {
912    let firstfile : String;
913    match TofPacketReader::list_path_contents_sorted(&filename_or_directory) {
914      Err(err) => {
915        error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
916        panic!("Unable to open files!");
917      }
918      Ok(files) => {
919        firstfile = files[0].clone();
920        match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
921          Err(err) => {
922            error!("Unable to open file {firstfile}! {err}");
923            panic!("Unable to create reader from {filename_or_directory}!");
924          }
925          Ok(file) => {
926            let packet_reader = Self { 
927              filenames       : files,
928              file_reader     : BufReader::new(file),
929              cursor          : 0,
930              filter          : PacketType::Unknown,
931              n_packs_read    : 0,
932              skip_ahead      : 0,
933              stop_after      : 0,
934              n_packs_skipped : 0,
935              file_index      : 0,
936            };
937            packet_reader
938          }
939        }
940      }
941    } 
942  }
943
944  /// The very first TofPacket for a reader
945  pub fn first_packet(&mut self) -> Option<TofPacket> {
946    match self.rewind() {
947      Err(err) => {
948        error!("Error when rewinding files! {err}");
949      }
950      Ok(_) => ()
951    }
952    let pack = self.get_next_packet();
953    match self.rewind() {
954      Err(err) => {
955        error!("Error when rewinding files! {err}");
956      }
957      Ok(_) => ()
958    }
959    return pack;
960  }
961
962  /// The very last TofPacket for a reader
963  pub fn last_packet(&mut self) -> Option<TofPacket> { 
964    self.file_index = self.filenames.len() - 1;
965    let lastfilename = self.filenames[self.file_index].clone();
966    let lastfile     = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
967    self.file_reader = BufReader::new(lastfile);
968    self.cursor      = 0;
969    let mut tp = TofPacket::new();
970    let mut idx = 0;
971    loop {
972      match self.get_next_packet() {
973        None => {
974          match self.rewind() {
975            Err(err) => {
976              error!("Error when rewinding files! {err}");
977            }
978            Ok(_) => ()
979          }
980          if idx == 0 {
981            return None;
982          } else {
983            return Some(tp);
984          }
985        }
986        Some(pack) => {
987          idx += 1;
988          tp = pack;
989          continue;
990        }
991      }
992    }
993  }
994
995
  /// Restrict the reader to packets of type `ptype`.
  ///
  /// Only stores `ptype` in `self.filter`. `get_next_packet` treats
  /// `PacketType::Unknown` as "no filter".
  #[deprecated(since="0.10.0", note="Use public attribute instead!")]
  pub fn set_filter(&mut self, ptype : PacketType) {
    self.filter = ptype;
  }
1000
1001  /// Get an index of the file - count number of packets
1002  ///
1003  /// Returns the number of all PacketTypes in the file
1004  pub fn get_packet_index(&mut self) -> io::Result<HashMap<PacketType, usize>> {
1005    let mut index  = HashMap::<PacketType, usize>::new();
1006    let mut buffer = [0];
1007    loop {
1008      match self.file_reader.read_exact(&mut buffer) {
1009        Err(err) => {
1010          debug!("Unable to read from file! {err}");
1011          //return None;
1012          break;
1013        }
1014        Ok(_) => {
1015          self.cursor += 1;
1016        }
1017      }
1018      if buffer[0] != 0xAA {
1019        continue;
1020      } else {
1021        match self.file_reader.read_exact(&mut buffer) {
1022          Err(err) => {
1023            debug!("Unable to read from file! {err}");
1024            //return None;
1025            break;
1026          }
1027          Ok(_) => {
1028            self.cursor += 1;
1029          }
1030        }
1031
1032        if buffer[0] != 0xAA { 
1033          continue;
1034        } else {
1035          // the 3rd byte is the packet type
1036          match self.file_reader.read_exact(&mut buffer) {
1037             Err(err) => {
1038              debug!("Unable to read from file! {err}");
1039              break;
1040            }
1041            Ok(_) => {
1042              self.cursor += 1;
1043            }
1044          }
1045          let ptype    = PacketType::from(buffer[0]);
1046          // read the the size of the packet
1047          let mut buffer_psize = [0,0,0,0];
1048          match self.file_reader.read_exact(&mut buffer_psize) {
1049            Err(err) => {
1050              error!("Unable to read from file! {err}");
1051              break;
1052            }
1053            Ok(_) => {
1054              self.cursor += 4;
1055            }
1056          }
1057          let vec_data = buffer_psize.to_vec();
1058          let size     = parse_u32(&vec_data, &mut 0);
1059          match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1060            Err(err) => {
1061              debug!("Unable to read more data! {err}");
1062              break; 
1063            }
1064            Ok(_) => {
1065              self.cursor += size as usize;
1066              // and then we add the packet type to the 
1067              // hashmap
1068              //let ptype_key = ptype as u8;
1069              if index.contains_key(&ptype) {
1070                *index.get_mut(&ptype).unwrap() += 1;
1071              } else {
1072                index.insert(ptype, 1usize);
1073              }
1074            }
1075          }
1076        }
1077      } // if no 0xAA found
1078    } // end loop
1079    self.rewind()?;
1080    Ok(index)
1081  } // end fn
1082
1083  pub fn rewind(&mut self) -> io::Result<()> {
1084    let firstfile = &self.filenames[0];
1085    let file = OpenOptions::new().create(false).append(false).read(true).open(&firstfile)?;
1086    self.file_reader  = BufReader::new(file);
1087    self.cursor     = 0;
1088    self.file_index = 0;
1089    Ok(())
1090  }
1091
1092  /// Return the next tofpacket in the stream
1093  ///
1094  /// Will return none if the file has been exhausted.
1095  /// Use ::rewind to start reading from the beginning
1096  /// again.
1097  pub fn get_next_packet(&mut self) -> Option<TofPacket> {
1098    // filter::Unknown corresponds to allowing any
1099
1100    let mut buffer = [0];
1101    loop {
1102      match self.file_reader.read_exact(&mut buffer) {
1103        Err(err) => {
1104          debug!("Unable to read from file! {err}");
1105          if self.file_index == self.filenames.len() -1 {
1106            return None;
1107          } else {
1108            self.file_index += 1;
1109            let nextfilename = self.filenames[self.file_index].clone();
1110            let nextfile     = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1111            self.file_reader = BufReader::new(nextfile);
1112            self.cursor      = 0;
1113            return self.get_next_packet();
1114          }
1115        }
1116        Ok(_) => {
1117          self.cursor += 1;
1118        }
1119      }
1120      if buffer[0] != 0xAA {
1121        continue;
1122      } else {
1123        match self.file_reader.read_exact(&mut buffer) {
1124          Err(err) => {
1125            debug!("Unable to read from file! {err}");
1126            if self.file_index == self.filenames.len() -1 {
1127              return None;
1128            } else {
1129              self.file_index += 1;
1130              let nextfilename = self.filenames[self.file_index].clone();
1131              let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1132              self.file_reader = BufReader::new(nextfile);
1133              self.cursor      = 0;
1134              return self.get_next_packet();
1135            }
1136          }
1137          Ok(_) => {
1138            self.cursor += 1;
1139          }
1140        }
1141
1142        if buffer[0] != 0xAA { 
1143          continue;
1144        } else {
1145          // the 3rd byte is the packet type
1146          match self.file_reader.read_exact(&mut buffer) {
1147             Err(err) => {
1148              debug!("Unable to read from file! {err}");
1149              if self.file_index == self.filenames.len() -1 {
1150                return None;
1151              } else {
1152                self.file_index += 1;
1153                let nextfilename = self.filenames[self.file_index].clone();
1154                let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1155                self.cursor      = 0;
1156                self.file_reader = BufReader::new(nextfile);
1157                return self.get_next_packet();
1158              }
1159            }
1160            Ok(_) => {
1161              self.cursor += 1;
1162            }
1163          }
1164          let ptype    = PacketType::from(buffer[0]);
1165          // read the the size of the packet
1166          let mut buffer_psize = [0,0,0,0];
1167          match self.file_reader.read_exact(&mut buffer_psize) {
1168            Err(err) => {
1169              debug!("Unable to read from file! {err}");
1170              if self.file_index == self.filenames.len() -1 {
1171                return None;
1172              } else {
1173                self.file_index += 1;
1174                let nextfilename = self.filenames[self.file_index].clone();
1175                let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1176                self.cursor      = 0;
1177                self.file_reader = BufReader::new(nextfile);
1178                return self.get_next_packet();
1179              }
1180            }
1181            Ok(_) => {
1182              self.cursor += 4;
1183            }
1184          }
1185          let vec_data = buffer_psize.to_vec();
1186          let size     = parse_u32(&vec_data, &mut 0);
1187          if ptype != self.filter && self.filter != PacketType::Unknown {
1188            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1189              Err(err) => {
1190                debug!("Unable to read more data! {err}");
1191                if self.file_index == self.filenames.len() -1 {
1192                  return None;
1193                } else {
1194                  self.file_index += 1;
1195                  let nextfilename = self.filenames[self.file_index].clone();
1196                  let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1197                  self.cursor      = 0;
1198                  self.file_reader = BufReader::new(nextfile);
1199                  return self.get_next_packet();
1200                }
1201              }
1202              Ok(_) => {
1203                self.cursor += size as usize;
1204              }
1205            }
1206            continue; // this is just not the packet we want
1207          }
1208          // now at this point, we want the packet!
1209          // except we skip ahead or stop earlier
1210          if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
1211            // we don't want it
1212            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1213              Err(err) => {
1214                debug!("Unable to read more data! {err}");
1215                if self.file_index == self.filenames.len() -1 {
1216                  return None;
1217                } else {
1218                  self.file_index += 1;
1219                  let nextfilename = self.filenames[self.file_index].clone();
1220                  let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1221                  self.cursor      = 0;
1222                  self.file_reader = BufReader::new(nextfile);
1223                  return self.get_next_packet();
1224                }
1225              }
1226              Ok(_) => {
1227                self.n_packs_skipped += 1;
1228                self.cursor += size as usize;
1229              }
1230            }
1231            continue; // this is just not the packet we want
1232          }
1233          if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
1234            // we don't want it
1235            match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1236              Err(err) => {
1237                debug!("Unable to read more data! {err}");
1238                if self.file_index == self.filenames.len() -1 {
1239                  return None;
1240                } else {
1241                  self.file_index += 1;
1242                  let nextfilename = self.filenames[self.file_index].clone();
1243                  let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1244                  self.cursor      = 0;
1245                  self.file_reader = BufReader::new(nextfile);
1246                  return self.get_next_packet();
1247                }
1248              }
1249              Ok(_) => {
1250                self.cursor += size as usize;
1251              }
1252            }
1253            continue; // this is just not the packet we want
1254
1255          }
1256
1257          let mut tp = TofPacket::new();
1258          tp.packet_type = ptype;
1259          let mut payload = vec![0u8;size as usize];
1260
1261          match self.file_reader.read_exact(&mut payload) {
1262            Err(err) => {
1263              debug!("Unable to read from file! {err}");
1264              if self.file_index == self.filenames.len() -1 {
1265                return None;
1266              } else {
1267                self.file_index += 1;
1268                let nextfilename = self.filenames[self.file_index].clone();
1269                let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1270                self.cursor      = 0;
1271                self.file_reader = BufReader::new(nextfile);
1272                return self.get_next_packet();
1273              }
1274            }
1275            Ok(_) => {
1276              self.cursor += size as usize;
1277            }
1278          }
1279          tp.payload = payload;
1280          // we don't filter, so we like this packet
1281          let mut tail = vec![0u8; 2];
1282          match self.file_reader.read_exact(&mut tail) {
1283            Err(err) => {
1284              debug!("Unable to read from file! {err}");
1285              if self.file_index == self.filenames.len() -1 {
1286                return None;
1287              } else {
1288                self.file_index += 1;
1289                let nextfilename = self.filenames[self.file_index].clone();
1290                let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1291                self.cursor      = 0;
1292                self.file_reader = BufReader::new(nextfile);
1293                return self.get_next_packet();
1294              }
1295            }
1296            Ok(_) => {
1297              self.cursor += 2;
1298            }
1299          }
1300          let tail = parse_u16(&tail,&mut 0);
1301          if tail != TofPacket::TAIL {
1302            debug!("TofPacket TAIL signature wrong!");
1303            return None;
1304          }
1305          self.n_packs_read += 1;
1306          return Some(tp);
1307        }
1308      } // if no 0xAA found
1309    } // end loop
1310  } // end fn
1311}
1312
1313impl Default for TofPacketReader {
1314  fn default() -> Self {
1315    TofPacketReader::new(String::from(""))
1316  }
1317}
1318
/// Iterate over the packets of the reader.
///
/// Every call to `next` forwards to `get_next_packet`, so the filter
/// and range settings of the reader apply to the iteration as well.
impl Iterator for TofPacketReader {
  type Item = TofPacket;
  
  fn next(&mut self) -> Option<Self::Item> {
    self.get_next_packet()
  }
}
1326
/// Write TofPackets to disk.
///
/// Operates sequentially, packets can 
/// be added one at a time, then will
/// be synced to disk.
pub struct TofPacketWriter {

  /// handle of the file which is currently written to
  pub file            : File,
  /// location to store the file
  pub file_path       : String,
  /// The maximum number of packets 
  /// for a single file. After this 
  /// number is reached, a new 
  /// file is started. A value of 0
  /// disables this limit.
  pub pkts_per_file   : usize,
  /// The maximum number of (Mega)bytes
  /// per file. After this a new file 
  /// is started. Only considered when
  /// pkts_per_file is 0.
  pub mbytes_per_file : usize,
  /// determines how the name of the
  /// output file is constructed
  pub file_type       : FileType,
  /// name of the file which has been opened
  /// when the writer was created
  pub file_name       : String,

  /// running number passed on to the run file name
  /// whenever a new file is started
  file_id             : usize,
  /// internal packet counter, number of 
  /// packets which went through the writer
  n_packets           : usize,
  /// internal counter for bytes written in 
  /// this file
  file_nbytes_wr      : usize,
}
1358
1359impl TofPacketWriter {
1360
1361  /// Instantiate a new PacketWriter 
1362  ///
1363  /// # Arguments
1364  ///
1365  /// * file_prefix     : Prefix file with this string. A continuous number will get 
1366  ///                     appended to control the file size.
1367  /// * file_type       : control the behaviour of how the filename is
1368  ///                     assigned.
1369  pub fn new(mut file_path : String, file_type : FileType) -> Self {
1370    //let filename = file_prefix.clone() + "_0.tof.gaps";
1371    let file : File;
1372    let file_name : String;
1373    if !file_path.ends_with("/") {
1374      file_path += "/";
1375    }
1376    match file_type {
1377      FileType::Unknown => {
1378        let filename = file_path.clone() + "Data.tof.gaps";
1379        let path     = Path::new(&filename); 
1380        info!("Writing to file {filename}");
1381        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1382        file_name = filename;
1383      }
1384      FileType::RunFile(runid) => {
1385        let filename = format!("{}{}", file_path, get_runfilename(runid, 0, None));
1386        let path     = Path::new(&filename); 
1387        println!("Writing to file {filename}");
1388        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1389        file_name = filename;
1390      }
1391      FileType::CalibrationFile(rbid) => {
1392        let filename = format!("{}{}", file_path, get_califilename(rbid, false));
1393        //let filename = file_path.clone() + &get_califilename(rbid,false);
1394        let path     = Path::new(&filename); 
1395        info!("Writing to file {filename}");
1396        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1397        file_name = filename;
1398      }
1399      FileType::SummaryFile(ref fname) => {
1400        let filename = fname.replace(".tof.", ".tofsum.");
1401        let path     = Path::new(&filename);
1402        info!("Writing to file {filename}");
1403        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1404        file_name = filename;
1405      }
1406    }
1407    Self {
1408      file,
1409      file_path        : file_path,
1410      pkts_per_file    : 0,
1411      mbytes_per_file  : 420,
1412      file_nbytes_wr   : 0,    
1413      file_type        : file_type,
1414      file_id          : 1,
1415      n_packets        : 0,
1416      file_name        : file_name,
1417    }
1418  }
1419
1420  pub fn get_file(&self) -> File { 
1421    let file : File;
1422    match &self.file_type {
1423      FileType::Unknown => {
1424        let filename = self.file_path.clone() + "Data.tof.gaps";
1425        let path     = Path::new(&filename); 
1426        info!("Writing to file {filename}");
1427        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1428      }
1429      FileType::RunFile(runid) => {
1430        let filename = format!("{}{}", self.file_path, get_runfilename(*runid, self.file_id as u64, None));
1431        //let filename = self.file_path.clone() + &get_runfilename(runid,self.file_id as u64, None);
1432        let path     = Path::new(&filename); 
1433        info!("Writing to file {filename}");
1434        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1435      }
1436      FileType::CalibrationFile(rbid) => {
1437        //let filename = self.file_path.clone() + &get_califilename(rbid,false);
1438        let filename = format!("{}{}", self.file_path, get_califilename(*rbid, false));
1439        let path     = Path::new(&filename); 
1440        info!("Writing to file {filename}");
1441        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1442      }
1443      FileType::SummaryFile(fname) => {
1444        let filename = fname.replace(".tof.", ".tofsum.");
1445        let path     = Path::new(&filename);
1446        info!("Writing to file {filename}");
1447        file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1448      }
1449    }
1450    file
1451  }
1452
1453  /// Induce serialization to disk for a TofPacket
1454  ///
1455  ///
1456  pub fn add_tof_packet(&mut self, packet : &TofPacket) {
1457    let buffer = packet.to_bytestream();
1458    self.file_nbytes_wr += buffer.len();
1459    match self.file.write_all(buffer.as_slice()) {
1460      Err(err) => error!("Writing to file to path {} failed! {}", self.file_path, err),
1461      Ok(_)    => ()
1462    }
1463    self.n_packets += 1;
1464    let mut newfile = false;
1465    if self.pkts_per_file != 0 {
1466      if self.n_packets == self.pkts_per_file {
1467        newfile = true;
1468        self.n_packets = 0;
1469      }
1470    } else if self.mbytes_per_file != 0 {
1471      // multiply by mebibyte
1472      if self.file_nbytes_wr >= self.mbytes_per_file * 1_048_576 {
1473        newfile = true;
1474        self.file_nbytes_wr = 0;
1475      }
1476    }
1477    if newfile {
1478        //let filename = self.file_prefix.clone() + "_" + &self.file_id.to_string() + ".tof.gaps";
1479        match self.file.sync_all() {
1480          Err(err) => {
1481            error!("Unable to sync file to disc! {err}");
1482          },
1483          Ok(_) => ()
1484        }
1485        self.file = self.get_file();
1486        self.file_id += 1;
1487        //let path  = Path::new(&filename);
1488        //println!("==> [TOFPACKETWRITER] Will start a new file {}", path.display());
1489        //self.file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1490        //self.n_packets = 0;
1491        //self.file_id += 1;
1492      }
1493  debug!("TofPacket written!");
1494  }
1495}
1496
1497impl Default for TofPacketWriter {
1498  fn default() -> TofPacketWriter {
1499    TofPacketWriter::new(String::from(""), FileType::Unknown)
1500  }
1501}
1502
/// Read RB binary (robin) files. These are also 
/// known as "blob" files
///
/// The robin reader consumes a file. 
///
/// Events can be read one by one (the reader is used as an
/// iterator by its own methods), cached by event id or
/// looked up through an index of event id -> position.
//#[deprecated(since="0.10.0", note="There are no robin files anymore. RBs will write data with RBEvents wrapped in TofPackets!")]
pub struct RobinReader {
  /// streamer for the raw event data - presumably used to
  /// extract the RBEvents, confirm against the Iterator impl
  pub streamer    : RBEventMemoryStreamer,
  /// name of the file as passed to ::open
  pub filename    : String,
  /// buffered file handle, None as long as no file has been opened
  file_reader     : Option<BufReader<File>>,
  /// board id, taken from the header of the first event in the file
  pub board_id    : u8,
  /// cache events, event id -> event
  cache           : HashMap<u32, RBEvent>, 
  /// event id -> position of the event in the stream (in number of events)
  index           : HashMap<u32, usize>,
  /// number of events we have successfully parsed from the file
  n_events_read   : usize,
  /// number of bytes ::rewind has to go back to reach the start position
  n_bytes_read    : usize,
  /// no more events are available. Reset by ::rewind
  pub eof_reached : bool,
  /// additional file names registered through ::add_file
  pub extra_filenames : Vec<String>,
}
1525
1526impl RobinReader {
1527
  /// Size of a single event in bytes. The "old" Robin
  /// files have a fixed bytesize by design
  const EVENT_SIZE : usize = 18530;
1531
1532  pub fn new(filename : String) -> Self {
1533    let filename_c = filename.clone();
1534    let mut robin_reader = Self { 
1535      streamer        : RBEventMemoryStreamer::new(),
1536      filename        : String::from(""),
1537      file_reader     : None,
1538      board_id        : 0,
1539      cache           : HashMap::<u32,RBEvent>::new(),
1540      index           : HashMap::<u32,usize>::new(),
1541      eof_reached     : false,
1542      n_events_read   : 0,
1543      n_bytes_read    : 0,
1544      extra_filenames : Vec::<String>::new(),
1545    };
1546    robin_reader.open(filename_c);
1547    robin_reader.init();
1548    robin_reader
1549  }
1550 
  /// Register an additional file name.
  ///
  /// The name is only appended to `extra_filenames`,
  /// the file itself is not opened here.
  pub fn add_file(&mut self, filename : String) {
    self.extra_filenames.push(filename);
  }
1554
1555  fn init(&mut self) {
1556    //match self.search_start() {
1557    //  Err(err) => {
1558    //    error!("Can not find any header signature (typically 0xAAAA) in file! Err {err}");
1559    //    panic!("This is most likely a useless endeavour! Hence, I panic!");
1560    //  }
1561    //  Ok(start_pos) => {
1562    //    self.cursor = start_pos;
1563    //  }
1564    //}
1565    // get the first event to infer board id, then rewind
1566    if let Some(ev) = self.next() {
1567      self.board_id = ev.header.rb_id;  
1568      let rewind : i64 = RobinReader::EVENT_SIZE.try_into().expect("That needs to fit!");
1569      match self.file_reader.as_mut().unwrap().seek(SeekFrom::Current(rewind)) {
1570        Err(err) => {
1571          error!("Read first event, but can not rewind stream! Err {}", err);
1572          panic!("I don't understand, panicking...");
1573        }
1574        Ok(_) => {
1575          self.n_bytes_read  = 0;
1576          self.n_events_read = 0;
1577        }
1578      }
1579    } else {
1580      panic!("I can not find a single event in this file! Panicking!");
1581    }
1582    //self.generate_index();
1583  }
1584
  /// Take the event with the given id out of the cache.
  ///
  /// The event is removed from the cache. Returns `None`
  /// if no event with this id is cached.
  pub fn get_from_cache(&mut self, event_id : &u32) -> Option<RBEvent> {
    self.cache.remove(event_id)
  }
1588
1589  pub fn cache_all_events(&mut self) {
1590    self.rewind();
1591    while !self.eof_reached {
1592      match self.next() {
1593        None => {
1594          break;
1595        }
1596        Some(ev) => {
1597          //println!("{}", ev.header.event_id); 
1598          self.cache.insert(ev.header.event_id, ev);
1599        }
1600      }
1601    }
1602    info!("Cached {} events!", self.cache.len());
1603  }
1604
1605  /// Loop over the whole file and create a mapping event_id -> position
1606  ///
1607  /// This will allow to use the ::seek method
1608  ///
1609  pub fn generate_index(&mut self) {
1610    if self.n_events_read > 0 {
1611      error!("Can not generate index when events have already been read! Use ::rewind() first!");
1612      return;
1613    }
1614    self.n_events_read  = 0;
1615    let pb = ProgressBar::new_spinner();
1616    pb.set_style(ProgressStyle::default_spinner().template("{spinner:.green} Generating eventid index...").unwrap());
1617    let mut seen_before  = 0usize;
1618    let mut total_events = 0usize;
1619    while !self.eof_reached { 
1620      if let Some(ev) = self.next() {
1621        if self.index.contains_key(&ev.header.event_id) {
1622          debug!("We have seen this event id {} before!", ev.header.event_id);
1623          seen_before += 1;
1624        }
1625        self.index.insert(ev.header.event_id,self.n_events_read);
1626        self.n_events_read += 1;
1627        total_events += 1;
1628      }
1629      pb.tick();
1630    }
1631    if seen_before > 0 {
1632      error!("There have been duplicate event ids! In total, we discard {}/{}", seen_before, total_events);
1633    }
1634    info!("Generated index by reading {} events!", self.n_events_read);
1635    self.rewind();
1636    info!("Generated index for {} events!", self.index.len());
1637  }
1638
  /// Number of events currently held in the cache
  pub fn get_cache_size(&self) -> usize {
    self.cache.len()
  }
1642
1643  pub fn print_index(&self) {
1644    let mut reverse_index = HashMap::<usize, u32>::new();
1645    for k in self.index.keys() {
1646      reverse_index.insert(self.index[k], *k);
1647    }
1648    debug!("Generated reversed index of size {}", reverse_index.len());
1649    //println!("Index [reversed]:");
1650    //println!("\t pos -> event id");
1651    //println!("{:?}", reverse_index);
1652    //println!("{:?}", self.index);
1653    let mut sorted_keys: Vec<&usize> = reverse_index.keys().collect();
1654    sorted_keys.sort();
1655    //let mut n = 0u32;
1656    //for k in sorted_keys {
1657      //println!("{k} -> {}", reverse_index[&k]);
1658      //n += 1;
1659      //if n == 8000 {break;}
1660    //}
1661  }
1662
  /// Check if the index has an entry for the given event id
  pub fn is_indexed(&self, event_id : &u32) -> bool {
    self.index.contains_key(event_id)
  }
1666
1667
1668  /// Get RBEvents from the file in ascending order of event ID
1669  ///
1670  /// In case the event_id jumps, this function is not suitable
1671  pub fn get_in_order(&mut self, event_id : &u32) -> Option<RBEvent> {
1672    if !self.is_indexed(event_id) {
1673      error!("Can not get event {} since it is not in the index!", event_id);
1674      return None;
1675    }
1676    let event_idx = self.index.remove(event_id).unwrap();
1677    if self.n_events_read > event_idx {
1678      error!("Can not get event {} since we have already read it. You can use ::rewind() and try again!", event_id);
1679      return None;
1680    } else {
1681      let delta = event_idx - self.n_events_read;
1682      let mut n_read = 0usize;
1683      //let mut ev = RBEvent::new();
1684      loop {
1685        match self.next() {
1686          Some(ev) => {
1687            n_read += 1;
1688            if n_read == delta {
1689              return Some(ev);
1690            }
1691          },
1692          None => {
1693            break;
1694          }
1695        }    
1696      }
1697    }
1698    None
1699  }
1700  
1701  /// Rewind the underlying file back to the beginning
1702  pub fn rewind(&mut self) {
1703    warn!("Rewinding {}", self.filename);
1704    let mut rewind : i64 = self.n_bytes_read.try_into().unwrap();
1705    rewind = -1*rewind;
1706    debug!("Attempting to rewind {rewind} bytes");
1707    match self.file_reader.as_mut().unwrap().seek(SeekFrom::Current(rewind)) {
1708      Err(err) => {
1709        error!("Can not rewind file buffer! Error {err}");
1710      }
1711      Ok(_) => {
1712        info!("File rewound by {rewind} bytes!");
1713        self.n_events_read = 0;
1714        self.n_bytes_read  = 0;
1715      }
1716    }
1717    self.eof_reached = false;
1718  }
1719
1720  pub fn open(&mut self, filename : String) {
1721    if self.filename != "" {
1722      warn!("Overiding previously set filename {}", self.filename);
1723    }
1724    let self_filename = filename.clone();
1725    self.filename     = self_filename;
1726    if filename != "" {
1727      let path = Path::new(&filename); 
1728      info!("Reading from {}", &self.filename);
1729      let file = OpenOptions::new().create(false).append(true).read(true).open(path).expect("Unable to open file {filename}");
1730      self.file_reader = Some(BufReader::new(file));
1731    }
1732  }
1733
1734  pub fn precache_events(&mut self, n_events : usize) {
1735    self.cache.clear();
1736    let mut n_ev = 0usize;
1737    if self.eof_reached {
1738      return;
1739    }
1740    for _ in 0..n_events {
1741      let event = self.next();
1742      n_ev += 1;
1743      if let Some(ev) = event {
1744        self.cache.insert(ev.header.event_id, ev);
1745      } else {
1746        error!("Can not cache {}th event!", n_ev);
1747        self.eof_reached = true;
1748        break
1749      }
1750    }
1751  }
1752
1753  pub fn max_cached_event_id(&self) -> Option<u32> {
1754    let keys : Vec<u32> = self.cache.keys().cloned().collect();
1755    keys.iter().max().copied()
1756  }
1757  
1758  pub fn min_cached_event_id(&self) -> Option<u32> {
1759    let keys : Vec<u32> = self.cache.keys().cloned().collect();
1760    keys.iter().min().copied()
1761  }
1762
1763  pub fn is_cached(&self, event_id : &u32) -> bool {
1764    let keys : Vec<&u32> = self.cache.keys().collect();
1765    keys.contains(&event_id)
1766  }
1767
1768  pub fn get_event_by_id(&mut self, event_id : &u32) -> Option<RBEvent> {
1769    self.cache.remove(event_id)
1770  }
1771
1772  pub fn is_expired(&self) -> bool {
1773    self.eof_reached && self.cache.len() == 0
1774  }
1775
1776  pub fn event_ids_in_cache(&self) -> Vec<u32> {
1777    trace!("We have {} elements in the cache!", self.cache.len());
1778    let mut keys : Vec<u32> = self.cache.keys().cloned().collect();
1779    trace!("We have {} elements in the cache!", keys.len());
1780    keys.sort();
1781    keys
1782  }
1783
1784  pub fn get_events(&self) -> Vec<RBEvent> {
1785    self.cache.values().cloned().collect()
1786  }
1787
1788  pub fn count_packets(&self) -> u64 {
1789    let metadata  = self.file_reader.as_ref().unwrap().get_ref().metadata().unwrap();
1790    let file_size = metadata.len();
1791    let n_packets =  file_size/RobinReader::EVENT_SIZE as u64; 
1792    info!("The file {} contains likely ~{} event packets!", self.filename, n_packets);
1793    n_packets
1794  }
1795}
1796
1797impl Default for RobinReader {
1798
1799  fn default() -> Self {
1800    RobinReader::new(String::from(""))
1801  }
1802}
1803
1804impl Iterator for RobinReader {
1805  type Item = RBEvent;
1806
1807  fn next(&mut self) -> Option<Self::Item> {
1808    match self.streamer.next() {
1809      Some(event) => {
1810        return Some(event);
1811      },
1812      None => {
1813        // check if we can feed more data to the 
1814        // streamer
1815        const CHUNKSIZE : usize  = 200000;
1816        let mut buffer      = [0u8;CHUNKSIZE];
1817        match self.file_reader.as_mut().unwrap().read(&mut buffer) {
1818          Err(err) => {
1819            error!("Unable to read any bytes from file {}, {}", self.filename, err);
1820            return None;
1821          },
1822          Ok(_nbytes) => {
1823            self.n_bytes_read += _nbytes;
1824            if _nbytes == 0 {
1825              self.eof_reached = true;
1826              if self.extra_filenames.len() > 0 {
1827                let next_filename = self.extra_filenames.pop().unwrap();
1828                self.open(next_filename);
1829                self.eof_reached = false;
1830                match self.file_reader.as_mut().unwrap().read(&mut buffer) {
1831                  Err(err) => {
1832                    error!("Failed reading bytes from buffer! {}", err);
1833                  },
1834                  Ok(_nbytes2) => {}
1835                }
1836              }
1837              return None;
1838            }
1839            self.streamer.add(&buffer.to_vec(), _nbytes);
1840            match self.streamer.next() {
1841              None => {
1842                //println!("none..");
1843                return None;
1844              },
1845              Some(event) => {
1846                return Some(event);
1847                //println!("{}", event);
1848              } 
1849            }
1850          }
1851        }
1852      }
1853    }
1854  }
1855}
1856
// Feed two zero bytes to the CRC32 defined by ALGO
// and compare the result to the known checksum
#[test]
fn crc32() {
  let hasher     = Crc::<u32>::new(&ALGO);
  let mut digest = hasher.digest();
  // a zero of type u16 in little endian is just two zero bytes
  digest.update(&[0u8; 2]);
  assert_eq!(1104745215, digest.finalize());
}
1866
1867