1use std::collections::{
2 HashMap,
3 VecDeque,
4};
5
6use std::time::Instant;
7
8use crossbeam_channel::{
14 Receiver,
15 Sender,
16};
17
18use telemetry_dataclasses::packets::{
19 TelemetryHeader,
20 TelemetryPacket,
21 MergedEvent,
22};
24
25use ratatui::prelude::*;
26
27use ratatui::Frame;
31use ratatui::layout::Rect;
32use ratatui::widgets::{
33 Block,
34 BorderType,
35 Borders,
36 Paragraph,
37 Table,
38 Row,
39};
40
41use tof_dataclasses::serialization::{
42 Serialization,
43};
45use tof_dataclasses::errors::SerializationError;
46use tof_dataclasses::packets::{
47 TofPacket,
48};
49use tof_dataclasses::serialization::Packable;
50
51use telemetry_dataclasses::packets::TelemetryPacketType;
52
53use crate::colors::ColorTheme;
54use crate::telly_packet_counter;
55
/// Selects which sub-view of the telemetry tab is drawn by
/// `TelemetryTab::render`.
#[derive(Clone, Copy, Debug)]
pub enum TelemetryTabView {
    /// Overview of the incoming stream: the most recent header, the most
    /// recent merged event and the per-type packet summary table.
    Stream,
    /// View dedicated to merged events.
    MergedEvents,
}
61
62pub struct TelemetryTab<'a> {
65 pub theme : ColorTheme,
66 pub tele_recv : Receiver<TelemetryPacket>,
67 pub queue_size : usize, pub merged_queue : VecDeque<MergedEvent>,
69 pub header_queue : VecDeque<TelemetryHeader>,
70 pub tp_sender : Option<Sender<TofPacket>>,
73 pub view : TelemetryTabView,
74 pub pack_map : HashMap<&'a str, usize>,
75 start_time : Instant,
76}
77
78impl TelemetryTab<'_> {
79 pub fn new(tp_sender : Option<Sender<TofPacket>>,
80 tele_recv : Receiver<TelemetryPacket>,
81 theme : ColorTheme) -> Self {
82 Self {
83 theme,
84 tele_recv : tele_recv,
85 queue_size : 20000,
86 merged_queue : VecDeque::<MergedEvent>::new(),
87 header_queue : VecDeque::<TelemetryHeader>::new(),
88 tp_sender : tp_sender,
89 view : TelemetryTabView::Stream,
90 pack_map : HashMap::<&str, usize>::new(),
91 start_time : Instant::now(),
92 }
93 }
94
95 pub fn receive_packet(&mut self) -> Result<(), SerializationError> {
96 match self.tele_recv.try_recv() {
97 Err(crossbeam_channel::TryRecvError::Empty) => {
98 trace!("No data available yet.");
99 },
101 Err(crossbeam_channel::TryRecvError::Disconnected) => {
102 error!("Telemetry channel disconnected.");
103 return Err(SerializationError::Disconnected);
105 },
106 Ok(packet) => {
107 let telly_ptype = TelemetryPacketType::from(packet.header.ptype);
109 telly_packet_counter(&mut self.pack_map, &telly_ptype);
110
111 self.header_queue.push_back(packet.header.clone());
112 if self.header_queue.len() > self.queue_size {
113 let _ = self.header_queue.pop_front();
114 }
115 if packet.header.ptype == 92 {
116 match TofPacket::from_bytestream(&packet.payload, &mut 0) {
117 Err(err) => {
118 error!("Unable to decode AnyHKpacket! {err}");
119 }
120 Ok(tpack) => {
121 if let Some(sender) = &self.tp_sender {
122 if let Err(err) = sender.send(tpack) {
123 error!("Unable to send TP over channel! {err}");
124 }
125 }
126 }
127 }
128 }
129
130 if packet.header.ptype == 90 || packet.header.ptype == 191 {
131 let expected_size = (packet.header.length as usize) - TelemetryHeader::SIZE;
132 if expected_size > packet.payload.len() {
133 error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
134 return Err(SerializationError::StreamTooShort);
135 }
136 match MergedEvent::from_bytestream(&packet.payload, &mut 0) {
137 Err(err) => {
138 error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}! {err}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
139 }
140 Ok(me) => {
141 if let Some(sender) = &self.tp_sender {
142 let ts = me.tof_event.clone();
143 let tp = ts.pack();
145 if let Err(err) = sender.send(tp) {
146 error!("Unable to send TP over channel! {err}");
147 }
148 }
149 self.merged_queue.push_back(me);
150 if self.merged_queue.len() > self.queue_size {
151 let _ = self.merged_queue.pop_front();
152 }
153 }
154 }
155 }
156 }
157 }
158 Ok(())
159 }
160
161 pub fn render(&mut self, main_window: &Rect, frame: &mut Frame) {
162 match self.view {
163 TelemetryTabView::Stream => {
164 let main_lo = Layout::default()
166 .direction(Direction::Horizontal)
167 .constraints(
168 [Constraint::Percentage(33),
169 Constraint::Percentage(33),
170 Constraint::Percentage(34)].as_ref(),
171 )
172 .split(*main_window);
173
174 let packet_lo = Layout::default()
175 .direction(Direction::Vertical)
176 .constraints(
177 [Constraint::Percentage(30), Constraint::Percentage(70)].as_ref(),
178 )
179 .split(main_lo[0]);
180
181 let header_string = if let Some(header) = self.header_queue.back() {
183 format!("{}", header)
184 } else {
185 String::from("No header available")
186 };
187
188 let header_view = Paragraph::new(header_string)
189 .style(self.theme.style())
190 .alignment(Alignment::Left)
191 .block(
192 Block::default()
193 .borders(Borders::ALL)
194 .border_type(BorderType::Rounded)
195 .title("Last Header from Telemetry stream")
196 );
197
198 frame.render_widget(header_view, packet_lo[0]);
199
200 let merged_string = if let Some(ev) = self.merged_queue.back() {
202 format!("{}", ev)
203 } else {
204 String::from("No merged event available")
205 };
206
207 let merged_view = Paragraph::new(merged_string)
208 .style(self.theme.style())
209 .alignment(Alignment::Left)
210 .block(
211 Block::default()
212 .borders(Borders::ALL)
213 .border_type(BorderType::Rounded)
214 .title("Last MergedEvent from Telemetry stream")
215 );
216
217 frame.render_widget(merged_view, packet_lo[1]);
218
219 let mut rows = Vec::<Row>::new();
221 let mut sum_pack = 0;
222 let passed_time = self.start_time.elapsed().as_secs_f64();
223 for k in self.pack_map.keys() {
224 if self.pack_map[k] != 0 {
226 sum_pack += self.pack_map[k];
227 if k.contains("Heart"){
228 rows.push(Row::new(vec![format!(" \u{1f493} {:.1}", self.pack_map[k]),
229 format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
230 format!("[{}]", k)]));
231 } else {
232 rows.push(Row::new(vec![format!(" \u{279f} {:.1}", self.pack_map[k]),
233 format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
234 format!("[{}]", k)]));
235 }
236 }
237 }
238 rows.push(Row::new(vec![" \u{FE4C}\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}"]));
239 rows.push(Row::new(vec![format!(" \u{279f}{}", sum_pack),
240 format!("{:.1}/s", (sum_pack as f64)/passed_time),
241 format!("[TOTAL]")]));
242
243 let widths = [Constraint::Percentage(30),
244 Constraint::Percentage(20),
245 Constraint::Percentage(50)];
246 let table = Table::new(rows, widths)
247 .column_spacing(1)
248 .header(
249 Row::new(vec![" N", "\u{1f4e6}/s", "Type"])
250 .bottom_margin(1)
251 .top_margin(1)
252 .style(Style::new().add_modifier(Modifier::UNDERLINED))
253 )
254 .block(Block::new()
255 .title("Telemetry Packet summary \u{1f4e6}")
256 .borders(Borders::ALL)
257 )
259 .style(self.theme.style());
260 frame.render_widget(table, main_lo[2])
261 }
262 TelemetryTabView::MergedEvents => {
263 }
264 }
265 }
266}