1use std::io::{
8 BufReader,
9 Read,
10 Seek,
11 SeekFrom,
12};
13use std::cmp::Ordering;
14
15use crate::prelude::*;
16
17#[derive(Debug)]
21#[cfg_attr(feature="pybindings", pyclass)]
22pub struct TelemetryPacketReader {
23 pub filenames : Vec<String>,
27 pub file_idx : usize,
30 pub dedup : bool,
35 pub start_time : Option<f64>,
37 pub end_time : Option<f64>,
39 file_reader : BufReader<File>,
40 cursor : usize,
42 pub filter : TelemetryPacketType,
44 n_packs_read : usize,
46 n_packs_skipped : usize,
48 pub skip_ahead : usize,
50 pub stop_after : usize,
52 pub n_duplicates : usize,
54 dedup_cache : HashMap<u16, VecDeque<u16>>,
57 pub packet_cache : Vec<TelemetryPacket>,
61 pub tof_paddles : Arc<HashMap<u8,TofPaddle>>,
64 pub trk_strips : Arc<HashMap<u32, TrackerStrip>>,
66}
67
68
69impl TelemetryPacketReader {
70 pub fn new(filename_or_directory : String, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> Self {
71 #[cfg(feature="database")]
72 let mut paddles = HashMap::<u8, TofPaddle>::new();
73 #[cfg(not(feature="database"))]
74 let paddles = HashMap::<u8, TofPaddle>::new();
75 #[cfg(feature="database")]
76 let mut strips = HashMap::<u32, TrackerStrip>::with_capacity(11520);
77 #[cfg(not(feature="database"))]
78 let strips = HashMap::<u32, TrackerStrip>::with_capacity(11520);
79 #[cfg(feature="database")]
80 match TofPaddle::all_as_dict() {
81 Err(err) => {
82 error!("Unable to retrieve paddle information from DB! {err}");
83 }
84 Ok(pdls) => {
85 paddles = pdls;
86 }
87 }
88 #[cfg(feature="database")]
89 match TrackerStrip::all_as_dict() {
90 Err(err) => {
91 error!("Unable to retrieve tracker strip information from DB! {err}");
92 }
95 Ok(strips_) => {
96 strips = strips_;
97 }
98 }
99 let firstfile : String;
100 let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
101 match list_path_contents_sorted(&filename_or_directory, Some(re)) {
102 Err(err) => {
103 error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
104 panic!("Unable to open files!");
105 }
106 Ok(mut files) => {
107
108 if let Some(start) = start_time {
109 info!("Removing files earlier than {}", start);
110 files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 >= start} );
111 }
112 if let Some(end) = end_time {
113 info!("Removing files later than {}", end);
114 files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 <= end } );
115 }
116 firstfile = files[0].clone();
117 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
118 Err(err) => {
119 error!("Unable to open file {firstfile}! {err}");
120 panic!("Unable to create reader from {filename_or_directory}!");
121 }
122 Ok(file) => {
123 let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);
125 for k in 0..u16::MAX as usize + 1 {
126 dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
127 }
128 let packet_reader = Self {
129 filenames : files,
130 file_idx : 0,
131 dedup : dedup,
132 start_time : start_time,
133 end_time : end_time,
134 file_reader : BufReader::new(file),
135 cursor : 0,
136 filter : TelemetryPacketType::Unknown,
137 n_packs_read : 0,
138 skip_ahead : 0,
139 stop_after : 0,
140 n_packs_skipped : 0,
141 n_duplicates : 0,
142 dedup_cache : dedup_cache,
143 packet_cache : Vec::<TelemetryPacket>::new(),
144 tof_paddles : Arc::new(paddles),
145 trk_strips : Arc::new(strips),
146 };
147 packet_reader
148 }
149 }
150 }
151 }
152 }
153
154 pub fn cache_all_packets(&mut self) {
160 loop {
161 match self.read_next_item() {
162 None => {
163 info!("Read all packets!");
164 break;
165 }
166 Some(pack) => {
167 self.packet_cache.push(pack);
168 }
169 }
170 }
171 self.packet_cache.sort_by(|a,b|{
174 b.header.get_gcutime().partial_cmp(&a.header.get_gcutime()).unwrap_or(Ordering::Equal)
175 .then(b.header.counter.cmp(&a.header.counter))
176 });
177 self.packet_cache.reverse();
180 }
181
182 pub fn clear_dedup_cache(&mut self) {
183 let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);
184 for k in 0..u16::MAX as usize + 1 {
185 dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
186 }
187 self.dedup_cache = dedup_cache;
188 }
189
190 pub fn count_packets(&mut self) -> (usize, usize, HashMap<TelemetryPacketType,usize>) {
192 let _ = self.rewind();
193 self.clear_dedup_cache();
194 let mut nframes = 0usize;
195 let mut buffer = [0];
196 let mut incomplete = 0usize;
197 let mut index = HashMap::<TelemetryPacketType,usize>::new();
198 for k in TelemetryPacketType::iter() {
199 index.insert(k, 0);
200 }
201 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
202 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
203 let bar = ProgressBar::new(self.filenames.len() as u64);
204 bar.set_position(0);
205 bar.set_message (String::from("Counting packets.."));
206 bar.set_prefix ("\u{2728}");
207 bar.set_style (bar_style);
208 bar.set_position(self.file_idx as u64);
209 loop {
210 match self.file_reader.read_exact(&mut buffer) {
211 Err(err) => {
212 debug!("Unable to read from file! {err}");
213 match self.prime_next_file() {
214 None => break,
215 Some(_) => {
216 bar.set_position(self.file_idx as u64);
217 continue;
218 }
219 };
220 }
221 Ok(_) => {
222 self.cursor += 1;
223 }
224 }
225
226 if buffer[0] != 0xeb {
229 continue;
230 } else {
231 match self.file_reader.read_exact(&mut buffer) {
232 Err(err) => {
233 debug!("Unable to read from file! {err}");
234 match self.prime_next_file() {
235 None => break,
236 Some(_) => {
237 bar.set_position(self.file_idx as u64);
238 continue;
239 }
240 };
241 }
242 Ok(_) => {
243 self.cursor += 1;
244 }
245 }
246 if buffer[0] != 0x90 {
248 continue;
249 } else {
250 match self.file_reader.read_exact(&mut buffer) {
252 Err(err) => {
253 debug!("Unable to read from file! {err}");
254 match self.prime_next_file() {
255 None => break,
256 Some(_) => {
257 bar.set_position(self.file_idx as u64);
258 continue;
259 }
260 };
261 }
262 Ok(_) => {
263 *index.get_mut(&TelemetryPacketType::from(buffer[0])).unwrap() += 1;
264 self.cursor += 1;
265 }
266 }
267 let mut buffer_skip = [0,0,0,0,0,0];
270 match self.file_reader.read_exact(&mut buffer_skip) {
271 Err(err) => {
272 debug!("Unable to read from file! {err}");
273 match self.prime_next_file() {
274 None => break,
275 Some(_) => {
276 bar.set_position(self.file_idx as u64);
277 continue;
278 }
279 };
280 }
281 Ok(_) => {
282 self.cursor += 6;
283 }
284 }
285 let mut buffer_psize = [0,0];
286 match self.file_reader.read_exact(&mut buffer_psize) {
287 Err(_err) => {
288 match self.prime_next_file() {
289 None => break,
290 Some(_) => {
291 bar.set_position(self.file_idx as u64);
292 continue;
293 }
294 }
295 }
296 Ok(_) => {
297 self.cursor += 2;
298 }
299 }
300 let vec_data = buffer_psize.to_vec();
301 let size = parse_u16(&vec_data, &mut 0) - 13;
304 let mut temp_buffer = vec![0; size as usize];
305 match self.file_reader.read_exact(&mut buffer_psize) {
307 Err(_err) => {
308 match self.prime_next_file() {
309 None => break,
310 Some(_) => {
311 bar.set_position(self.file_idx as u64);
312 continue;
313 }
314 }
315 }
316 Ok(_) => {
317 self.cursor += 2;
318 }
319 }
320 match self.file_reader.read_exact(&mut temp_buffer) {
321 Err(err) => {
324 incomplete += 1;
325 warn!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
326 match self.prime_next_file() {
327 None => break,
328 Some(_) => {
329 bar.set_position(self.file_idx as u64);
330 continue;
331 }
332 }
333 }
334 Ok(_) => {
335 self.cursor += size as usize;
336 nframes += 1;
337 }
338 }
339 }
340 } } bar.finish_with_message("Done!");
343 let _ = self.rewind();
344 (nframes, incomplete, index)
345 } pub fn read_next_item(&mut self) -> Option<TelemetryPacket> {
353 let mut buffer = [0];
355 loop {
356 match self.file_reader.read_exact(&mut buffer) {
357 Err(err) => {
358 debug!("Unable to read from file! {err}");
359 self.prime_next_file()?;
360 return self.read_next_item();
361 }
362 Ok(_) => {
363 self.cursor += 1;
364 }
365 }
366 if buffer[0] != 0xeb {
367 continue;
368 } else {
369 match self.file_reader.read_exact(&mut buffer) {
370 Err(err) => {
371 debug!("Unable to read from file! {err}");
372 self.prime_next_file()?;
373 return self.read_next_item();
374 }
375 Ok(_) => {
376 self.cursor += 1;
377 }
378 }
379
380 if buffer[0] != 0x90 {
381 continue;
382 } else {
383 match self.file_reader.read_exact(&mut buffer) {
386 Err(err) => {
387 debug!("Unable to read from file! {err}");
388 self.prime_next_file()?;
389 return self.read_next_item();
390 }
391 Ok(_) => {
392 self.cursor += 1;
393 }
394 }
395 let mut thead = TelemetryPacketHeader::new();
396 thead.sync = 0x90eb;
397 thead.packet_type = TelemetryPacketType::from(buffer[0]);
398 let mut buffer_ts = [0,0,0,0];
400 match self.file_reader.read_exact(&mut buffer_ts) {
401 Err(err) => {
402 debug!("Unable to read from file! {err}");
403 self.prime_next_file()?;
404 return self.read_next_item();
405 }
406 Ok(_) => {
407 self.cursor += 4;
408 thead.timestamp = u32::from_le_bytes(buffer_ts);
409 }
410 }
411 let mut buffer_counter = [0,0];
412 match self.file_reader.read_exact(&mut buffer_counter) {
413 Err(err) => {
414 debug!("Unable to read from file! {err}");
415 self.prime_next_file()?;
416 return self.read_next_item();
417 }
418 Ok(_) => {
419 self.cursor += 2;
420 thead.counter = u16::from_le_bytes(buffer_counter);
421 }
422 }
423 let mut buffer_length = [0,0];
424 match self.file_reader.read_exact(&mut buffer_length) {
425 Err(err) => {
426 debug!("Unable to read from file! {err}");
427 return None;
428 }
429 Ok(_) => {
430 self.cursor += 2;
431 thead.length = u16::from_le_bytes(buffer_length);
432 }
433 }
434 let mut buffer_checksum = [0,0];
435 match self.file_reader.read_exact(&mut buffer_checksum) {
436 Err(err) => {
437 debug!("Unable to read from file! {err}");
438 self.prime_next_file()?;
439 return self.read_next_item();
440 }
441 Ok(_) => {
442 self.cursor += 2;
443 thead.checksum = u16::from_le_bytes(buffer_checksum);
444 }
445 }
446
447 let mut size = thead.length;
448 if (size as usize) < TelemetryPacketHeader::SIZE {
450 error!("This packet might be empty or corrupt!");
451 return None;
452 }
453 size -= TelemetryPacketHeader::SIZE as u16;
454 if thead.packet_type != self.filter && self.filter != TelemetryPacketType::Unknown {
455 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
456 Err(err) => {
457 debug!("Unable to read more data! {err}");
458 self.prime_next_file()?;
459 return self.read_next_item();
460 }
461 Ok(_) => {
462 self.cursor += size as usize;
463 }
464 }
465 continue; }
467 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
470 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
472 Err(err) => {
473 debug!("Unable to read more data! {err}");
474 self.prime_next_file()?;
475 return self.read_next_item();
476 }
477 Ok(_) => {
478 self.n_packs_skipped += 1;
479 self.cursor += size as usize;
480 }
481 }
482 continue; }
484 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
485 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
487 Err(err) => {
488 debug!("Unable to read more data! {err}");
489 self.prime_next_file()?;
490 return self.read_next_item();
491 }
492 Ok(_) => {
493 self.cursor += size as usize;
494 }
495 }
496 continue; }
498
499
500 let mut tp = TelemetryPacket::new();
501 tp.header = thead;
502
503 let mut payload = vec![0u8;size as usize];
516 match self.file_reader.read_exact(&mut payload) {
517 Err(err) => {
518 debug!("Unable to read from file! {err}");
519 self.prime_next_file()?;
520 return self.read_next_item();
521 }
522 Ok(_) => {
523 self.cursor += tp.header.length as usize;
524 }
525 }
526
527 tp.payload = payload;
528 if tp.header.packet_type == TelemetryPacketType::InterestingEvent
529 || tp.header.packet_type == TelemetryPacketType::BoringEvent
530 || tp.header.packet_type == TelemetryPacketType::NoGapsTriggerEvent {
531 tp.tof_paddles = self.tof_paddles.clone();
532 tp.trk_strips = self.trk_strips.clone();
533 }
534 self.n_packs_read += 1;
535 if self.dedup {
537 let mut will_send : bool;
538 if self.dedup_cache[&tp.header.counter].len() == 0 {
539 will_send = true;
540 } else {
541 if !self.dedup_cache.contains_key(&tp.header.counter) {
542 panic!("The dedup cache does not contain {}", tp.header.counter);
543 }
544 will_send = true;
545 for checksum in &self.dedup_cache[&tp.header.counter] {
546 if checksum == &tp.header.checksum {
547 will_send = false;
548 }
549 }
550 }
551 if will_send {
554 self.dedup_cache.get_mut(&tp.header.counter).unwrap().push_back(tp.header.checksum);
555 return Some(tp);
556 } else {
557 self.n_duplicates += 1;
560 if self.dedup_cache[&tp.header.counter].len() > 4 {
561 self.dedup_cache.get_mut(&tp.header.counter).unwrap().pop_front();
562 }
563
564 return self.read_next_item();
565 }
566 }
567 if self.start_time.is_some() {
568 if tp.header.get_gcutime() < self.start_time.unwrap() {
569 return self.read_next_item();
570 }
571 }
572 if self.end_time.is_some() {
573 if tp.header.get_gcutime() > self.end_time.unwrap() {
574 return self.read_next_item();
575 }
576 }
577 return Some(tp);
578 }
579 } } } }
583
584impl Default for TelemetryPacketReader {
585 fn default() -> Self {
586 TelemetryPacketReader::new(String::from(""), false, None, None)
587 }
588}
589
590impl fmt::Display for TelemetryPacketReader {
591 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
592 let mut range_repr = String::from("");
593 if self.skip_ahead > 0 {
594 range_repr += &(format!("({}", self.skip_ahead));
595 } else {
596 range_repr += "(";
597 }
598 if self.stop_after > 0 {
599 range_repr += &(format!("..{})", self.stop_after));
600 } else {
601 range_repr += "..)";
602 }
603 let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
604 write!(f, "{}", repr)
605 }
606}
607
608reader!(TelemetryPacketReader, TelemetryPacket);
609
610#[cfg(feature="pybindings")]
611#[pymethods]
612impl TelemetryPacketReader {
613
614 #[new]
615 #[pyo3(signature = (filenames_or_directory, dedup = false, start_time = None, end_time = None))]
616 fn new_py(filenames_or_directory : &Bound<'_,PyAny>, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> PyResult<Self> {
617
618 let mut string_value = String::from("foo");
619 let mut fnames = Vec::<String>::new();
620 if let Ok(s) = filenames_or_directory.extract::<String>() {
621 string_value = s;
622 } if let Ok(fspath_method) = filenames_or_directory.getattr("__fspath__") {
624 if let Ok(fspath_result) = fspath_method.call0() {
625 if let Ok(py_string) = fspath_result.extract::<String>() {
626 string_value = py_string;
627 }
628 }
629 }
630 if let Ok(list) = filenames_or_directory.extract::<Vec<String>>() {
631 for k in list {
632 fnames.push(k);
633 } }
643 let mut reader : Self;
644 if fnames.len() > 0 {
645 string_value = fnames[0].clone();
646 reader = Self::new(string_value, dedup, start_time, end_time);
647 reader.filenames = fnames;
648 } else {
649 reader = Self::new(string_value, dedup, start_time, end_time);
650 }
651 Ok(reader)
652 }
661
662 #[getter]
663 fn get_n_duplicates(&self) -> usize {
664 self.n_duplicates
665 }
666
667 #[pyo3(name = "count_packets")]
668 fn count_packets_py(&mut self) -> (usize,usize,HashMap<TelemetryPacketType,usize>) {
669 self.count_packets()
670 }
671
672 #[pyo3(name = "cache_all_packets")]
673 fn cache_all_packets_py(&mut self) {
674 self.cache_all_packets();
675 }
676
677 #[pyo3(name = "copy_packet_cache")]
683 fn copy_packet_cache(&self) -> Vec<TelemetryPacket> {
684 self.packet_cache.clone()
685 }
686
687 #[getter]
698 fn filenames(&self) -> Vec<String> {
699 self.filenames.clone()
700 }
701 #[getter]
714 #[pyo3(name="current_filename")]
715 fn get_current_filename_py(&self) -> Option<&str> {
716 self.get_current_filename()
717 }
718
719 #[pyo3(name="rewind")]
723 fn rewind_py(&mut self) -> PyResult<()> {
724 self.clear_dedup_cache();
727 match self.rewind() {
728 Err(err) => {
729 return Err(PyValueError::new_err(err.to_string()));
730 }
731 Ok(_) => Ok(())
732 }
733 }
734
735 fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
741 slf
742 }
743
744 fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TelemetryPacket> {
745 slf.next()
746 }
755}
756
757#[cfg(feature="pybindings")]
758pythonize_display!(TelemetryPacketReader);
759
760