telemetry_dataclasses/
io.rsuse std::fmt;
use std::fs::File;
use std::io;
use std::io::SeekFrom;
use std::io::Seek;
use std::io::BufReader;
use std::path::Path;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Read;
use log::{
debug,
error
};
use tof_dataclasses::io::read_file;
use tof_dataclasses::serialization::{
search_for_u16,
Serialization,
parse_u32,
};
use crate::packets::{
TelemetryHeader,
TelemetryPacket,
MergedEvent,
TrackerPacket,
GapsEvent,
};
use tof_dataclasses::packets::{
TofPacket,
PacketType,
};
use tof_dataclasses::events::TofEventSummary;
use crate::packets::TelemetryPacketType;
/// Read a binary telemetry file and collect a [`GapsEvent`] for every merged
/// event packet that can be decoded from it.
///
/// The file is read into memory as a whole and then walked header by header.
/// After each header (whether it could be decoded or not) the stream is
/// searched for the next `0x90eb` marker and parsing continues from there.
/// The walk ends when no further marker can be found.
///
/// * Packets with `ptype == 80` are decoded as [`TrackerPacket`] and only
///   printed to stdout.
/// * Packets with `ptype == 90` are decoded as [`MergedEvent`]. Every
///   successfully decoded merged event yields one [`GapsEvent`] in the result,
///   even when its embedded TOF data can not be unpacked - in that case the
///   `tof` part keeps the value set by `GapsEvent::new()`.
/// * All other packet types are skipped.
///
/// Decoding progress and problems are reported via `println!`.
///
/// # Arguments
///
/// * `filename` - path of the telemetry file to read
///
/// # Panics
///
/// Panics if the file can not be read.
pub fn get_gaps_events(filename : String) -> Vec<GapsEvent> {
    let mut events = Vec::<GapsEvent>::new();
    let stream = read_file(Path::new(&filename)).expect("Unable to open input file!");
    let mut pos : usize = 0;
    loop {
        match TelemetryHeader::from_bytestream(&stream, &mut pos) {
            Err(err) => {
                println!("Can not decode telemtry header! {err}");
            }
            Ok(header) => {
                println!("HEADER {}", header);
                match header.ptype {
                    80 => match TrackerPacket::from_bytestream(&stream, &mut pos) {
                        Err(err) => {
                            println!("Unable to decode TrackerPacket! {err}");
                        }
                        Ok(mut tp) => {
                            tp.telemetry_header = header;
                            println!("{}", tp);
                        }
                    },
                    90 => match MergedEvent::from_bytestream(&stream, &mut pos) {
                        Err(err) => {
                            println!("Unable to decode MergedEvent! {err}");
                        }
                        Ok(mut me) => {
                            me.header = header;
                            let mut g_event = GapsEvent::new();
                            // the TOF part is embedded as a serialized TofPacket
                            // with its own byte stream, so parsing starts at 0
                            match TofPacket::from_bytestream(&me.tof_data, &mut 0) {
                                Err(err) => {
                                    println!("Can't unpack TofPacket! {err}");
                                }
                                Ok(tp) => {
                                    println!("{}", tp);
                                    if tp.packet_type == PacketType::TofEventSummary {
                                        match TofEventSummary::from_tofpacket(&tp) {
                                            Err(err) => println!("Can't unpack TofEventSummary! {err}"),
                                            Ok(ts) => {
                                                println!("{}", ts);
                                                g_event.tof = ts;
                                            }
                                        }
                                    }
                                }
                            }
                            g_event.tracker = me.tracker_events;
                            events.push(g_event);
                        }
                    },
                    _ => (),
                }
            }
        }
        // Re-synchronize on the next header marker, no matter if the current
        // header/packet could be decoded or not.
        // NOTE(review): if a failed header decode leaves `pos` untouched and the
        // search returns the same position again, this loop would not advance -
        // confirm against `from_bytestream`/`search_for_u16`.
        match search_for_u16(0x90eb, &stream, pos) {
            Err(err) => {
                println!("Unable to find next header! {err}");
                break;
            }
            Ok(head_pos) => {
                pos = head_pos;
            }
        }
    }
    events
}
/// Sequential reader for telemetry packets stored in a binary file.
///
/// Packets are located by scanning for the sync bytes `0xeb 0x90`. The reader
/// can restrict its output to a single packet type (`filter`) and to a range
/// of packets (`skip_ahead`, `stop_after`).
#[derive(Debug)]
pub struct TelemetryPacketReader {
/// Name of the file the reader has been opened on
pub filename : String,
/// Buffered handle on the opened file
file_reader : BufReader<File>,
/// Running byte counter, advanced whenever data is read or skipped
/// and reset to 0 by `rewind`
cursor : usize,
/// Only packets of this type are returned. `TelemetryPacketType::Unknown`
/// disables the filter, so that packets of all types are returned
pub filter : TelemetryPacketType,
/// Number of packets returned by `get_next_packet` so far
n_packs_read : usize,
/// Number of packets passed over so far because of `skip_ahead`
n_packs_skipped : usize,
/// Number of packets (which pass the filter) to pass over before the
/// first packet is returned. 0 means no packets are skipped
pub skip_ahead : usize,
/// Once this many packets have been returned, all further packets are
/// passed over. 0 means no limit
pub stop_after : usize,
}
impl fmt::Display for TelemetryPacketReader {
    /// One-line summary of the reader: file name, number of packets read,
    /// the active filter and the configured packet range. An unset range
    /// boundary (value 0) is left blank, e.g. `(..)` or `(10..)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let lower_bound = if self.skip_ahead > 0 {
            format!("({}", self.skip_ahead)
        } else {
            String::from("(")
        };
        let upper_bound = if self.stop_after > 0 {
            format!("..{})", self.stop_after)
        } else {
            String::from("..)")
        };
        let range_repr = lower_bound + &upper_bound;
        write!(
            f,
            "<TelemetryPacketReader : file {}, read {} packets, filter {}, range {}>",
            self.filename, self.n_packs_read, self.filter, range_repr
        )
    }
}
impl TelemetryPacketReader {
pub fn new(filename : String) -> TelemetryPacketReader {
let fname_c = filename.clone();
let file = OpenOptions::new().create(false).append(false).read(true).open(fname_c).expect("Unable to open file {filename}");
let packet_reader = Self {
filename,
file_reader : BufReader::new(file),
cursor : 0,
filter : TelemetryPacketType::Unknown,
n_packs_read : 0,
skip_ahead : 0,
stop_after : 0,
n_packs_skipped : 0,
};
packet_reader
}
pub fn get_packet_index(&mut self) -> io::Result<HashMap<u8, usize>> {
let mut index = HashMap::<u8, usize>::new();
let mut buffer = [0];
loop {
match self.file_reader.read_exact(&mut buffer) {
Err(err) => {
debug!("Unable to read from file! {err}");
break;
}
Ok(_) => {
self.cursor += 1;
}
}
if buffer[0] != 0xeb {
continue;
} else {
match self.file_reader.read_exact(&mut buffer) {
Err(err) => {
debug!("Unable to read from file! {err}");
break;
}
Ok(_) => {
self.cursor += 1;
}
}
if buffer[0] != 0x90 {
continue;
} else {
match self.file_reader.read_exact(&mut buffer) {
Err(err) => {
debug!("Unable to read from file! {err}");
break;
}
Ok(_) => {
self.cursor += 1;
}
}
let ptype = TelemetryPacketType::from(buffer[0]);
let mut padding = [0,0,0,0,0,0];
match self.file_reader.read_exact(&mut padding) {
Err(err) => {
error!("Unable to read from file! {err}");
break;
}
Ok(_) => {
self.cursor += 6;
}
}
let mut buffer_psize = [0,0,0,0];
match self.file_reader.read_exact(&mut buffer_psize) {
Err(err) => {
error!("Unable to read from file! {err}");
break;
}
Ok(_) => {
self.cursor += 4;
}
}
let vec_data = buffer_psize.to_vec();
let mut size = parse_u32(&vec_data, &mut 0);
if (size as usize) < TelemetryHeader::SIZE {
error!("This packet might be empty or corrupt!");
break;
}
size -= TelemetryHeader::SIZE as u32;
match self.file_reader.seek(SeekFrom::Current(size as i64)) {
Err(err) => {
debug!("Unable to read more data! {err}");
break;
}
Ok(_) => {
self.cursor += size as usize;
let ptype_key = ptype as u8;
if index.contains_key(&ptype_key) {
*index.get_mut(&ptype_key).unwrap() += 1;
} else {
index.insert(ptype_key, 1usize);
}
}
}
}
} } self.rewind()?;
Ok(index)
} pub fn rewind(&mut self) -> io::Result<()> {
self.file_reader.rewind()?;
self.cursor = 0;
Ok(())
}
pub fn get_next_packet(&mut self) -> Option<TelemetryPacket> {
let mut buffer = [0];
loop {
match self.file_reader.read_exact(&mut buffer) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 1;
}
}
if buffer[0] != 0xeb {
continue;
} else {
match self.file_reader.read_exact(&mut buffer) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 1;
}
}
if buffer[0] != 0x90 {
continue;
} else {
match self.file_reader.read_exact(&mut buffer) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 1;
}
}
let mut thead = TelemetryHeader::new();
thead.sync = 0x90eb;
thead.ptype = buffer[0];
let ptype = TelemetryPacketType::from(buffer[0]);
let mut buffer_ts = [0,0,0,0];
match self.file_reader.read_exact(&mut buffer_ts) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 4;
thead.timestamp = u32::from_le_bytes(buffer_ts);
}
}
let mut buffer_counter = [0,0];
match self.file_reader.read_exact(&mut buffer_counter) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 2;
thead.counter = u16::from_le_bytes(buffer_counter);
}
}
let mut buffer_length = [0,0];
match self.file_reader.read_exact(&mut buffer_length) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 2;
thead.length = u16::from_le_bytes(buffer_length);
}
}
let mut buffer_checksum = [0,0];
match self.file_reader.read_exact(&mut buffer_checksum) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += 2;
thead.checksum = u16::from_le_bytes(buffer_checksum);
}
}
let mut size = thead.length;
if (size as usize) < TelemetryHeader::SIZE {
error!("This packet might be empty or corrupt!");
return None;
}
size -= TelemetryHeader::SIZE as u16;
if ptype != self.filter && self.filter != TelemetryPacketType::Unknown {
match self.file_reader.seek(SeekFrom::Current(size as i64)) {
Err(err) => {
debug!("Unable to read more data! {err}");
return None;
}
Ok(_) => {
self.cursor += size as usize;
}
}
continue; }
if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
match self.file_reader.seek(SeekFrom::Current(size as i64)) {
Err(err) => {
debug!("Unable to read more data! {err}");
return None;
}
Ok(_) => {
self.n_packs_skipped += 1;
self.cursor += size as usize;
}
}
continue; }
if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
match self.file_reader.seek(SeekFrom::Current(size as i64)) {
Err(err) => {
debug!("Unable to read more data! {err}");
return None;
}
Ok(_) => {
self.cursor += size as usize;
}
}
continue; }
let mut tp = TelemetryPacket::new();
tp.header = thead;
let mut payload = vec![0u8;size as usize];
match self.file_reader.read_exact(&mut payload) {
Err(err) => {
debug!("Unable to read from file! {err}");
return None;
}
Ok(_) => {
self.cursor += tp.header.length as usize;
}
}
tp.payload = payload;
self.n_packs_read += 1;
return Some(tp);
}
} } } }
impl Default for TelemetryPacketReader {
fn default() -> Self {
TelemetryPacketReader::new(String::from(""))
}
}
impl Iterator for TelemetryPacketReader {
type Item = TelemetryPacket;
fn next(&mut self) -> Option<Self::Item> {
self.get_next_packet()
}
}