telemetry_dataclasses/
io.rs1use std::fmt;
2use std::fs::{
3 self,
4 File,
5};
6use std::io;
7use std::io::SeekFrom;
8use std::io::Seek;
9use std::io::BufReader;
10use std::path::Path;
11use std::collections::HashMap;
12use std::fs::OpenOptions;
13use std::io::Read;
14use std::io::ErrorKind;
15use regex::Regex;
16
17use log::{
18 debug,
19 error
20};
21
22use tof_dataclasses::io::read_file;
23use tof_dataclasses::serialization::{
24 search_for_u16,
25 Serialization,
26 parse_u32,
28};
29
30use crate::packets::{
31 TelemetryHeader,
32 TelemetryPacket,
33 MergedEvent,
34 TrackerPacket,
35 GapsEvent,
36};
37use crate::packets::TelemetryPacketType;
38
39pub fn get_gaps_events(filename : String) -> Vec<GapsEvent> {
41 let mut events = Vec::<GapsEvent>::new();
42 let stream = read_file(Path::new(&filename)).expect("Unable to open input file!");
43 let mut pos : usize = 0;
44 let mut packet_types = Vec::<u8>::new();
46 loop {
47 match TelemetryHeader::from_bytestream(&stream, &mut pos) {
48 Err(err) => {
49 println!("Can not decode telemtry header! {err}");
50 match search_for_u16(0x90eb, &stream, pos) {
54 Err(err) => {
55 println!("Unable to find next header! {err}");
56 break;
57 }
58 Ok(head_pos) => {
59 pos = head_pos;
60 }
61 }
62 }
63 Ok(header) => {
64 println!("HEADER {}", header);
65 if header.ptype == 80 {
69 match TrackerPacket::from_bytestream(&stream, &mut pos) {
70 Err(err) => {
71 println!("Unable to decode TrackerPacket! {err}");
75 }
76 Ok(mut tp) => {
77 tp.telemetry_header = header;
78 println!("{}", tp);
79 }
80 }
81 }
82 if header.ptype == 90 {
83 match MergedEvent::from_bytestream(&stream, &mut pos) {
84 Err(err) => {
85 println!("Unable to decode MergedEvent! {err}");
86 }
87 Ok(mut me) => {
88 me.header = header;
89 let mut g_event = GapsEvent::new();
90 g_event.tof = me.tof_event;
94 g_event.tracker = me.tracker_events;
95 events.push(g_event)
96 }
97 }
98 }
99 packet_types.push(header.ptype);
101 match search_for_u16(0x90eb, &stream, pos) {
102 Err(err) => {
103 println!("Unable to find next header! {err}");
104 break;
105 }
106 Ok(head_pos) => {
107 pos = head_pos;
108 }
109 }
110 }
111 }
112 }
113 events
114}
115
116
/// Reads telemetry packets from a single file or from a series of files,
/// one packet at a time.
#[derive(Debug)]
pub struct TelemetryPacketReader {
  /// The files this reader works through. Set up by `new`, either from a
  /// single file path or from the matching files found in a directory.
  pub filenames       : Vec<String>,
  /// Index into `filenames` of the file which is currently open
  pub file_index      : usize,
  /// Buffered reader over the currently open file
  file_reader         : BufReader<File>,
  /// Byte counter which is advanced as data is consumed from the current
  /// file and set back to 0 whenever a file is (re)opened
  cursor              : usize,
  /// Packets of any other type are skipped when reading.
  /// `TelemetryPacketType::Unknown` switches the filtering off.
  pub filter          : TelemetryPacketType,
  /// Number of packets which have been read and returned so far
  n_packs_read        : usize,
  /// Number of packets which have been skipped because of `skip_ahead`
  n_packs_skipped     : usize,
  /// Number of packets to skip before the first packet is returned
  /// (0 means nothing is skipped)
  pub skip_ahead      : usize,
  /// No further packets are returned once this many packets have been read
  /// (0 means no limit)
  pub stop_after      : usize,
}
143
144impl fmt::Display for TelemetryPacketReader {
145 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
146 let mut range_repr = String::from("");
147 if self.skip_ahead > 0 {
148 range_repr += &(format!("({}", self.skip_ahead));
149 } else {
150 range_repr += "(";
151 }
152 if self.stop_after > 0 {
153 range_repr += &(format!("..{})", self.stop_after));
154 } else {
155 range_repr += "..)";
156 }
157 let repr = format!("<TelemetryPacketReader : read {} packets, filter {}, range {},\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
158 write!(f, "{}", repr)
159 }
160}
161
162impl TelemetryPacketReader {
163
164 fn list_path_contents_sorted(input: &str) -> Result<Vec<String>, io::Error> {
165 let path = Path::new(input);
166 match fs::metadata(path) {
167 Ok(metadata) => {
168 if metadata.is_file() {
169 let fname = String::from(input);
170 return Ok(vec![fname]);
171 }
172 if metadata.is_dir() {
173 let re = Regex::new(r"RAW(\d{6})_(\d{6})\.bin$").unwrap();
174
175 let mut entries: Vec<(u32, u32, String)> = fs::read_dir(path)?
176 .filter_map(Result::ok) .filter_map(|entry| {
178 let filename = format!("{}/{}", path.display(), entry.file_name().into_string().ok()?);
179 re.captures(&filename.clone()).map(|caps| {
180 let date = caps.get(1)?.as_str().parse::<u32>().ok()?;
181 let time = caps.get(2)?.as_str().parse::<u32>().ok()?;
182 Some((date, time, filename))
183 })?
184 })
185 .collect();
186
187 entries.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
189 return Ok(entries.into_iter().map(|(_, _, name)| name).collect());
191 }
192 Err(io::Error::new(ErrorKind::Other, "Path exists but is neither a file nor a directory"))
193 }
194 Err(e) => Err(e),
195 }
196 }
197
198 pub fn new(filename_or_directory : String) -> Self {
199 let firstfile : String;
200 match Self::list_path_contents_sorted(&filename_or_directory) {
201 Err(err) => {
202 error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
203 panic!("Unable to open files!");
204 }
205 Ok(files) => {
206 firstfile = files[0].clone();
207 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
208 Err(err) => {
209 error!("Unable to open file {firstfile}! {err}");
210 panic!("Unable to create reader from {filename_or_directory}!");
211 }
212 Ok(file) => {
213 let packet_reader = Self {
214 filenames : files,
215 file_index : 0,
216 file_reader : BufReader::new(file),
217 cursor : 0,
218 filter : TelemetryPacketType::Unknown,
219 n_packs_read : 0,
220 skip_ahead : 0,
221 stop_after : 0,
222 n_packs_skipped : 0,
223 };
224 packet_reader
225 }
226 }
227 }
228 }
229 }
230
231 pub fn get_packet_index(&mut self) -> io::Result<HashMap<u8, usize>> {
235 error!("The packet index function is currently broken and will only show the packet index for one file, not for all!");
236 error!("FIXME!");
237 let mut index = HashMap::<u8, usize>::new();
238 let mut buffer = [0];
239 loop {
240 match self.file_reader.read_exact(&mut buffer) {
241 Err(err) => {
242 debug!("Unable to read from file! {err}");
243 break;
245 }
246 Ok(_) => {
247 self.cursor += 1;
248 }
249 }
250 if buffer[0] != 0xeb {
251 continue;
252 } else {
253 match self.file_reader.read_exact(&mut buffer) {
254 Err(err) => {
255 debug!("Unable to read from file! {err}");
256 break;
258 }
259 Ok(_) => {
260 self.cursor += 1;
261 }
262 }
263
264 if buffer[0] != 0x90 {
265 continue;
266 } else {
267 match self.file_reader.read_exact(&mut buffer) {
269 Err(err) => {
270 debug!("Unable to read from file! {err}");
271 break;
272 }
273 Ok(_) => {
274 self.cursor += 1;
275 }
276 }
277 let ptype = TelemetryPacketType::from(buffer[0]);
278 let mut padding = [0,0,0,0,0,0];
279 match self.file_reader.read_exact(&mut padding) {
280 Err(err) => {
281 error!("Unable to read from file! {err}");
282 break;
283 }
284 Ok(_) => {
285 self.cursor += 6;
286 }
287 }
288 let mut buffer_psize = [0,0,0,0];
291 match self.file_reader.read_exact(&mut buffer_psize) {
292 Err(err) => {
293 error!("Unable to read from file! {err}");
294 break;
295 }
296 Ok(_) => {
297 self.cursor += 4;
298 }
299 }
300 let vec_data = buffer_psize.to_vec();
301 let mut size = parse_u32(&vec_data, &mut 0);
302 if (size as usize) < TelemetryHeader::SIZE {
304 error!("This packet might be empty or corrupt!");
305 break;
306 }
307 size -= TelemetryHeader::SIZE as u32;
308
309 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
310 Err(err) => {
311 debug!("Unable to read more data! {err}");
312 break;
313 }
314 Ok(_) => {
315 self.cursor += size as usize;
316 let ptype_key = ptype as u8;
319 if index.contains_key(&ptype_key) {
320 *index.get_mut(&ptype_key).unwrap() += 1;
321 } else {
322 index.insert(ptype_key, 1usize);
323 }
324 }
325 }
326 }
327 } } self.rewind()?;
330 Ok(index)
331 } pub fn rewind(&mut self) -> io::Result<()> {
334 let firstfile = &self.filenames[0];
335 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
336 Err(err) => {
337 error!("Unable to open file {firstfile}! {err}");
338 panic!("Unable to create reader from {firstfile}!");
339 }
340 Ok(file) => {
341 self.file_reader = BufReader::new(file);
342 }
343 }
344 self.file_index = 0;
345 self.cursor = 0;
346 Ok(())
347 }
348
349 fn prime_next_file(&mut self) -> Option<usize> {
351 if self.file_index == self.filenames.len() -1 {
352 return None;
353 } else {
354 self.file_index += 1;
355 let nextfilename = self.filenames[self.file_index].clone();
356 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
357 self.file_reader = BufReader::new(nextfile);
358 self.cursor = 0;
359 return Some(self.file_index);
360 }
361 }
362
363 pub fn get_next_packet(&mut self) -> Option<TelemetryPacket> {
369 let mut buffer = [0];
371 loop {
372 match self.file_reader.read_exact(&mut buffer) {
373 Err(err) => {
374 debug!("Unable to read from file! {err}");
375 self.prime_next_file()?;
376 return self.get_next_packet();
377 }
378 Ok(_) => {
379 self.cursor += 1;
380 }
381 }
382 if buffer[0] != 0xeb {
383 continue;
384 } else {
385 match self.file_reader.read_exact(&mut buffer) {
386 Err(err) => {
387 debug!("Unable to read from file! {err}");
388 self.prime_next_file()?;
389 return self.get_next_packet();
390 }
391 Ok(_) => {
392 self.cursor += 1;
393 }
394 }
395
396 if buffer[0] != 0x90 {
397 continue;
398 } else {
399 match self.file_reader.read_exact(&mut buffer) {
401 Err(err) => {
402 debug!("Unable to read from file! {err}");
403 self.prime_next_file()?;
404 return self.get_next_packet();
405 }
406 Ok(_) => {
407 self.cursor += 1;
408 }
409 }
410 let mut thead = TelemetryHeader::new();
411 thead.sync = 0x90eb;
412 thead.ptype = buffer[0];
413 let ptype = TelemetryPacketType::from(buffer[0]);
414 let mut buffer_ts = [0,0,0,0];
416 match self.file_reader.read_exact(&mut buffer_ts) {
417 Err(err) => {
418 debug!("Unable to read from file! {err}");
419 self.prime_next_file()?;
420 return self.get_next_packet();
421 }
422 Ok(_) => {
423 self.cursor += 4;
424 thead.timestamp = u32::from_le_bytes(buffer_ts);
425 }
426 }
427 let mut buffer_counter = [0,0];
428 match self.file_reader.read_exact(&mut buffer_counter) {
429 Err(err) => {
430 debug!("Unable to read from file! {err}");
431 self.prime_next_file()?;
432 return self.get_next_packet();
433 }
434 Ok(_) => {
435 self.cursor += 2;
436 thead.counter = u16::from_le_bytes(buffer_counter);
437 }
438 }
439 let mut buffer_length = [0,0];
440 match self.file_reader.read_exact(&mut buffer_length) {
441 Err(err) => {
442 debug!("Unable to read from file! {err}");
443 return None;
444 }
445 Ok(_) => {
446 self.cursor += 2;
447 thead.length = u16::from_le_bytes(buffer_length);
448 }
449 }
450 let mut buffer_checksum = [0,0];
451 match self.file_reader.read_exact(&mut buffer_checksum) {
452 Err(err) => {
453 debug!("Unable to read from file! {err}");
454 self.prime_next_file()?;
455 return self.get_next_packet();
456 }
457 Ok(_) => {
458 self.cursor += 2;
459 thead.checksum = u16::from_le_bytes(buffer_checksum);
460 }
461 }
462
463 let mut size = thead.length;
464 if (size as usize) < TelemetryHeader::SIZE {
466 error!("This packet might be empty or corrupt!");
467 return None;
468 }
469 size -= TelemetryHeader::SIZE as u16;
470 if ptype != self.filter && self.filter != TelemetryPacketType::Unknown {
471 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
472 Err(err) => {
473 debug!("Unable to read more data! {err}");
474 self.prime_next_file()?;
475 return self.get_next_packet();
476 }
477 Ok(_) => {
478 self.cursor += size as usize;
479 }
480 }
481 continue; }
483 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
486 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
488 Err(err) => {
489 debug!("Unable to read more data! {err}");
490 self.prime_next_file()?;
491 return self.get_next_packet();
492 }
493 Ok(_) => {
494 self.n_packs_skipped += 1;
495 self.cursor += size as usize;
496 }
497 }
498 continue; }
500 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
501 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
503 Err(err) => {
504 debug!("Unable to read more data! {err}");
505 self.prime_next_file()?;
506 return self.get_next_packet();
507 }
508 Ok(_) => {
509 self.cursor += size as usize;
510 }
511 }
512 continue; }
514
515
516 let mut tp = TelemetryPacket::new();
517 tp.header = thead;
518
519 let mut payload = vec![0u8;size as usize];
532 match self.file_reader.read_exact(&mut payload) {
533 Err(err) => {
534 debug!("Unable to read from file! {err}");
535 self.prime_next_file()?;
536 return self.get_next_packet();
537 }
538 Ok(_) => {
539 self.cursor += tp.header.length as usize;
540 }
541 }
542
543 tp.payload = payload;
544 self.n_packs_read += 1;
545 return Some(tp);
546 }
547 } } } }
551
552impl Default for TelemetryPacketReader {
553 fn default() -> Self {
554 TelemetryPacketReader::new(String::from(""))
555 }
556}
557
558impl Iterator for TelemetryPacketReader {
559 type Item = TelemetryPacket;
560
561 fn next(&mut self) -> Option<Self::Item> {
562 self.get_next_packet()
563 }
564}
565
566