1use std::io::{
8 BufReader,
9 Read,
10 Seek,
11 SeekFrom,
12};
13use std::cmp::Ordering;
14
15use crate::prelude::*;
16
17#[derive(Debug)]
21#[cfg_attr(feature="pybindings", pyclass)]
22pub struct TelemetryPacketReader {
23 pub filenames : Vec<String>,
27 pub file_idx : usize,
30 pub dedup : bool,
35 pub start_time : Option<f64>,
37 pub end_time : Option<f64>,
39 file_reader : BufReader<File>,
40 cursor : usize,
42 pub filter : TelemetryPacketType,
44 n_packs_read : usize,
46 n_packs_skipped : usize,
48 pub skip_ahead : usize,
50 pub stop_after : usize,
52 pub n_duplicates : usize,
54 dedup_cache : HashMap<u16, VecDeque<u16>>,
57 pub packet_cache : Vec<TelemetryPacket>
61}
62
63
64impl TelemetryPacketReader {
65 pub fn new(filename_or_directory : String, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> Self {
66 let firstfile : String;
67 let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
68 match list_path_contents_sorted(&filename_or_directory, Some(re)) {
69 Err(err) => {
70 error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
71 panic!("Unable to open files!");
72 }
73 Ok(mut files) => {
74
75 if let Some(start) = start_time {
76 info!("Removing files earlier than {}", start);
77 files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 >= start} );
78 }
79 if let Some(end) = end_time {
80 info!("Removing files later than {}", end);
81 files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 <= end } );
82 }
83 firstfile = files[0].clone();
84 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
85 Err(err) => {
86 error!("Unable to open file {firstfile}! {err}");
87 panic!("Unable to create reader from {filename_or_directory}!");
88 }
89 Ok(file) => {
90 let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);
92 for k in 0..u16::MAX as usize + 1 {
93 dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
94 }
95 let packet_reader = Self {
96 filenames : files,
97 file_idx : 0,
98 dedup : dedup,
99 start_time : start_time,
100 end_time : end_time,
101 file_reader : BufReader::new(file),
102 cursor : 0,
103 filter : TelemetryPacketType::Unknown,
104 n_packs_read : 0,
105 skip_ahead : 0,
106 stop_after : 0,
107 n_packs_skipped : 0,
108 n_duplicates : 0,
109 dedup_cache : dedup_cache,
110 packet_cache : Vec::<TelemetryPacket>::new()
111 };
112 packet_reader
113 }
114 }
115 }
116 }
117 }
118
119 pub fn cache_all_packets(&mut self) {
125 loop {
126 match self.read_next_item() {
127 None => {
128 info!("Read all packets!");
129 break;
130 }
131 Some(pack) => {
132 self.packet_cache.push(pack);
133 }
134 }
135 }
136 self.packet_cache.sort_by(|a,b|{
139 b.header.get_gcutime().partial_cmp(&a.header.get_gcutime()).unwrap_or(Ordering::Equal)
140 .then(b.header.counter.cmp(&a.header.counter))
141 });
142 self.packet_cache.reverse();
145 }
146
147 pub fn clear_dedup_cache(&mut self) {
148 let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);
149 for k in 0..u16::MAX as usize + 1 {
150 dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
151 }
152 self.dedup_cache = dedup_cache;
153 }
154
155 pub fn count_packets(&mut self) -> (usize, usize, HashMap<TelemetryPacketType,usize>) {
157 let _ = self.rewind();
158 self.clear_dedup_cache();
159 let mut nframes = 0usize;
160 let mut buffer = [0];
161 let mut incomplete = 0usize;
162 let mut index = HashMap::<TelemetryPacketType,usize>::new();
163 for k in TelemetryPacketType::iter() {
164 index.insert(k, 0);
165 }
166 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
167 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
168 let bar = ProgressBar::new(self.filenames.len() as u64);
169 bar.set_position(0);
170 bar.set_message (String::from("Counting packets.."));
171 bar.set_prefix ("\u{2728}");
172 bar.set_style (bar_style);
173 bar.set_position(self.file_idx as u64);
174 loop {
175 match self.file_reader.read_exact(&mut buffer) {
176 Err(err) => {
177 debug!("Unable to read from file! {err}");
178 match self.prime_next_file() {
179 None => break,
180 Some(_) => {
181 bar.set_position(self.file_idx as u64);
182 continue;
183 }
184 };
185 }
186 Ok(_) => {
187 self.cursor += 1;
188 }
189 }
190
191 if buffer[0] != 0xeb {
194 continue;
195 } else {
196 match self.file_reader.read_exact(&mut buffer) {
197 Err(err) => {
198 debug!("Unable to read from file! {err}");
199 match self.prime_next_file() {
200 None => break,
201 Some(_) => {
202 bar.set_position(self.file_idx as u64);
203 continue;
204 }
205 };
206 }
207 Ok(_) => {
208 self.cursor += 1;
209 }
210 }
211 if buffer[0] != 0x90 {
213 continue;
214 } else {
215 match self.file_reader.read_exact(&mut buffer) {
217 Err(err) => {
218 debug!("Unable to read from file! {err}");
219 match self.prime_next_file() {
220 None => break,
221 Some(_) => {
222 bar.set_position(self.file_idx as u64);
223 continue;
224 }
225 };
226 }
227 Ok(_) => {
228 *index.get_mut(&TelemetryPacketType::from(buffer[0])).unwrap() += 1;
229 self.cursor += 1;
230 }
231 }
232 let mut buffer_skip = [0,0,0,0,0,0];
235 match self.file_reader.read_exact(&mut buffer_skip) {
236 Err(err) => {
237 debug!("Unable to read from file! {err}");
238 match self.prime_next_file() {
239 None => break,
240 Some(_) => {
241 bar.set_position(self.file_idx as u64);
242 continue;
243 }
244 };
245 }
246 Ok(_) => {
247 self.cursor += 6;
248 }
249 }
250 let mut buffer_psize = [0,0];
251 match self.file_reader.read_exact(&mut buffer_psize) {
252 Err(_err) => {
253 match self.prime_next_file() {
254 None => break,
255 Some(_) => {
256 bar.set_position(self.file_idx as u64);
257 continue;
258 }
259 }
260 }
261 Ok(_) => {
262 self.cursor += 2;
263 }
264 }
265 let vec_data = buffer_psize.to_vec();
266 let size = parse_u16(&vec_data, &mut 0) - 13;
269 let mut temp_buffer = vec![0; size as usize];
270 match self.file_reader.read_exact(&mut buffer_psize) {
272 Err(_err) => {
273 match self.prime_next_file() {
274 None => break,
275 Some(_) => {
276 bar.set_position(self.file_idx as u64);
277 continue;
278 }
279 }
280 }
281 Ok(_) => {
282 self.cursor += 2;
283 }
284 }
285 match self.file_reader.read_exact(&mut temp_buffer) {
286 Err(err) => {
289 incomplete += 1;
290 warn!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
291 match self.prime_next_file() {
292 None => break,
293 Some(_) => {
294 bar.set_position(self.file_idx as u64);
295 continue;
296 }
297 }
298 }
299 Ok(_) => {
300 self.cursor += size as usize;
301 nframes += 1;
302 }
303 }
304 }
305 } } bar.finish_with_message("Done!");
308 let _ = self.rewind();
309 (nframes, incomplete, index)
310 } pub fn read_next_item(&mut self) -> Option<TelemetryPacket> {
318 let mut buffer = [0];
320 loop {
321 match self.file_reader.read_exact(&mut buffer) {
322 Err(err) => {
323 debug!("Unable to read from file! {err}");
324 self.prime_next_file()?;
325 return self.read_next_item();
326 }
327 Ok(_) => {
328 self.cursor += 1;
329 }
330 }
331 if buffer[0] != 0xeb {
332 continue;
333 } else {
334 match self.file_reader.read_exact(&mut buffer) {
335 Err(err) => {
336 debug!("Unable to read from file! {err}");
337 self.prime_next_file()?;
338 return self.read_next_item();
339 }
340 Ok(_) => {
341 self.cursor += 1;
342 }
343 }
344
345 if buffer[0] != 0x90 {
346 continue;
347 } else {
348 match self.file_reader.read_exact(&mut buffer) {
351 Err(err) => {
352 debug!("Unable to read from file! {err}");
353 self.prime_next_file()?;
354 return self.read_next_item();
355 }
356 Ok(_) => {
357 self.cursor += 1;
358 }
359 }
360 let mut thead = TelemetryPacketHeader::new();
361 thead.sync = 0x90eb;
362 thead.packet_type = TelemetryPacketType::from(buffer[0]);
363 let mut buffer_ts = [0,0,0,0];
365 match self.file_reader.read_exact(&mut buffer_ts) {
366 Err(err) => {
367 debug!("Unable to read from file! {err}");
368 self.prime_next_file()?;
369 return self.read_next_item();
370 }
371 Ok(_) => {
372 self.cursor += 4;
373 thead.timestamp = u32::from_le_bytes(buffer_ts);
374 }
375 }
376 let mut buffer_counter = [0,0];
377 match self.file_reader.read_exact(&mut buffer_counter) {
378 Err(err) => {
379 debug!("Unable to read from file! {err}");
380 self.prime_next_file()?;
381 return self.read_next_item();
382 }
383 Ok(_) => {
384 self.cursor += 2;
385 thead.counter = u16::from_le_bytes(buffer_counter);
386 }
387 }
388 let mut buffer_length = [0,0];
389 match self.file_reader.read_exact(&mut buffer_length) {
390 Err(err) => {
391 debug!("Unable to read from file! {err}");
392 return None;
393 }
394 Ok(_) => {
395 self.cursor += 2;
396 thead.length = u16::from_le_bytes(buffer_length);
397 }
398 }
399 let mut buffer_checksum = [0,0];
400 match self.file_reader.read_exact(&mut buffer_checksum) {
401 Err(err) => {
402 debug!("Unable to read from file! {err}");
403 self.prime_next_file()?;
404 return self.read_next_item();
405 }
406 Ok(_) => {
407 self.cursor += 2;
408 thead.checksum = u16::from_le_bytes(buffer_checksum);
409 }
410 }
411
412 let mut size = thead.length;
413 if (size as usize) < TelemetryPacketHeader::SIZE {
415 error!("This packet might be empty or corrupt!");
416 return None;
417 }
418 size -= TelemetryPacketHeader::SIZE as u16;
419 if thead.packet_type != self.filter && self.filter != TelemetryPacketType::Unknown {
420 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
421 Err(err) => {
422 debug!("Unable to read more data! {err}");
423 self.prime_next_file()?;
424 return self.read_next_item();
425 }
426 Ok(_) => {
427 self.cursor += size as usize;
428 }
429 }
430 continue; }
432 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
435 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
437 Err(err) => {
438 debug!("Unable to read more data! {err}");
439 self.prime_next_file()?;
440 return self.read_next_item();
441 }
442 Ok(_) => {
443 self.n_packs_skipped += 1;
444 self.cursor += size as usize;
445 }
446 }
447 continue; }
449 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
450 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
452 Err(err) => {
453 debug!("Unable to read more data! {err}");
454 self.prime_next_file()?;
455 return self.read_next_item();
456 }
457 Ok(_) => {
458 self.cursor += size as usize;
459 }
460 }
461 continue; }
463
464
465 let mut tp = TelemetryPacket::new();
466 tp.header = thead;
467
468 let mut payload = vec![0u8;size as usize];
481 match self.file_reader.read_exact(&mut payload) {
482 Err(err) => {
483 debug!("Unable to read from file! {err}");
484 self.prime_next_file()?;
485 return self.read_next_item();
486 }
487 Ok(_) => {
488 self.cursor += tp.header.length as usize;
489 }
490 }
491
492 tp.payload = payload;
493 self.n_packs_read += 1;
494 if self.dedup {
496 let mut will_send : bool;
497 if self.dedup_cache[&tp.header.counter].len() == 0 {
498 will_send = true;
499 } else {
500 if !self.dedup_cache.contains_key(&tp.header.counter) {
501 panic!("The dedup cache does not contain {}", tp.header.counter);
502 }
503 will_send = true;
504 for checksum in &self.dedup_cache[&tp.header.counter] {
505 if checksum == &tp.header.checksum {
506 will_send = false;
507 }
508 }
509 }
510 if will_send {
513 self.dedup_cache.get_mut(&tp.header.counter).unwrap().push_back(tp.header.checksum);
514 return Some(tp);
515 } else {
516 self.n_duplicates += 1;
519 if self.dedup_cache[&tp.header.counter].len() > 4 {
520 self.dedup_cache.get_mut(&tp.header.counter).unwrap().pop_front();
521 }
522
523 return self.read_next_item();
524 }
525 }
526 if self.start_time.is_some() {
527 if tp.header.get_gcutime() < self.start_time.unwrap() {
528 return self.read_next_item();
529 }
530 }
531 if self.end_time.is_some() {
532 if tp.header.get_gcutime() > self.end_time.unwrap() {
533 return self.read_next_item();
534 }
535 }
536 return Some(tp);
537 }
538 } } } }
542
543impl Default for TelemetryPacketReader {
544 fn default() -> Self {
545 TelemetryPacketReader::new(String::from(""), false, None, None)
546 }
547}
548
549impl fmt::Display for TelemetryPacketReader {
550 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
551 let mut range_repr = String::from("");
552 if self.skip_ahead > 0 {
553 range_repr += &(format!("({}", self.skip_ahead));
554 } else {
555 range_repr += "(";
556 }
557 if self.stop_after > 0 {
558 range_repr += &(format!("..{})", self.stop_after));
559 } else {
560 range_repr += "..)";
561 }
562 let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
563 write!(f, "{}", repr)
564 }
565}
566
567reader!(TelemetryPacketReader, TelemetryPacket);
568
#[cfg(feature = "pybindings")]
#[pymethods]
impl TelemetryPacketReader {

    /// Create a new reader.
    ///
    /// `filenames_or_directory` may be a `str`, an object implementing
    /// `__fspath__`, or a non-empty sequence of file names. For a sequence,
    /// the reader is opened on its first entry and the sequence replaces the
    /// reader's file list.
    ///
    /// Raises `ValueError` if the argument is none of the above. Failures
    /// inside the Rust constructor (missing or unreadable files) still panic.
    #[new]
    #[pyo3(signature = (filenames_or_directory, dedup = false, start_time = None, end_time = None))]
    fn new_py(filenames_or_directory : &Bound<'_,PyAny>, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> PyResult<Self> {
        // a plain string, possibly overridden by the result of __fspath__
        let mut path : Option<String> = filenames_or_directory.extract::<String>().ok();
        if let Ok(fspath_method) = filenames_or_directory.getattr("__fspath__") {
            if let Ok(fspath_result) = fspath_method.call0() {
                if let Ok(py_string) = fspath_result.extract::<String>() {
                    path = Some(py_string);
                }
            }
        }
        // an explicit list of files takes precedence over a single path
        let fnames = filenames_or_directory.extract::<Vec<String>>().unwrap_or_default();
        if let Some(first) = fnames.first().cloned() {
            let mut reader   = Self::new(first, dedup, start_time, end_time);
            reader.filenames = fnames;
            return Ok(reader);
        }
        match path {
            Some(path) => Ok(Self::new(path, dedup, start_time, end_time)),
            // previously a placeholder path was opened in this case
            None => Err(PyValueError::new_err(
                "Expected a path (str or os.PathLike) or a non-empty list of file names!",
            )),
        }
    }

    /// The number of packets dropped as duplicates so far
    #[getter]
    fn get_n_duplicates(&self) -> usize {
        self.n_duplicates
    }

    /// Count the packets in all files, see the Rust `count_packets`
    #[pyo3(name = "count_packets")]
    fn count_packets_py(&mut self) -> (usize,usize,HashMap<TelemetryPacketType,usize>) {
        self.count_packets()
    }

    /// Read all remaining packets into the internal packet cache
    #[pyo3(name = "cache_all_packets")]
    fn cache_all_packets_py(&mut self) {
        self.cache_all_packets();
    }

    /// Return a copy of the internal packet cache
    #[pyo3(name = "copy_packet_cache")]
    fn copy_packet_cache(&self) -> Vec<TelemetryPacket> {
        self.packet_cache.clone()
    }

    /// A copy of the list of files this reader works through
    #[getter]
    fn filenames(&self) -> Vec<String> {
        self.filenames.clone()
    }

    /// The result of `get_current_filename`, `None` if it has none
    #[getter]
    #[pyo3(name="current_filename")]
    fn get_current_filename_py(&self) -> Option<&str> {
        self.get_current_filename()
    }

    /// Clear the dedup cache and rewind the reader.
    ///
    /// Raises `ValueError` carrying the underlying error message if the
    /// rewind fails.
    #[pyo3(name="rewind")]
    fn rewind_py(&mut self) -> PyResult<()> {
        self.clear_dedup_cache();
        self.rewind()
            .map(|_| ())
            .map_err(|err| PyValueError::new_err(err.to_string()))
    }

    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf
    }

    fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TelemetryPacket> {
        slf.next()
    }
}

#[cfg(feature = "pybindings")]
pythonize_display!(TelemetryPacketReader);
718
719