1use std::collections::{
2 HashMap,
3 VecDeque,
4};
5
6use std::time::Instant;
7
8use crossbeam_channel::{
14 Receiver,
15 Sender,
16};
17
18use telemetry_dataclasses::packets::{
19 TelemetryHeader,
20 TelemetryPacket,
21 MergedEvent,
22};
24
25use ratatui::prelude::*;
26
27use ratatui::Frame;
30use ratatui::layout::Rect;
31use ratatui::widgets::{
32 Block,
33 BorderType,
34 Borders,
35 Paragraph,
36 Table,
37 Row,
38};
39
40use tof_dataclasses::serialization::{
41 Serialization,
42};
44use tof_dataclasses::errors::SerializationError;
45use tof_dataclasses::packets::{
46 TofPacket,
47};
49
50use telemetry_dataclasses::packets::TelemetryPacketType;
51
52use crate::colors::ColorTheme;
53use crate::telly_packet_counter;
54
55#[derive(Debug, Copy, Clone)]
56pub enum TelemetryTabView {
57 Stream,
58 MergedEvents
59}
60
61pub struct TelemetryTab<'a> {
64 pub theme : ColorTheme,
65 pub tele_recv : Receiver<TelemetryPacket>,
66 pub queue_size : usize, pub merged_queue : VecDeque<MergedEvent>,
68 pub header_queue : VecDeque<TelemetryHeader>,
69 pub tp_sender : Option<Sender<TofPacket>>,
72 pub view : TelemetryTabView,
73 pub pack_map : HashMap<&'a str, usize>,
74 start_time : Instant,
75}
76
77impl TelemetryTab<'_> {
78 pub fn new(tp_sender : Option<Sender<TofPacket>>,
79 tele_recv : Receiver<TelemetryPacket>,
80 theme : ColorTheme) -> Self {
81 Self {
82 theme,
83 tele_recv : tele_recv,
84 queue_size : 20000,
85 merged_queue : VecDeque::<MergedEvent>::new(),
86 header_queue : VecDeque::<TelemetryHeader>::new(),
87 tp_sender : tp_sender,
88 view : TelemetryTabView::Stream,
89 pack_map : HashMap::<&str, usize>::new(),
90 start_time : Instant::now(),
91 }
92 }
93
94 pub fn receive_packet(&mut self) -> Result<(), SerializationError> {
95 match self.tele_recv.try_recv() {
96 Err(crossbeam_channel::TryRecvError::Empty) => {
97 trace!("No data available yet.");
98 },
100 Err(crossbeam_channel::TryRecvError::Disconnected) => {
101 error!("Telemetry channel disconnected.");
102 return Err(SerializationError::Disconnected);
104 },
105 Ok(packet) => {
106 let telly_ptype = TelemetryPacketType::from(packet.header.ptype);
108 telly_packet_counter(&mut self.pack_map, &telly_ptype);
109
110 self.header_queue.push_back(packet.header.clone());
111 if self.header_queue.len() > self.queue_size {
112 let _ = self.header_queue.pop_front();
113 }
114 if packet.header.ptype == 92 {
115 match TofPacket::from_bytestream(&packet.payload, &mut 0) {
116 Err(err) => {
117 error!("Unable to decode AnyHKpacket! {err}");
118 }
119 Ok(tpack) => {
120 if let Some(sender) = &self.tp_sender {
121 if let Err(err) = sender.send(tpack) {
122 error!("Unable to send TP over channel! {err}");
123 }
124 }
125 }
126 }
127 }
128
129 if packet.header.ptype == 90 || packet.header.ptype == 191 {
130 let expected_size = (packet.header.length as usize) - TelemetryHeader::SIZE;
131 if expected_size > packet.payload.len() {
132 error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
133 return Err(SerializationError::StreamTooShort);
134 }
135 match MergedEvent::from_bytestream(&packet.payload, &mut 0) {
136 Err(err) => {
137 error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}! {err}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
138 }
139 Ok(me) => {
140 if let Some(sender) = &self.tp_sender {
141 match TofPacket::from_bytestream(&me.tof_data, &mut 0) {
142 Err(err) => {
143 error!("Can't unpack TofPacket! {err}");
144 }
145 Ok(tp) => {
146 if let Err(err) = sender.send(tp) {
147 error!("Unable to send TP over channel! {err}");
148 }
149 }
150 }
151 }
152 self.merged_queue.push_back(me);
153 if self.merged_queue.len() > self.queue_size {
154 let _ = self.merged_queue.pop_front();
155 }
156 }
157 }
158 }
159 }
160 }
161 Ok(())
162 }
163
164 pub fn render(&mut self, main_window: &Rect, frame: &mut Frame) {
165 match self.view {
166 TelemetryTabView::Stream => {
167 let main_lo = Layout::default()
169 .direction(Direction::Horizontal)
170 .constraints(
171 [Constraint::Percentage(33),
172 Constraint::Percentage(33),
173 Constraint::Percentage(34)].as_ref(),
174 )
175 .split(*main_window);
176
177 let packet_lo = Layout::default()
178 .direction(Direction::Vertical)
179 .constraints(
180 [Constraint::Percentage(30), Constraint::Percentage(70)].as_ref(),
181 )
182 .split(main_lo[0]);
183
184 let header_string = if let Some(header) = self.header_queue.back() {
186 format!("{}", header)
187 } else {
188 String::from("No header available")
189 };
190
191 let header_view = Paragraph::new(header_string)
192 .style(self.theme.style())
193 .alignment(Alignment::Left)
194 .block(
195 Block::default()
196 .borders(Borders::ALL)
197 .border_type(BorderType::Rounded)
198 .title("Last Header from Telemetry stream")
199 );
200
201 frame.render_widget(header_view, packet_lo[0]);
202
203 let merged_string = if let Some(ev) = self.merged_queue.back() {
205 format!("{}", ev)
206 } else {
207 String::from("No merged event available")
208 };
209
210 let merged_view = Paragraph::new(merged_string)
211 .style(self.theme.style())
212 .alignment(Alignment::Left)
213 .block(
214 Block::default()
215 .borders(Borders::ALL)
216 .border_type(BorderType::Rounded)
217 .title("Last MergedEvent from Telemetry stream")
218 );
219
220 frame.render_widget(merged_view, packet_lo[1]);
221
222 let mut rows = Vec::<Row>::new();
224 let mut sum_pack = 0;
225 let passed_time = self.start_time.elapsed().as_secs_f64();
226 for k in self.pack_map.keys() {
227 if self.pack_map[k] != 0 {
229 sum_pack += self.pack_map[k];
230 if k.contains("Heart"){
231 rows.push(Row::new(vec![format!(" \u{1f493} {:.1}", self.pack_map[k]),
232 format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
233 format!("[{}]", k)]));
234 } else {
235 rows.push(Row::new(vec![format!(" \u{279f} {:.1}", self.pack_map[k]),
236 format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
237 format!("[{}]", k)]));
238 }
239 }
240 }
241 rows.push(Row::new(vec![" \u{FE4C}\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}"]));
242 rows.push(Row::new(vec![format!(" \u{279f}{}", sum_pack),
243 format!("{:.1}/s", (sum_pack as f64)/passed_time),
244 format!("[TOTAL]")]));
245
246 let widths = [Constraint::Percentage(30),
247 Constraint::Percentage(20),
248 Constraint::Percentage(50)];
249 let table = Table::new(rows, widths)
250 .column_spacing(1)
251 .header(
252 Row::new(vec![" N", "\u{1f4e6}/s", "Type"])
253 .bottom_margin(1)
254 .top_margin(1)
255 .style(Style::new().add_modifier(Modifier::UNDERLINED))
256 )
257 .block(Block::new()
258 .title("Telemetry Packet summary \u{1f4e6}")
259 .borders(Borders::ALL)
260 )
262 .style(self.theme.style());
263 frame.render_widget(table, main_lo[2])
264 }
265 TelemetryTabView::MergedEvents => {
266 }
267 }
268 }
269}