1use std::io::{
8 BufReader,
9 Read,
10 Seek,
11 SeekFrom,
12};
13
14use crate::prelude::*;
15
16#[derive(Debug)]
20#[cfg_attr(feature="pybindings", pyclass)]
21pub struct TelemetryPacketReader {
22 pub filenames : Vec<String>,
26 pub file_idx : usize,
29 pub dedup : bool,
34 pub start_time : Option<f64>,
36 pub end_time : Option<f64>,
38 file_reader : BufReader<File>,
39 cursor : usize,
41 pub filter : TelemetryPacketType,
43 n_packs_read : usize,
45 n_packs_skipped : usize,
47 pub skip_ahead : usize,
49 pub stop_after : usize,
51 pub n_duplicates : usize,
53 dedup_cache : HashMap<u16, VecDeque<u16>>
56}
57
58
59impl TelemetryPacketReader {
60 pub fn new(filename_or_directory : String, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> Self {
61 let firstfile : String;
62 let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
63 match list_path_contents_sorted(&filename_or_directory, Some(re)) {
64 Err(err) => {
65 error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
66 panic!("Unable to open files!");
67 }
68 Ok(mut files) => {
69
70 if let Some(start) = start_time {
71 info!("Removing files earlier than {}", start);
72 files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 >= start} );
73 }
74 if let Some(end) = end_time {
75 info!("Removing files later than {}", end);
76 files.retain(|x| { get_unix_timestamp_from_telemetry(&x).unwrap() as f64 <= end } );
77 }
78 firstfile = files[0].clone();
79 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
80 Err(err) => {
81 error!("Unable to open file {firstfile}! {err}");
82 panic!("Unable to create reader from {filename_or_directory}!");
83 }
84 Ok(file) => {
85 let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);
87 for k in 0..u16::MAX as usize + 1 {
88 dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
89 }
90 let packet_reader = Self {
91 filenames : files,
92 file_idx : 0,
93 dedup : dedup,
94 start_time : start_time,
95 end_time : end_time,
96 file_reader : BufReader::new(file),
97 cursor : 0,
98 filter : TelemetryPacketType::Unknown,
99 n_packs_read : 0,
100 skip_ahead : 0,
101 stop_after : 0,
102 n_packs_skipped : 0,
103 n_duplicates : 0,
104 dedup_cache : dedup_cache,
105 };
106 packet_reader
107 }
108 }
109 }
110 }
111 }
112
113 pub fn clear_dedup_cache(&mut self) {
114 let mut dedup_cache = HashMap::<u16, VecDeque<u16>>::with_capacity(u16::MAX as usize + 1);
115 for k in 0..u16::MAX as usize + 1 {
116 dedup_cache.insert(k as u16, VecDeque::<u16>::with_capacity(4));
117 }
118 self.dedup_cache = dedup_cache;
119 }
120
121 pub fn count_packets(&mut self) -> (usize, usize, HashMap<TelemetryPacketType,usize>) {
123 let _ = self.rewind();
124 self.clear_dedup_cache();
125 let mut nframes = 0usize;
126 let mut buffer = [0];
127 let mut incomplete = 0usize;
128 let mut index = HashMap::<TelemetryPacketType,usize>::new();
129 for k in TelemetryPacketType::iter() {
130 index.insert(k, 0);
131 }
132 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
133 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
134 let bar = ProgressBar::new(self.filenames.len() as u64);
135 bar.set_position(0);
136 bar.set_message (String::from("Counting packets.."));
137 bar.set_prefix ("\u{2728}");
138 bar.set_style (bar_style);
139 bar.set_position(self.file_idx as u64);
140 loop {
141 match self.file_reader.read_exact(&mut buffer) {
142 Err(err) => {
143 debug!("Unable to read from file! {err}");
144 match self.prime_next_file() {
145 None => break,
146 Some(_) => {
147 bar.set_position(self.file_idx as u64);
148 continue;
149 }
150 };
151 }
152 Ok(_) => {
153 self.cursor += 1;
154 }
155 }
156
157 if buffer[0] != 0xeb {
160 continue;
161 } else {
162 match self.file_reader.read_exact(&mut buffer) {
163 Err(err) => {
164 debug!("Unable to read from file! {err}");
165 match self.prime_next_file() {
166 None => break,
167 Some(_) => {
168 bar.set_position(self.file_idx as u64);
169 continue;
170 }
171 };
172 }
173 Ok(_) => {
174 self.cursor += 1;
175 }
176 }
177 if buffer[0] != 0x90 {
179 continue;
180 } else {
181 match self.file_reader.read_exact(&mut buffer) {
183 Err(err) => {
184 debug!("Unable to read from file! {err}");
185 match self.prime_next_file() {
186 None => break,
187 Some(_) => {
188 bar.set_position(self.file_idx as u64);
189 continue;
190 }
191 };
192 }
193 Ok(_) => {
194 *index.get_mut(&TelemetryPacketType::from(buffer[0])).unwrap() += 1;
195 self.cursor += 1;
196 }
197 }
198 let mut buffer_skip = [0,0,0,0,0,0,0];
201 match self.file_reader.read_exact(&mut buffer_skip) {
202 Err(err) => {
203 debug!("Unable to read from file! {err}");
204 match self.prime_next_file() {
205 None => break,
206 Some(_) => {
207 bar.set_position(self.file_idx as u64);
208 continue;
209 }
210 };
211 }
212 Ok(_) => {
213 self.cursor += 7;
214 }
215 }
216 let mut buffer_psize = [0,0];
217 match self.file_reader.read_exact(&mut buffer_psize) {
218 Err(_err) => {
219 match self.prime_next_file() {
220 None => break,
221 Some(_) => {
222 bar.set_position(self.file_idx as u64);
223 continue;
224 }
225 }
226 }
227 Ok(_) => {
228 self.cursor += 2;
229 }
230 }
231 let vec_data = buffer_psize.to_vec();
232 let size = parse_u16(&vec_data, &mut 0);
233 let mut temp_buffer = vec![0; size as usize];
234 match self.file_reader.read_exact(&mut buffer_psize) {
236 Err(_err) => {
237 match self.prime_next_file() {
238 None => break,
239 Some(_) => {
240 bar.set_position(self.file_idx as u64);
241 continue;
242 }
243 }
244 }
245 Ok(_) => {
246 self.cursor += 2;
247 }
248 }
249 match self.file_reader.read_exact(&mut temp_buffer) {
250 Err(err) => {
253 incomplete += 1;
254 warn!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
255 match self.prime_next_file() {
256 None => break,
257 Some(_) => {
258 bar.set_position(self.file_idx as u64);
259 continue;
260 }
261 }
262 }
263 Ok(_) => {
264 self.cursor += size as usize;
265 nframes += 1;
266 }
267 }
268 }
269 } } bar.finish_with_message("Done!");
272 let _ = self.rewind();
273 (nframes, incomplete, index)
274 } pub fn read_next_item(&mut self) -> Option<TelemetryPacket> {
282 let mut buffer = [0];
284 loop {
285 match self.file_reader.read_exact(&mut buffer) {
286 Err(err) => {
287 debug!("Unable to read from file! {err}");
288 self.prime_next_file()?;
289 return self.read_next_item();
290 }
291 Ok(_) => {
292 self.cursor += 1;
293 }
294 }
295 if buffer[0] != 0xeb {
296 continue;
297 } else {
298 match self.file_reader.read_exact(&mut buffer) {
299 Err(err) => {
300 debug!("Unable to read from file! {err}");
301 self.prime_next_file()?;
302 return self.read_next_item();
303 }
304 Ok(_) => {
305 self.cursor += 1;
306 }
307 }
308
309 if buffer[0] != 0x90 {
310 continue;
311 } else {
312 match self.file_reader.read_exact(&mut buffer) {
315 Err(err) => {
316 debug!("Unable to read from file! {err}");
317 self.prime_next_file()?;
318 return self.read_next_item();
319 }
320 Ok(_) => {
321 self.cursor += 1;
322 }
323 }
324 let mut thead = TelemetryPacketHeader::new();
325 thead.sync = 0x90eb;
326 thead.packet_type = TelemetryPacketType::from(buffer[0]);
327 let ptype = TelemetryPacketType::from(buffer[0]);
328 let mut buffer_ts = [0,0,0,0];
330 match self.file_reader.read_exact(&mut buffer_ts) {
331 Err(err) => {
332 debug!("Unable to read from file! {err}");
333 self.prime_next_file()?;
334 return self.read_next_item();
335 }
336 Ok(_) => {
337 self.cursor += 4;
338 thead.timestamp = u32::from_le_bytes(buffer_ts);
339 }
340 }
341 let mut buffer_counter = [0,0];
342 match self.file_reader.read_exact(&mut buffer_counter) {
343 Err(err) => {
344 debug!("Unable to read from file! {err}");
345 self.prime_next_file()?;
346 return self.read_next_item();
347 }
348 Ok(_) => {
349 self.cursor += 2;
350 thead.counter = u16::from_le_bytes(buffer_counter);
351 }
352 }
353 let mut buffer_length = [0,0];
354 match self.file_reader.read_exact(&mut buffer_length) {
355 Err(err) => {
356 debug!("Unable to read from file! {err}");
357 return None;
358 }
359 Ok(_) => {
360 self.cursor += 2;
361 thead.length = u16::from_le_bytes(buffer_length);
362 }
363 }
364 let mut buffer_checksum = [0,0];
365 match self.file_reader.read_exact(&mut buffer_checksum) {
366 Err(err) => {
367 debug!("Unable to read from file! {err}");
368 self.prime_next_file()?;
369 return self.read_next_item();
370 }
371 Ok(_) => {
372 self.cursor += 2;
373 thead.checksum = u16::from_le_bytes(buffer_checksum);
374 }
375 }
376
377 let mut size = thead.length;
378 if (size as usize) < TelemetryPacketHeader::SIZE {
380 error!("This packet might be empty or corrupt!");
381 return None;
382 }
383 size -= TelemetryPacketHeader::SIZE as u16;
384 if ptype != self.filter && self.filter != TelemetryPacketType::Unknown {
385 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
386 Err(err) => {
387 debug!("Unable to read more data! {err}");
388 self.prime_next_file()?;
389 return self.read_next_item();
390 }
391 Ok(_) => {
392 self.cursor += size as usize;
393 }
394 }
395 continue; }
397 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
400 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
402 Err(err) => {
403 debug!("Unable to read more data! {err}");
404 self.prime_next_file()?;
405 return self.read_next_item();
406 }
407 Ok(_) => {
408 self.n_packs_skipped += 1;
409 self.cursor += size as usize;
410 }
411 }
412 continue; }
414 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
415 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
417 Err(err) => {
418 debug!("Unable to read more data! {err}");
419 self.prime_next_file()?;
420 return self.read_next_item();
421 }
422 Ok(_) => {
423 self.cursor += size as usize;
424 }
425 }
426 continue; }
428
429
430 let mut tp = TelemetryPacket::new();
431 tp.header = thead;
432
433 let mut payload = vec![0u8;size as usize];
446 match self.file_reader.read_exact(&mut payload) {
447 Err(err) => {
448 debug!("Unable to read from file! {err}");
449 self.prime_next_file()?;
450 return self.read_next_item();
451 }
452 Ok(_) => {
453 self.cursor += tp.header.length as usize;
454 }
455 }
456
457 tp.payload = payload;
458 self.n_packs_read += 1;
459 if self.dedup {
461 let mut will_send : bool;
462 if self.dedup_cache[&tp.header.counter].len() == 0 {
463 will_send = true;
464 } else {
465 if !self.dedup_cache.contains_key(&tp.header.counter) {
466 panic!("The dedup cache does not contain {}", tp.header.counter);
467 }
468 will_send = true;
469 for checksum in &self.dedup_cache[&tp.header.counter] {
470 if checksum == &tp.header.checksum {
471 will_send = false;
472 }
473 }
474 }
475 if will_send {
478 self.dedup_cache.get_mut(&tp.header.counter).unwrap().push_back(tp.header.checksum);
479 return Some(tp);
480 } else {
481 self.n_duplicates += 1;
484 if self.dedup_cache[&tp.header.counter].len() > 4 {
485 self.dedup_cache.get_mut(&tp.header.counter).unwrap().pop_front();
486 }
487
488 return self.read_next_item();
489 }
490 }
491 if self.start_time.is_some() {
492 if tp.header.get_gcutime() < self.start_time.unwrap() {
493 return self.read_next_item();
494 }
495 }
496 if self.end_time.is_some() {
497 if tp.header.get_gcutime() > self.end_time.unwrap() {
498 return self.read_next_item();
499 }
500 }
501 return Some(tp);
502 }
503 } } } }
507
508impl Default for TelemetryPacketReader {
509 fn default() -> Self {
510 TelemetryPacketReader::new(String::from(""), false, None, None)
511 }
512}
513
514impl fmt::Display for TelemetryPacketReader {
515 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
516 let mut range_repr = String::from("");
517 if self.skip_ahead > 0 {
518 range_repr += &(format!("({}", self.skip_ahead));
519 } else {
520 range_repr += "(";
521 }
522 if self.stop_after > 0 {
523 range_repr += &(format!("..{})", self.stop_after));
524 } else {
525 range_repr += "..)";
526 }
527 let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
528 write!(f, "{}", repr)
529 }
530}
531
532reader!(TelemetryPacketReader, TelemetryPacket);
533
534#[cfg(feature="pybindings")]
535#[pymethods]
536impl TelemetryPacketReader {
537
538 #[new]
539 #[pyo3(signature = (filenames_or_directory, dedup = true, start_time = None, end_time = None))]
540 fn new_py(filenames_or_directory : &Bound<'_,PyAny>, dedup : bool, start_time : Option<f64>, end_time : Option<f64>) -> PyResult<Self> {
541
542 let mut string_value = String::from("foo");
543 let mut fnames = Vec::<String>::new();
544 if let Ok(s) = filenames_or_directory.extract::<String>() {
545 string_value = s;
546 } if let Ok(fspath_method) = filenames_or_directory.getattr("__fspath__") {
548 if let Ok(fspath_result) = fspath_method.call0() {
549 if let Ok(py_string) = fspath_result.extract::<String>() {
550 string_value = py_string;
551 }
552 }
553 }
554 if let Ok(list) = filenames_or_directory.extract::<Vec<String>>() {
555 for k in list {
556 fnames.push(k);
557 } }
567 let mut reader : Self;
568 if fnames.len() > 0 {
569 string_value = fnames[0].clone();
570 reader = Self::new(string_value, dedup, start_time, end_time);
571 reader.filenames = fnames;
572 } else {
573 reader = Self::new(string_value, dedup, start_time, end_time);
574 }
575 Ok(reader)
576 }
585
586 #[getter]
587 fn get_n_duplicates(&self) -> usize {
588 self.n_duplicates
589 }
590
591 #[pyo3(name = "count_packets")]
592 fn count_packets_py(&mut self) -> (usize,usize,HashMap<TelemetryPacketType,usize>) {
593 self.count_packets()
594 }
595
596 #[getter]
607 fn filenames(&self) -> Vec<String> {
608 self.filenames.clone()
609 }
610 #[getter]
623 #[pyo3(name="current_filename")]
624 fn get_current_filename_py(&self) -> Option<&str> {
625 self.get_current_filename()
626 }
627
628 #[pyo3(name="rewind")]
632 fn rewind_py(&mut self) -> PyResult<()> {
633 self.clear_dedup_cache();
636 match self.rewind() {
637 Err(err) => {
638 return Err(PyValueError::new_err(err.to_string()));
639 }
640 Ok(_) => Ok(())
641 }
642 }
643
644 fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
650 slf
651 }
652
653 fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TelemetryPacket> {
654 slf.next()
655 }
664}
665
666#[cfg(feature="pybindings")]
667pythonize_display!(TelemetryPacketReader);
668
669