/// Word order used when two consecutive 16-bit words are merged into one
/// 32-bit value (event ids, timestamps, CRC32 sums): when `true`, the word
/// that was parsed first becomes the upper 16 bits.
const REVERSE_WORDS : bool = true;
/// CRC parameter set used to build the `Crc::<u32>` instance of
/// `RBEventMemoryStreamer` (32 bit wide, reflected input and output).
///
/// NOTE(review): `check` and `residue` are left at 0 here, which looks like
/// a placeholder rather than the real values for this polynomial - confirm
/// that the `crc` crate does not rely on them at runtime.
const ALGO : crc::Algorithm<u32> = crc::Algorithm {
    width : 32u8,
    init : 0xFFFFFFFF,
    poly : 0x04C11DB7,
    refin : true,
    refout : true,
    xorout : 0xFFFFFFFF,
    check : 0,
    residue : 0,
};
33
34use std::fmt;
35
36use crc::Crc;
37
38use std::path::Path;
39use std::fs::{
40 self,
41 File,
42 OpenOptions
43};
44
45use std::io;
46use std::io::{
47 ErrorKind,
48 BufReader,
49 Seek,
50 SeekFrom,
51 Read,
52 Write,
53};
54
55use std::collections::{
56 VecDeque,
57 HashMap
58};
59
60
61
62
63extern crate chrono;
64use chrono::{DateTime, Utc};
65
66use indicatif::{ProgressBar, ProgressStyle};
67use crossbeam_channel::Sender;
68use regex::Regex;
69
70use crate::events::{
71 RBEvent,
72 RBEventHeader,
73 EventStatus,
74};
75use crate::packets::{
76 TofPacket,
77 PacketType,
78};
79use crate::constants::{
80 NWORDS,
81 HUMAN_TIMESTAMP_FORMAT
82};
83use crate::serialization::{
84 Serialization,
85 Packable,
86 u8_to_u16_14bit,
88 u8_to_u16_err_check,
89 search_for_u16,
90 parse_u8,
91 parse_u16,
92 parse_u32,
93};
94
95use crate::events::TofEvent;
96
/// Kind of file a `TofPacketWriter` writes to. The variant decides how the
/// output file name is constructed.
#[derive(Debug, Clone)]
pub enum FileType {
    /// No dedicated naming scheme; the writer uses `Data.tof.gaps`.
    Unknown,
    /// Calibration file; the value is handed to `get_califilename` as the
    /// readout board id.
    CalibrationFile(u8),
    /// Run file; the value is handed to `get_runfilename` as the run number.
    RunFile(u32),
    /// Summary file; the value is the name of the source file. The output
    /// name is derived from it by replacing `.tof.` with `.tofsum.`.
    SummaryFile(String),
}
109
110pub fn get_utc_timestamp() -> String {
112 let now: DateTime<Utc> = Utc::now();
113 let timestamp_str = now.format(HUMAN_TIMESTAMP_FORMAT).to_string();
115 timestamp_str
116}
117
118pub fn get_utc_date() -> String {
120 let now: DateTime<Utc> = Utc::now();
121 let timestamp_str = now.format("%y%m%d").to_string();
123 timestamp_str
124}
125
126pub fn get_califilename(rb_id : u8, latest : bool) -> String {
137 let ts = get_utc_timestamp();
138 if latest {
139 format!("RB{rb_id:02}_latest.cali.tof.gaps")
140 } else {
141 format!("RB{rb_id:02}_{ts}.cali.tof.gaps")
142 }
143}
144
145pub fn get_runfilename(run : u32, subrun : u64, rb_id : Option<u8>) -> String {
156 let ts = get_utc_timestamp();
157 let fname : String;
158 match rb_id {
159 None => {
160 fname = format!("Run{run}_{subrun}.{ts}.tof.gaps");
161 }
162 Some(rbid) => {
163 fname = format!("Run{run}_{subrun}.{ts}.RB{rbid:02}.tof.gaps");
164 }
165 }
166 fname
167}
168
169pub fn read_file(filename: &Path) -> io::Result<Vec<u8>> {
178 info!("Reading file {}", filename.display());
179 let mut f = File::open(&filename)?;
180 let metadata = fs::metadata(&filename)?;
181 let mut buffer = vec![0; metadata.len() as usize];
182 info!("Read {} bytes from {}", buffer.len(), filename.display());
183 f.read_exact(&mut buffer)?;
185 Ok(buffer)
186}
187
188pub fn summarize_toffile(fname : String) {
191 let mut reader = TofPacketReader::new(fname.clone());
192 let outfile = fname.replace(".tof.", ".tofsum.");
193 let outfile_type = FileType::SummaryFile(fname.clone());
194 let mut writer = TofPacketWriter::new(outfile,outfile_type);
195 let mut n_errors = 0u32;
196 let npack : usize = reader.get_packet_index().unwrap_or(HashMap::<PacketType,usize>::new()).values().cloned().collect::<Vec<usize>>().iter().sum();
197 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
198 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
199 let bar_label = String::from("Reading events");
200 let bar = ProgressBar::new(npack as u64);
201 bar.set_position(0);
202 bar.set_message (bar_label);
203 bar.set_prefix ("\u{2728}");
204 bar.set_style (bar_style);
205 let mut npack = 0u64;
206 for pack in reader {
207 npack += 1;
208 bar.set_position(npack);
209 match pack.packet_type {
210 PacketType::TofEvent => {
211 match pack.unpack::<TofEvent>() {
212 Err(err) => {
213 debug!("Can't unpack TofEvent! {err}");
214 n_errors += 1;
215 }
216 Ok(te) => {
217 let ts = te.get_summary();
218 let tp = ts.pack();
219 writer.add_tof_packet(&tp);
220 }
221 }
222 }
223 _ => {
224 writer.add_tof_packet(&pack);
225 }
226 }
227 }
228 bar.finish_with_message("Done!");
229 if n_errors > 0 {
230 error!("Unpacking TofEvents from {} failed {} times!", n_errors, fname);
231 }
232}
233
/// Extracts `RBEvent`s (or raw `TofPacket`s) from an in-memory byte stream
/// of readout board event data.
pub struct RBEventMemoryStreamer {
    /// Raw bytes which have been added but not yet been drained
    pub stream : Vec<u8>,
    /// If set, the error bits of every ADC word are inspected
    /// while decoding channel data
    pub check_channel_errors : bool,
    /// Channels listed here are removed from the list of channels
    /// taken from the event header before the channel data is decoded
    pub mask : Vec<u8>,

    /// Current read position within `stream`
    pos : usize,
    /// Set by `seek_next_header` when `pos` has been placed on a marker
    pos_at_head : bool,
    /// Channel used by `send_all`; has to be set before calling it
    pub tp_sender : Option<Sender<TofPacket>>,
    /// Number of events/packets extracted so far
    n_events_ext : usize,
    /// Set when the stream ran out of (complete) event data,
    /// cleared again by `add`/`consume`
    pub is_depleted : bool,
    /// If set, a CRC32 sum is calculated per channel and compared
    /// to the one stored in the stream
    pub calc_crc32 : bool,
    /// CRC instance from which the per-channel digests are created
    crc32_sum : Crc::<u32>,
    // NOTE(review): request_mode and request_cache are only initialized
    // in the code visible here - their intended use needs to be confirmed.
    pub request_mode : bool,
    pub request_cache : VecDeque<(u32,u8)>,
    /// Event id -> (start position in `stream`, packet length in bytes),
    /// filled by `create_event_index`
    pub event_map : HashMap<u32,(usize,usize)>,
    /// First event id seen by `create_event_index` (0 while unset)
    pub first_evid : u32,
    /// Event id most recently parsed by `create_event_index`
    pub last_evid : u32,
    /// Updated by `create_event_index` when it runs out of headers
    pub last_event_complete : bool,
    // NOTE(review): last_event_pos, is_behind_by and is_ahead_by are only
    // initialized in the code visible here - confirm their intended use.
    pub last_event_pos : usize,
    pub is_behind_by : usize,
    pub is_ahead_by : usize,
}
285
286impl RBEventMemoryStreamer {
287
288 pub fn new() -> Self {
289 Self {
290 stream : Vec::<u8>::new(),
291 check_channel_errors : false,
292 mask : Vec::<u8>::new(),
293 pos : 0,
294 pos_at_head : false,
295 tp_sender : None,
296 n_events_ext : 0,
297 is_depleted : false,
298 calc_crc32 : false,
299 crc32_sum : Crc::<u32>::new(&ALGO),
300 request_mode : false,
301 request_cache : VecDeque::<(u32,u8)>::new(),
302 event_map : HashMap::<u32,(usize,usize)>::new(),
303 first_evid : 0,
304 last_evid : 0,
305 last_event_complete : false,
306 last_event_pos : 0,
307 is_behind_by : 0,
308 is_ahead_by : 0,
309 }
310 }
311
312 pub fn create_event_index(&mut self) { let begin_pos = self.pos;
317 let mut event_id = 0u32;
318 loop {
321 let mut result = (0usize, 0usize);
322 if !self.seek_next_header(0xaaaa) {
323 debug!("Could not find another header...");
324 self.pos = begin_pos;
325 self.last_evid = event_id;
326 if result.0 + result.1 > self.stream.len() - 1 {
327 self.last_event_complete = false;
328 } else {
329 self.last_event_complete = true;
330 }
331 info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
332 return;
333 }
334 result.0 = self.pos;
335 self.pos += 4;let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
337 if self.stream.len() < self.pos -6 + packet_len {
338 self.pos = begin_pos;
340 self.last_evid = event_id;
341 info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
342 return;
343 }
345 result.1 = packet_len;
346 if packet_len < 6 {
347 self.pos = begin_pos;
348 self.last_evid = event_id;
349 info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
350 return;
351 }
353 self.pos -= 6;
355 self.pos += 22;
357 let event_id0 = parse_u16(&self.stream, &mut self.pos);
358 let event_id1 = parse_u16(&self.stream, &mut self.pos);
359 if REVERSE_WORDS {
360 event_id = u32::from(event_id0) << 16 | u32::from(event_id1);
361 } else {
362 event_id = u32::from(event_id1) << 16 | u32::from(event_id0);
363 }
364 if self.first_evid == 0 {
365 self.first_evid = event_id;
366 }
367 self.pos += packet_len - 26;
368 self.event_map.insert(event_id,result);
369 }
370 }
371
372 pub fn print_event_map(&self) {
373 for k in self.event_map.keys() {
374 let pos = self.event_map[&k];
375 println!("-- --> {} -> {},{}", k, pos.0, pos.1);
376 }
377 }
378
379 pub fn init_sender(&mut self, tp_sender : Sender<TofPacket>) {
381 self.tp_sender = Some(tp_sender);
382 }
383
384 pub fn send_all(&mut self) {
386 loop {
387 match self.next() {
388 None => {
389 info!("Streamer drained!");
390 break;
391 },
392 Some(ev) => {
393 let tp = TofPacket::from(&ev);
394 match self.tp_sender.as_ref().expect("Sender needs to be initialized first!").send(tp) {
395 Ok(_) => (),
396 Err(err) => {
397 error!("Unable to send TofPacket! {err}");
398 }
399 }
400 }
401 }
402 }
403 }
404
405
406 pub fn add(&mut self, stream : &Vec<u8>, nbytes : usize) {
410 self.is_depleted = false;
414 self.stream.extend_from_slice(&stream[0..nbytes]);
415 }
418
419 pub fn consume(&mut self, stream : &mut Vec<u8>) {
422 self.is_depleted = false;
423 self.stream.append(stream);
427 }
430
431 pub fn seek_next_header(&mut self, header : u16) -> bool{
441 match search_for_u16(header, &self.stream, self.pos) {
442 Err(_) => {
443 return false;
444 }
445 Ok(head_pos) => {
446 self.pos = head_pos;
447 self.pos_at_head = true;
448 return true;
449 }
450 }
451 }
452
453 pub fn next_tofpacket(&mut self) -> Option<TofPacket> {
454 let begin_pos = self.pos; let foot_pos : usize;
457 let head_pos : usize;
458 if self.stream.len() == 0 {
459 trace!("Stream empty!");
460 return None;
461 }
462 if !self.pos_at_head {
463 if !self.seek_next_header(0xaaaa) {
464 debug!("Could not find another header...");
465 self.pos = begin_pos;
466 return None;
467 }
468 }
469 head_pos = self.pos;
470 if !self.seek_next_header(0x5555) {
473 debug!("Could not find another footer...");
474 self.pos = begin_pos;
475 return None;
476 }
477 foot_pos = self.pos;
479 self.n_events_ext += 1;
480 let mut tp = TofPacket::new();
481 tp.packet_type = PacketType::RBEventMemoryView;
482 tp.payload.extend_from_slice(&self.stream[head_pos..foot_pos+2]);
484 self.pos_at_head = false;
487 if self.n_events_ext % 200 == 0 {
490 self.stream.drain(0..foot_pos+3);
491 self.pos = 0;
492 }
493 Some(tp)
494 }
495
496
497 pub fn get_event_at_pos_unchecked(&mut self,
499 replace_channel_mask : Option<u16>)
500 -> Option<RBEvent> {
501 let mut header = RBEventHeader::new();
502 let mut event = RBEvent::new();
503 let mut event_status = EventStatus::Unknown;
504 if self.calc_crc32 && self.check_channel_errors {
506 event_status = EventStatus::Perfect;
507 }
508 if !self.calc_crc32 && !self.check_channel_errors {
509 event_status = EventStatus::GoodNoCRCOrErrBitCheck;
510 }
511 if !self.calc_crc32 && self.check_channel_errors {
512 event_status = EventStatus::GoodNoCRCCheck;
513 }
514 if self.calc_crc32 && !self.check_channel_errors {
515 event_status = EventStatus::GoodNoErrBitCheck;
516 }
517 let head = parse_u16(&self.stream, &mut self.pos);
520 if head != RBEventHeader::HEAD {
521 error!("Event does not start with {}", RBEventHeader::HEAD);
522 return None;
523 }
524
525 let status = parse_u16(&self.stream, &mut self.pos);
526 header.parse_status(status);
529 let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
530 let nwords = parse_u16(&self.stream, &mut self.pos) as usize + 1; if self.pos - 8 + packet_len > self.stream.len() { error!("Stream is too short! Stream len is {}, packet len is {}. We are at pos {}", self.stream.len(), packet_len, self.pos);
533 self.is_depleted = true;
534 self.pos -= 8;
535 return None;
536 }
537 self.pos += 10;
540 self.pos += 1; header.rb_id = parse_u8(&self.stream, &mut self.pos);
542 header.set_channel_mask(parse_u16(&self.stream, &mut self.pos));
543 match replace_channel_mask {
544 None => (),
545 Some(mask) => {
546 println!("==> Replacing ch mask {} with {}", header.get_channel_mask(), mask);
547 header.set_channel_mask(mask);
548 }
549 }
550 let event_id0 = parse_u16(&self.stream, &mut self.pos);
551 let event_id1 = parse_u16(&self.stream, &mut self.pos);
552 let event_id : u32;
553 if REVERSE_WORDS {
554 event_id = u32::from(event_id0) << 16 | u32::from(event_id1);
555 } else {
556 event_id = u32::from(event_id1) << 16 | u32::from(event_id0);
557 }
558
559 header.event_id = event_id;
560 self.pos += 4;
564 let timestamp0 = parse_u16(&self.stream, &mut self.pos);
565 let timestamp1 = parse_u16(&self.stream, &mut self.pos);
566 let timestamp2 = parse_u16(&self.stream, &mut self.pos);
567 let timestamp16 : u16;
569 let timestamp32 : u32;
570 if REVERSE_WORDS {
571 timestamp16 = timestamp0;
572 timestamp32 = u32::from(timestamp1) << 16 | u32::from(timestamp2);
573 } else {
574 timestamp16 = timestamp2;
575 timestamp32 = u32::from(timestamp0) << 16 | u32::from(timestamp1);
576 }
577 header.timestamp16 = timestamp16;
578 header.timestamp32 = timestamp32;
579 if header.drs_lost_trigger() {
583 event.status = EventStatus::IncompleteReadout;
584 event.header = header;
585 return Some(event);
587 }
588 let mut any_cell_error = false;
593 let mut header_channels = header.get_channels().clone();
594 for k in &self.mask {
595 header_channels.retain(|x| x != k);
596 }
597
598 for ch in header_channels.iter() {
599 let ch_id = parse_u16(&self.stream, &mut self.pos);
600 if ch_id != *ch as u16 {
601 let search_pos = self.pos;
603 match search_for_u16(TofPacket::HEAD, &self.stream, search_pos) {
604 Err(_) => (),
605 Ok(result) => {
606 info!("The channel data is corrupt, but we found a header at {} for remaining stream len {}", result, self.stream.len());
607 }
608 }
609 let mut stream_view = Vec::<u8>::new();
610 let foo_pos = self.pos;
611 for k in foo_pos -3..foo_pos + 3 {
612 stream_view.push(self.stream[k]);
613 }
614 error!("We got {ch_id} but expected {ch} for event {}. The parsed ch id is not in the channel mask! We will fill this channel with u16::MAX .... Stream view +- 3 around the ch id {:?}", header.event_id, stream_view);
615 event_status = EventStatus::ChannelIDWrong;
616 event.adc[*ch as usize] = vec![u16::MAX;NWORDS];
618 self.pos += 2*nwords + 4;
619 continue;
620 } else {
621 let mut dig = self.crc32_sum.digest();
628 if self.calc_crc32 {
629 let mut this_ch_adc = Vec::<u16>::with_capacity(nwords);
630 for _ in 0..nwords {
631 let this_field = parse_u16(&self.stream, &mut self.pos);
632 dig.update(&this_field.to_le_bytes());
633 if self.check_channel_errors {
634 if ((0x8000 & this_field) >> 15) == 0x1 {
635 error!("Ch error bit set for ch {}!", ch);
636 event_status = EventStatus::ChnSyncErrors;
637 }
638 if ((0x4000 & this_field) >> 14) == 0x1 {
639 error!("Cell error bit set for ch {}!", ch);
640 event_status = EventStatus::CellSyncErrors;
641 any_cell_error = true;
642 }
643 }
644 this_ch_adc.push(0x3fff & this_field)
645 }
646 event.adc[*ch as usize] = this_ch_adc;
647 } else {
648 if self.check_channel_errors {
649 let adc_w_errs = u8_to_u16_err_check(&self.stream[self.pos..self.pos + 2*nwords]);
650 if adc_w_errs.1 {
651 error!("Ch error bit set for ch {}!", ch);
652 event_status = EventStatus::ChnSyncErrors;
653 any_cell_error = true;
654 } else if adc_w_errs.2 {
655 error!("Cell error bit set for ch {}!", ch);
656 event_status = EventStatus::CellSyncErrors;
657 }
658 event.adc[*ch as usize] = adc_w_errs.0;
659 } else {
660 event.adc[*ch as usize] = u8_to_u16_14bit(&self.stream[self.pos..self.pos + 2*nwords]);
661 }
662 self.pos += 2*nwords;
663 }
664 let crc320 = parse_u16(&self.stream, &mut self.pos);
667 let crc321 = parse_u16(&self.stream, &mut self.pos);
668 if self.calc_crc32 {
670 let crc32 : u32;
671 if REVERSE_WORDS {
672 crc32 = u32::from(crc320) << 16 | u32::from(crc321);
673 } else {
674 crc32 = u32::from(crc321) << 16 | u32::from(crc320);
675 }
676 let checksum = dig.finalize();
677 if checksum != crc32 {
678 event_status = EventStatus::CRC32Wrong;
679 }
680 println!("== ==> Checksum {}, channel checksum {}!", checksum, crc32);
681 }
682 }
683 }
684 if any_cell_error {
685 if event_status == EventStatus::ChnSyncErrors {
686 event_status = EventStatus::CellAndChnSyncErrors;
687 }
688 }
689
690 if !header.drs_lost_trigger() {
691 header.stop_cell = parse_u16(&self.stream, &mut self.pos);
692 }
693 self.pos += 4;
697
698 let tail = parse_u16(&self.stream, &mut self.pos);
717 if tail != 0x5555 {
718 error!("Tail signature {} for event {} is invalid!", tail, header.event_id);
719 event_status = EventStatus::TailWrong;
720 }
721 self.pos_at_head = false;
723 event.header = header;
724 event.status = event_status;
725 if event_status == EventStatus::TailWrong {
726 info!("{}", event);
727 }
728 Some(event)
729 }
730
731 pub fn get_event_at_id(&mut self, event_id : u32, replace_channel_mask : Option<u16>) -> Option<RBEvent> {
732 let begin_pos = self.pos; let pos = self.event_map.remove(&event_id)?;
744 if self.stream.len() < pos.0 + pos.1 {
745 trace!("Stream is too short!");
746 self.is_depleted = true;
747 self.pos = begin_pos;
748 return None;
749 }
750 self.pos = pos.0;
751 self.get_event_at_pos_unchecked(replace_channel_mask)
752 }
753}
754
755impl Iterator for RBEventMemoryStreamer {
756 type Item = RBEvent;
757
758 fn next(&mut self) -> Option<Self::Item> {
759 let begin_pos : usize; self.pos_at_head = false;
766 begin_pos = self.pos; if self.stream.len() == 0 {
769 trace!("Stream empty!");
770 self.is_depleted = true;
771 self.pos = 0;
772 return None;
773 }
774 if !self.pos_at_head {
775 if !self.seek_next_header(0xaaaa) {
776 debug!("Could not find another header...");
777 self.pos = begin_pos;
778 self.is_depleted = true;
779 return None;
780 }
781 }
782
783 let event = self.get_event_at_pos_unchecked(None)?;
784 self.n_events_ext += 1;
785 self.stream.drain(0..self.pos);
786 self.pos = 0;
787 self.pos_at_head = false;
788 Some(event)
789 }
790}
791
792
/// Common interface for readers which keep a cursor into their input.
pub trait PacketReader {
    // NOTE(review): presumably the two marker bytes a packet starts with,
    // to be overridden by implementors - the default for both is 0.
    const HEADER0 : u8 = 0;
    const HEADER1 : u8 = 0;

    /// Move the cursor of the reader to `pos`
    fn set_cursor(&mut self, pos : usize);

    /// Go back to the beginning of the input by setting the cursor to 0.
    /// The default implementation always returns `Ok`.
    fn rewind(&mut self) -> io::Result<()> {
        self.set_cursor(0);
        Ok(())
    }
}
811
812
813
814
/// Reads `TofPacket`s from a single file or from a series of files,
/// one file after the other.
#[derive(Debug)]
pub struct TofPacketReader {
    /// The files this reader walks through, in reading order
    pub filenames : Vec<String>,
    /// Buffered reader on the file which is currently open
    file_reader : BufReader<File>,
    /// Number of bytes consumed from the current file;
    /// reset to 0 whenever a file is (re)opened
    cursor : usize,
    /// Only packets of this type are returned;
    /// `PacketType::Unknown` switches the filter off
    pub filter : PacketType,
    /// Number of packets returned so far
    n_packs_read : usize,
    /// Number of packets skipped so far because of `skip_ahead`
    n_packs_skipped : usize,
    /// Number of packets to skip before the first one is returned
    /// (0 = skip nothing)
    pub skip_ahead : usize,
    /// Maximum number of packets to return (0 = no limit)
    pub stop_after : usize,
    /// Index into `filenames` of the file which is currently open
    pub file_index : usize,
}
840
841impl fmt::Display for TofPacketReader {
842 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
843 let mut range_repr = String::from("");
844 if self.skip_ahead > 0 {
845 range_repr += &(format!("({}", self.skip_ahead));
846 } else {
847 range_repr += "(";
848 }
849 if self.stop_after > 0 {
850 range_repr += &(format!("..{})", self.stop_after));
851 } else {
852 range_repr += "..)";
853 }
854 let repr = format!("<TofPacketReader :read {} packets, filter {}, range {}\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
855 write!(f, "{}", repr)
856 }
857}
858
859impl TofPacketReader {
860
861 pub fn get_current_filename(&self) -> Option<String> {
862 if self.filenames.len() <= self.file_index {
864 return None;
865 }
866 Some(self.filenames[self.file_index].clone())
867 }
868
869 fn list_path_contents_sorted(input: &str) -> Result<Vec<String>, io::Error> {
870 let path = Path::new(input);
871 match fs::metadata(path) {
872 Ok(metadata) => {
873 if metadata.is_file() {
874 let fname = String::from(input);
876 return Ok(vec![fname]);
877 }
882 if metadata.is_dir() {
883 let re = Regex::new(r"Run\d+_\d+\.(\d{6})_(\d{6})UTC\.tof\.gaps$").unwrap();
884
885 let mut entries: Vec<(u32, u32, String)> = fs::read_dir(path)?
886 .filter_map(Result::ok) .filter_map(|entry| {
888 let filename = format!("{}/{}", path.display(), entry.file_name().into_string().ok()?);
890 re.captures(&filename.clone()).map(|caps| {
891 let date = caps.get(1)?.as_str().parse::<u32>().ok()?;
892 let time = caps.get(2)?.as_str().parse::<u32>().ok()?;
893 Some((date, time, filename))
894 })?
895 })
896 .collect();
897
898 entries.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
900 return Ok(entries.into_iter().map(|(_, _, name)| name).collect());
902 }
903 Err(io::Error::new(ErrorKind::Other, "Path exists but is neither a file nor a directory"))
904 }
905 Err(e) => Err(e),
906 }
907 }
908
909 pub fn new(filename_or_directory : String) -> TofPacketReader {
912 let firstfile : String;
913 match TofPacketReader::list_path_contents_sorted(&filename_or_directory) {
914 Err(err) => {
915 error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
916 panic!("Unable to open files!");
917 }
918 Ok(files) => {
919 firstfile = files[0].clone();
920 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
921 Err(err) => {
922 error!("Unable to open file {firstfile}! {err}");
923 panic!("Unable to create reader from {filename_or_directory}!");
924 }
925 Ok(file) => {
926 let packet_reader = Self {
927 filenames : files,
928 file_reader : BufReader::new(file),
929 cursor : 0,
930 filter : PacketType::Unknown,
931 n_packs_read : 0,
932 skip_ahead : 0,
933 stop_after : 0,
934 n_packs_skipped : 0,
935 file_index : 0,
936 };
937 packet_reader
938 }
939 }
940 }
941 }
942 }
943
944 pub fn first_packet(&mut self) -> Option<TofPacket> {
946 match self.rewind() {
947 Err(err) => {
948 error!("Error when rewinding files! {err}");
949 }
950 Ok(_) => ()
951 }
952 let pack = self.get_next_packet();
953 match self.rewind() {
954 Err(err) => {
955 error!("Error when rewinding files! {err}");
956 }
957 Ok(_) => ()
958 }
959 return pack;
960 }
961
962 pub fn last_packet(&mut self) -> Option<TofPacket> {
964 self.file_index = self.filenames.len() - 1;
965 let lastfilename = self.filenames[self.file_index].clone();
966 let lastfile = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
967 self.file_reader = BufReader::new(lastfile);
968 self.cursor = 0;
969 let mut tp = TofPacket::new();
970 let mut idx = 0;
971 loop {
972 match self.get_next_packet() {
973 None => {
974 match self.rewind() {
975 Err(err) => {
976 error!("Error when rewinding files! {err}");
977 }
978 Ok(_) => ()
979 }
980 if idx == 0 {
981 return None;
982 } else {
983 return Some(tp);
984 }
985 }
986 Some(pack) => {
987 idx += 1;
988 tp = pack;
989 continue;
990 }
991 }
992 }
993 }
994
995
996 #[deprecated(since="0.10.0", note="Use public attribute instead!")]
997 pub fn set_filter(&mut self, ptype : PacketType) {
998 self.filter = ptype;
999 }
1000
1001 pub fn get_packet_index(&mut self) -> io::Result<HashMap<PacketType, usize>> {
1005 let mut index = HashMap::<PacketType, usize>::new();
1006 let mut buffer = [0];
1007 loop {
1008 match self.file_reader.read_exact(&mut buffer) {
1009 Err(err) => {
1010 debug!("Unable to read from file! {err}");
1011 break;
1013 }
1014 Ok(_) => {
1015 self.cursor += 1;
1016 }
1017 }
1018 if buffer[0] != 0xAA {
1019 continue;
1020 } else {
1021 match self.file_reader.read_exact(&mut buffer) {
1022 Err(err) => {
1023 debug!("Unable to read from file! {err}");
1024 break;
1026 }
1027 Ok(_) => {
1028 self.cursor += 1;
1029 }
1030 }
1031
1032 if buffer[0] != 0xAA {
1033 continue;
1034 } else {
1035 match self.file_reader.read_exact(&mut buffer) {
1037 Err(err) => {
1038 debug!("Unable to read from file! {err}");
1039 break;
1040 }
1041 Ok(_) => {
1042 self.cursor += 1;
1043 }
1044 }
1045 let ptype = PacketType::from(buffer[0]);
1046 let mut buffer_psize = [0,0,0,0];
1048 match self.file_reader.read_exact(&mut buffer_psize) {
1049 Err(err) => {
1050 error!("Unable to read from file! {err}");
1051 break;
1052 }
1053 Ok(_) => {
1054 self.cursor += 4;
1055 }
1056 }
1057 let vec_data = buffer_psize.to_vec();
1058 let size = parse_u32(&vec_data, &mut 0);
1059 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1060 Err(err) => {
1061 debug!("Unable to read more data! {err}");
1062 break;
1063 }
1064 Ok(_) => {
1065 self.cursor += size as usize;
1066 if index.contains_key(&ptype) {
1070 *index.get_mut(&ptype).unwrap() += 1;
1071 } else {
1072 index.insert(ptype, 1usize);
1073 }
1074 }
1075 }
1076 }
1077 } } self.rewind()?;
1080 Ok(index)
1081 } pub fn rewind(&mut self) -> io::Result<()> {
1084 let firstfile = &self.filenames[0];
1085 let file = OpenOptions::new().create(false).append(false).read(true).open(&firstfile)?;
1086 self.file_reader = BufReader::new(file);
1087 self.cursor = 0;
1088 self.file_index = 0;
1089 Ok(())
1090 }
1091
1092 pub fn get_next_packet(&mut self) -> Option<TofPacket> {
1098 let mut buffer = [0];
1101 loop {
1102 match self.file_reader.read_exact(&mut buffer) {
1103 Err(err) => {
1104 debug!("Unable to read from file! {err}");
1105 if self.file_index == self.filenames.len() -1 {
1106 return None;
1107 } else {
1108 self.file_index += 1;
1109 let nextfilename = self.filenames[self.file_index].clone();
1110 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1111 self.file_reader = BufReader::new(nextfile);
1112 self.cursor = 0;
1113 return self.get_next_packet();
1114 }
1115 }
1116 Ok(_) => {
1117 self.cursor += 1;
1118 }
1119 }
1120 if buffer[0] != 0xAA {
1121 continue;
1122 } else {
1123 match self.file_reader.read_exact(&mut buffer) {
1124 Err(err) => {
1125 debug!("Unable to read from file! {err}");
1126 if self.file_index == self.filenames.len() -1 {
1127 return None;
1128 } else {
1129 self.file_index += 1;
1130 let nextfilename = self.filenames[self.file_index].clone();
1131 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1132 self.file_reader = BufReader::new(nextfile);
1133 self.cursor = 0;
1134 return self.get_next_packet();
1135 }
1136 }
1137 Ok(_) => {
1138 self.cursor += 1;
1139 }
1140 }
1141
1142 if buffer[0] != 0xAA {
1143 continue;
1144 } else {
1145 match self.file_reader.read_exact(&mut buffer) {
1147 Err(err) => {
1148 debug!("Unable to read from file! {err}");
1149 if self.file_index == self.filenames.len() -1 {
1150 return None;
1151 } else {
1152 self.file_index += 1;
1153 let nextfilename = self.filenames[self.file_index].clone();
1154 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1155 self.cursor = 0;
1156 self.file_reader = BufReader::new(nextfile);
1157 return self.get_next_packet();
1158 }
1159 }
1160 Ok(_) => {
1161 self.cursor += 1;
1162 }
1163 }
1164 let ptype = PacketType::from(buffer[0]);
1165 let mut buffer_psize = [0,0,0,0];
1167 match self.file_reader.read_exact(&mut buffer_psize) {
1168 Err(err) => {
1169 debug!("Unable to read from file! {err}");
1170 if self.file_index == self.filenames.len() -1 {
1171 return None;
1172 } else {
1173 self.file_index += 1;
1174 let nextfilename = self.filenames[self.file_index].clone();
1175 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1176 self.cursor = 0;
1177 self.file_reader = BufReader::new(nextfile);
1178 return self.get_next_packet();
1179 }
1180 }
1181 Ok(_) => {
1182 self.cursor += 4;
1183 }
1184 }
1185 let vec_data = buffer_psize.to_vec();
1186 let size = parse_u32(&vec_data, &mut 0);
1187 if ptype != self.filter && self.filter != PacketType::Unknown {
1188 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1189 Err(err) => {
1190 debug!("Unable to read more data! {err}");
1191 if self.file_index == self.filenames.len() -1 {
1192 return None;
1193 } else {
1194 self.file_index += 1;
1195 let nextfilename = self.filenames[self.file_index].clone();
1196 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1197 self.cursor = 0;
1198 self.file_reader = BufReader::new(nextfile);
1199 return self.get_next_packet();
1200 }
1201 }
1202 Ok(_) => {
1203 self.cursor += size as usize;
1204 }
1205 }
1206 continue; }
1208 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
1211 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1213 Err(err) => {
1214 debug!("Unable to read more data! {err}");
1215 if self.file_index == self.filenames.len() -1 {
1216 return None;
1217 } else {
1218 self.file_index += 1;
1219 let nextfilename = self.filenames[self.file_index].clone();
1220 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1221 self.cursor = 0;
1222 self.file_reader = BufReader::new(nextfile);
1223 return self.get_next_packet();
1224 }
1225 }
1226 Ok(_) => {
1227 self.n_packs_skipped += 1;
1228 self.cursor += size as usize;
1229 }
1230 }
1231 continue; }
1233 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
1234 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
1236 Err(err) => {
1237 debug!("Unable to read more data! {err}");
1238 if self.file_index == self.filenames.len() -1 {
1239 return None;
1240 } else {
1241 self.file_index += 1;
1242 let nextfilename = self.filenames[self.file_index].clone();
1243 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1244 self.cursor = 0;
1245 self.file_reader = BufReader::new(nextfile);
1246 return self.get_next_packet();
1247 }
1248 }
1249 Ok(_) => {
1250 self.cursor += size as usize;
1251 }
1252 }
1253 continue; }
1256
1257 let mut tp = TofPacket::new();
1258 tp.packet_type = ptype;
1259 let mut payload = vec![0u8;size as usize];
1260
1261 match self.file_reader.read_exact(&mut payload) {
1262 Err(err) => {
1263 debug!("Unable to read from file! {err}");
1264 if self.file_index == self.filenames.len() -1 {
1265 return None;
1266 } else {
1267 self.file_index += 1;
1268 let nextfilename = self.filenames[self.file_index].clone();
1269 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1270 self.cursor = 0;
1271 self.file_reader = BufReader::new(nextfile);
1272 return self.get_next_packet();
1273 }
1274 }
1275 Ok(_) => {
1276 self.cursor += size as usize;
1277 }
1278 }
1279 tp.payload = payload;
1280 let mut tail = vec![0u8; 2];
1282 match self.file_reader.read_exact(&mut tail) {
1283 Err(err) => {
1284 debug!("Unable to read from file! {err}");
1285 if self.file_index == self.filenames.len() -1 {
1286 return None;
1287 } else {
1288 self.file_index += 1;
1289 let nextfilename = self.filenames[self.file_index].clone();
1290 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
1291 self.cursor = 0;
1292 self.file_reader = BufReader::new(nextfile);
1293 return self.get_next_packet();
1294 }
1295 }
1296 Ok(_) => {
1297 self.cursor += 2;
1298 }
1299 }
1300 let tail = parse_u16(&tail,&mut 0);
1301 if tail != TofPacket::TAIL {
1302 debug!("TofPacket TAIL signature wrong!");
1303 return None;
1304 }
1305 self.n_packs_read += 1;
1306 return Some(tp);
1307 }
1308 } } } }
1312
1313impl Default for TofPacketReader {
1314 fn default() -> Self {
1315 TofPacketReader::new(String::from(""))
1316 }
1317}
1318
impl Iterator for TofPacketReader {
    type Item = TofPacket;

    /// Yields whatever `get_next_packet` returns; the iteration ends
    /// with the first `None`.
    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_packet()
    }
}
1326
/// Writes `TofPacket`s to files on disk.
pub struct TofPacketWriter {

    /// The file which is currently written to
    pub file : File,
    /// Directory part of the output path; `new` makes sure it ends with "/"
    pub file_path : String,
    // NOTE(review): presumably limits after which a new file is started -
    // the code using pkts_per_file and mbytes_per_file is not visible
    // here. `new` sets them to 0 and 420.
    pub pkts_per_file : usize,
    pub mbytes_per_file : usize,
    /// Determines how file names are constructed
    pub file_type : FileType,
    /// Name of the file opened by `new`
    pub file_name : String,

    /// Starts at 1; `get_file` uses it as the sub-run part of run file names
    file_id : usize,
    // NOTE(review): n_packets and file_nbytes_wr start at 0, presumably
    // counters of what has been written - confirm against the write path.
    n_packets : usize,
    file_nbytes_wr : usize,
}
1358
1359impl TofPacketWriter {
1360
1361 pub fn new(mut file_path : String, file_type : FileType) -> Self {
1370 let file : File;
1372 let file_name : String;
1373 if !file_path.ends_with("/") {
1374 file_path += "/";
1375 }
1376 match file_type {
1377 FileType::Unknown => {
1378 let filename = file_path.clone() + "Data.tof.gaps";
1379 let path = Path::new(&filename);
1380 info!("Writing to file {filename}");
1381 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1382 file_name = filename;
1383 }
1384 FileType::RunFile(runid) => {
1385 let filename = format!("{}{}", file_path, get_runfilename(runid, 0, None));
1386 let path = Path::new(&filename);
1387 println!("Writing to file {filename}");
1388 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1389 file_name = filename;
1390 }
1391 FileType::CalibrationFile(rbid) => {
1392 let filename = format!("{}{}", file_path, get_califilename(rbid, false));
1393 let path = Path::new(&filename);
1395 info!("Writing to file {filename}");
1396 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1397 file_name = filename;
1398 }
1399 FileType::SummaryFile(ref fname) => {
1400 let filename = fname.replace(".tof.", ".tofsum.");
1401 let path = Path::new(&filename);
1402 info!("Writing to file {filename}");
1403 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1404 file_name = filename;
1405 }
1406 }
1407 Self {
1408 file,
1409 file_path : file_path,
1410 pkts_per_file : 0,
1411 mbytes_per_file : 420,
1412 file_nbytes_wr : 0,
1413 file_type : file_type,
1414 file_id : 1,
1415 n_packets : 0,
1416 file_name : file_name,
1417 }
1418 }
1419
1420 pub fn get_file(&self) -> File {
1421 let file : File;
1422 match &self.file_type {
1423 FileType::Unknown => {
1424 let filename = self.file_path.clone() + "Data.tof.gaps";
1425 let path = Path::new(&filename);
1426 info!("Writing to file {filename}");
1427 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1428 }
1429 FileType::RunFile(runid) => {
1430 let filename = format!("{}{}", self.file_path, get_runfilename(*runid, self.file_id as u64, None));
1431 let path = Path::new(&filename);
1433 info!("Writing to file {filename}");
1434 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1435 }
1436 FileType::CalibrationFile(rbid) => {
1437 let filename = format!("{}{}", self.file_path, get_califilename(*rbid, false));
1439 let path = Path::new(&filename);
1440 info!("Writing to file {filename}");
1441 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1442 }
1443 FileType::SummaryFile(fname) => {
1444 let filename = fname.replace(".tof.", ".tofsum.");
1445 let path = Path::new(&filename);
1446 info!("Writing to file {filename}");
1447 file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
1448 }
1449 }
1450 file
1451 }
1452
1453 pub fn add_tof_packet(&mut self, packet : &TofPacket) {
1457 let buffer = packet.to_bytestream();
1458 self.file_nbytes_wr += buffer.len();
1459 match self.file.write_all(buffer.as_slice()) {
1460 Err(err) => error!("Writing to file to path {} failed! {}", self.file_path, err),
1461 Ok(_) => ()
1462 }
1463 self.n_packets += 1;
1464 let mut newfile = false;
1465 if self.pkts_per_file != 0 {
1466 if self.n_packets == self.pkts_per_file {
1467 newfile = true;
1468 self.n_packets = 0;
1469 }
1470 } else if self.mbytes_per_file != 0 {
1471 if self.file_nbytes_wr >= self.mbytes_per_file * 1_048_576 {
1473 newfile = true;
1474 self.file_nbytes_wr = 0;
1475 }
1476 }
1477 if newfile {
1478 match self.file.sync_all() {
1480 Err(err) => {
1481 error!("Unable to sync file to disc! {err}");
1482 },
1483 Ok(_) => ()
1484 }
1485 self.file = self.get_file();
1486 self.file_id += 1;
1487 }
1493 debug!("TofPacket written!");
1494 }
1495}
1496
1497impl Default for TofPacketWriter {
1498 fn default() -> TofPacketWriter {
1499 TofPacketWriter::new(String::from(""), FileType::Unknown)
1500 }
1501}
1502
1503pub struct RobinReader {
1511 pub streamer : RBEventMemoryStreamer,
1512 pub filename : String,
1513 file_reader : Option<BufReader<File>>,
1514 pub board_id : u8,
1515 cache : HashMap<u32, RBEvent>,
1517 index : HashMap<u32, usize>,
1519 n_events_read : usize,
1521 n_bytes_read : usize,
1522 pub eof_reached : bool,
1523 pub extra_filenames : Vec<String>,
1524}
1525
1526impl RobinReader {
1527
1528 const EVENT_SIZE : usize = 18530;
1531
1532 pub fn new(filename : String) -> Self {
1533 let filename_c = filename.clone();
1534 let mut robin_reader = Self {
1535 streamer : RBEventMemoryStreamer::new(),
1536 filename : String::from(""),
1537 file_reader : None,
1538 board_id : 0,
1539 cache : HashMap::<u32,RBEvent>::new(),
1540 index : HashMap::<u32,usize>::new(),
1541 eof_reached : false,
1542 n_events_read : 0,
1543 n_bytes_read : 0,
1544 extra_filenames : Vec::<String>::new(),
1545 };
1546 robin_reader.open(filename_c);
1547 robin_reader.init();
1548 robin_reader
1549 }
1550
1551 pub fn add_file(&mut self, filename : String) {
1552 self.extra_filenames.push(filename);
1553 }
1554
1555 fn init(&mut self) {
1556 if let Some(ev) = self.next() {
1567 self.board_id = ev.header.rb_id;
1568 let rewind : i64 = RobinReader::EVENT_SIZE.try_into().expect("That needs to fit!");
1569 match self.file_reader.as_mut().unwrap().seek(SeekFrom::Current(rewind)) {
1570 Err(err) => {
1571 error!("Read first event, but can not rewind stream! Err {}", err);
1572 panic!("I don't understand, panicking...");
1573 }
1574 Ok(_) => {
1575 self.n_bytes_read = 0;
1576 self.n_events_read = 0;
1577 }
1578 }
1579 } else {
1580 panic!("I can not find a single event in this file! Panicking!");
1581 }
1582 }
1584
1585 pub fn get_from_cache(&mut self, event_id : &u32) -> Option<RBEvent> {
1586 self.cache.remove(event_id)
1587 }
1588
1589 pub fn cache_all_events(&mut self) {
1590 self.rewind();
1591 while !self.eof_reached {
1592 match self.next() {
1593 None => {
1594 break;
1595 }
1596 Some(ev) => {
1597 self.cache.insert(ev.header.event_id, ev);
1599 }
1600 }
1601 }
1602 info!("Cached {} events!", self.cache.len());
1603 }
1604
1605 pub fn generate_index(&mut self) {
1610 if self.n_events_read > 0 {
1611 error!("Can not generate index when events have already been read! Use ::rewind() first!");
1612 return;
1613 }
1614 self.n_events_read = 0;
1615 let pb = ProgressBar::new_spinner();
1616 pb.set_style(ProgressStyle::default_spinner().template("{spinner:.green} Generating eventid index...").unwrap());
1617 let mut seen_before = 0usize;
1618 let mut total_events = 0usize;
1619 while !self.eof_reached {
1620 if let Some(ev) = self.next() {
1621 if self.index.contains_key(&ev.header.event_id) {
1622 debug!("We have seen this event id {} before!", ev.header.event_id);
1623 seen_before += 1;
1624 }
1625 self.index.insert(ev.header.event_id,self.n_events_read);
1626 self.n_events_read += 1;
1627 total_events += 1;
1628 }
1629 pb.tick();
1630 }
1631 if seen_before > 0 {
1632 error!("There have been duplicate event ids! In total, we discard {}/{}", seen_before, total_events);
1633 }
1634 info!("Generated index by reading {} events!", self.n_events_read);
1635 self.rewind();
1636 info!("Generated index for {} events!", self.index.len());
1637 }
1638
1639 pub fn get_cache_size(&self) -> usize {
1640 self.cache.len()
1641 }
1642
1643 pub fn print_index(&self) {
1644 let mut reverse_index = HashMap::<usize, u32>::new();
1645 for k in self.index.keys() {
1646 reverse_index.insert(self.index[k], *k);
1647 }
1648 debug!("Generated reversed index of size {}", reverse_index.len());
1649 let mut sorted_keys: Vec<&usize> = reverse_index.keys().collect();
1654 sorted_keys.sort();
1655 }
1662
1663 pub fn is_indexed(&self, event_id : &u32) -> bool {
1664 self.index.contains_key(event_id)
1665 }
1666
1667
1668 pub fn get_in_order(&mut self, event_id : &u32) -> Option<RBEvent> {
1672 if !self.is_indexed(event_id) {
1673 error!("Can not get event {} since it is not in the index!", event_id);
1674 return None;
1675 }
1676 let event_idx = self.index.remove(event_id).unwrap();
1677 if self.n_events_read > event_idx {
1678 error!("Can not get event {} since we have already read it. You can use ::rewind() and try again!", event_id);
1679 return None;
1680 } else {
1681 let delta = event_idx - self.n_events_read;
1682 let mut n_read = 0usize;
1683 loop {
1685 match self.next() {
1686 Some(ev) => {
1687 n_read += 1;
1688 if n_read == delta {
1689 return Some(ev);
1690 }
1691 },
1692 None => {
1693 break;
1694 }
1695 }
1696 }
1697 }
1698 None
1699 }
1700
1701 pub fn rewind(&mut self) {
1703 warn!("Rewinding {}", self.filename);
1704 let mut rewind : i64 = self.n_bytes_read.try_into().unwrap();
1705 rewind = -1*rewind;
1706 debug!("Attempting to rewind {rewind} bytes");
1707 match self.file_reader.as_mut().unwrap().seek(SeekFrom::Current(rewind)) {
1708 Err(err) => {
1709 error!("Can not rewind file buffer! Error {err}");
1710 }
1711 Ok(_) => {
1712 info!("File rewound by {rewind} bytes!");
1713 self.n_events_read = 0;
1714 self.n_bytes_read = 0;
1715 }
1716 }
1717 self.eof_reached = false;
1718 }
1719
1720 pub fn open(&mut self, filename : String) {
1721 if self.filename != "" {
1722 warn!("Overiding previously set filename {}", self.filename);
1723 }
1724 let self_filename = filename.clone();
1725 self.filename = self_filename;
1726 if filename != "" {
1727 let path = Path::new(&filename);
1728 info!("Reading from {}", &self.filename);
1729 let file = OpenOptions::new().create(false).append(true).read(true).open(path).expect("Unable to open file {filename}");
1730 self.file_reader = Some(BufReader::new(file));
1731 }
1732 }
1733
1734 pub fn precache_events(&mut self, n_events : usize) {
1735 self.cache.clear();
1736 let mut n_ev = 0usize;
1737 if self.eof_reached {
1738 return;
1739 }
1740 for _ in 0..n_events {
1741 let event = self.next();
1742 n_ev += 1;
1743 if let Some(ev) = event {
1744 self.cache.insert(ev.header.event_id, ev);
1745 } else {
1746 error!("Can not cache {}th event!", n_ev);
1747 self.eof_reached = true;
1748 break
1749 }
1750 }
1751 }
1752
1753 pub fn max_cached_event_id(&self) -> Option<u32> {
1754 let keys : Vec<u32> = self.cache.keys().cloned().collect();
1755 keys.iter().max().copied()
1756 }
1757
1758 pub fn min_cached_event_id(&self) -> Option<u32> {
1759 let keys : Vec<u32> = self.cache.keys().cloned().collect();
1760 keys.iter().min().copied()
1761 }
1762
1763 pub fn is_cached(&self, event_id : &u32) -> bool {
1764 let keys : Vec<&u32> = self.cache.keys().collect();
1765 keys.contains(&event_id)
1766 }
1767
1768 pub fn get_event_by_id(&mut self, event_id : &u32) -> Option<RBEvent> {
1769 self.cache.remove(event_id)
1770 }
1771
1772 pub fn is_expired(&self) -> bool {
1773 self.eof_reached && self.cache.len() == 0
1774 }
1775
1776 pub fn event_ids_in_cache(&self) -> Vec<u32> {
1777 trace!("We have {} elements in the cache!", self.cache.len());
1778 let mut keys : Vec<u32> = self.cache.keys().cloned().collect();
1779 trace!("We have {} elements in the cache!", keys.len());
1780 keys.sort();
1781 keys
1782 }
1783
1784 pub fn get_events(&self) -> Vec<RBEvent> {
1785 self.cache.values().cloned().collect()
1786 }
1787
1788 pub fn count_packets(&self) -> u64 {
1789 let metadata = self.file_reader.as_ref().unwrap().get_ref().metadata().unwrap();
1790 let file_size = metadata.len();
1791 let n_packets = file_size/RobinReader::EVENT_SIZE as u64;
1792 info!("The file {} contains likely ~{} event packets!", self.filename, n_packets);
1793 n_packets
1794 }
1795}
1796
1797impl Default for RobinReader {
1798
1799 fn default() -> Self {
1800 RobinReader::new(String::from(""))
1801 }
1802}
1803
1804impl Iterator for RobinReader {
1805 type Item = RBEvent;
1806
1807 fn next(&mut self) -> Option<Self::Item> {
1808 match self.streamer.next() {
1809 Some(event) => {
1810 return Some(event);
1811 },
1812 None => {
1813 const CHUNKSIZE : usize = 200000;
1816 let mut buffer = [0u8;CHUNKSIZE];
1817 match self.file_reader.as_mut().unwrap().read(&mut buffer) {
1818 Err(err) => {
1819 error!("Unable to read any bytes from file {}, {}", self.filename, err);
1820 return None;
1821 },
1822 Ok(_nbytes) => {
1823 self.n_bytes_read += _nbytes;
1824 if _nbytes == 0 {
1825 self.eof_reached = true;
1826 if self.extra_filenames.len() > 0 {
1827 let next_filename = self.extra_filenames.pop().unwrap();
1828 self.open(next_filename);
1829 self.eof_reached = false;
1830 match self.file_reader.as_mut().unwrap().read(&mut buffer) {
1831 Err(err) => {
1832 error!("Failed reading bytes from buffer! {}", err);
1833 },
1834 Ok(_nbytes2) => {}
1835 }
1836 }
1837 return None;
1838 }
1839 self.streamer.add(&buffer.to_vec(), _nbytes);
1840 match self.streamer.next() {
1841 None => {
1842 return None;
1844 },
1845 Some(event) => {
1846 return Some(event);
1847 }
1849 }
1850 }
1851 }
1852 }
1853 }
1854 }
1855}
1856
/// The checksum configured by `ALGO` over two zero bytes must yield the
/// known reference value.
#[test]
fn crc32() {
    let expected: u32 = 1104745215;
    let crc = Crc::<u32>::new(&ALGO);
    let mut digest = crc.digest();
    // the little-endian representation of 0u16, i.e. two zero bytes
    let zero_word: [u8; 2] = 0u16.to_le_bytes();
    digest.update(&zero_word);
    assert_eq!(expected, digest.finalize());
}
1866
1867