liftof_tui/tabs/
tab_telemetry.rs

1use std::collections::{
2  HashMap,
3  VecDeque,
4};
5
6use std::time::Instant;
7
8//use std::sync::{
9//  Arc,
10//  Mutex,
11//};
12
13use crossbeam_channel::{
14  Receiver,
15  Sender,
16};
17
18use telemetry_dataclasses::packets::{
19  TelemetryHeader,
20  TelemetryPacket,
21  MergedEvent,
22//  TrackerPacket,
23};
24
25use ratatui::prelude::*;
26
27//use ratatui::terminal::Frame;
28
29use ratatui::Frame;
30use ratatui::layout::Rect;
31use ratatui::widgets::{
32  Block,
33  BorderType,
34  Borders,
35  Paragraph,
36  Table,
37  Row,
38};
39
40use tof_dataclasses::serialization::{
41    Serialization,
42//    search_for_u16
43};
44use tof_dataclasses::errors::SerializationError;
45use tof_dataclasses::packets::{
46    TofPacket,
47//    PacketType,
48};
49
50use telemetry_dataclasses::packets::TelemetryPacketType;
51
52use crate::colors::ColorTheme;
53use crate::telly_packet_counter;
54
/// Selects which sub-view the telemetry tab draws.
#[derive(Clone, Copy, Debug)]
pub enum TelemetryTabView {
  /// Overview of the incoming stream: last header, last merged event
  /// and the per-type packet table.
  Stream,
  /// View dedicated to merged events.
  MergedEvents,
}
60
61// no clone ore debug implemented for 
62// zmq socket
63pub struct TelemetryTab<'a> {
64  pub theme         : ColorTheme, 
65  pub tele_recv     : Receiver<TelemetryPacket>,
66  pub queue_size    : usize, //8
67  pub merged_queue  : VecDeque<MergedEvent>,
68  pub header_queue  : VecDeque<TelemetryHeader>,
69  // when we decide to use ONLY the telemetry stream,
70  // we have to pass on the decoded TOF packets
71  pub tp_sender     : Option<Sender<TofPacket>>,
72  pub view          : TelemetryTabView, 
73  pub pack_map      : HashMap<&'a str, usize>,
74  start_time        : Instant,
75}
76
77impl TelemetryTab<'_> {
78  pub fn new(tp_sender   : Option<Sender<TofPacket>>,
79             tele_recv   : Receiver<TelemetryPacket>,
80             theme       : ColorTheme) -> Self {
81    Self {
82      theme,
83      tele_recv    : tele_recv,
84      queue_size   : 20000,
85      merged_queue : VecDeque::<MergedEvent>::new(),
86      header_queue : VecDeque::<TelemetryHeader>::new(),
87      tp_sender    : tp_sender,
88      view         : TelemetryTabView::Stream, 
89      pack_map     : HashMap::<&str, usize>::new(),
90      start_time   : Instant::now(),
91    }
92  }
93 
94  pub fn receive_packet(&mut self) -> Result<(), SerializationError> {  
95    match self.tele_recv.try_recv() {
96      Err(crossbeam_channel::TryRecvError::Empty) => {
97        trace!("No data available yet.");
98        // Handle the empty case, possibly by doing nothing or logging.
99      },
100      Err(crossbeam_channel::TryRecvError::Disconnected) => {
101        error!("Telemetry channel disconnected.");
102        // Handle the disconnection, possibly by stopping processing or returning an error.
103        return Err(SerializationError::Disconnected);
104      },
105      Ok(packet) => {
106        // Process the received packet as before
107        let telly_ptype = TelemetryPacketType::from(packet.header.ptype);
108        telly_packet_counter(&mut self.pack_map, &telly_ptype);
109
110        self.header_queue.push_back(packet.header.clone());
111        if self.header_queue.len() > self.queue_size {
112          let _ = self.header_queue.pop_front();
113        }
114        if packet.header.ptype == 92 {
115          match TofPacket::from_bytestream(&packet.payload, &mut 0) {
116            Err(err) => {
117              error!("Unable to decode AnyHKpacket! {err}");
118            }
119            Ok(tpack) => {
120              if let Some(sender) = &self.tp_sender {
121                if let Err(err) = sender.send(tpack) {
122                  error!("Unable to send TP over channel! {err}");
123                }
124              }
125            }
126          }
127        }
128        
129        if packet.header.ptype == 90 || packet.header.ptype == 191  {
130          let expected_size = (packet.header.length as usize) - TelemetryHeader::SIZE;
131          if expected_size > packet.payload.len() {
132            error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
133            return Err(SerializationError::StreamTooShort);
134          }
135          match MergedEvent::from_bytestream(&packet.payload, &mut 0) {
136            Err(err) => {
137              error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}! {err}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
138            }
139            Ok(me) => {
140              if let Some(sender) = &self.tp_sender {
141                match TofPacket::from_bytestream(&me.tof_data, &mut 0) {
142                  Err(err) => {
143                    error!("Can't unpack TofPacket! {err}");
144                  }
145                  Ok(tp) => {
146                    if let Err(err) = sender.send(tp) {
147                      error!("Unable to send TP over channel! {err}");
148                    }
149                  }
150                }
151              }
152              self.merged_queue.push_back(me);
153              if self.merged_queue.len() > self.queue_size {
154                let _ = self.merged_queue.pop_front();
155              }
156            }
157          }
158        }
159      }
160    }
161    Ok(())
162  }
163
164  pub fn render(&mut self, main_window: &Rect, frame: &mut Frame) {
165    match self.view {
166      TelemetryTabView::Stream => {
167        // Layout first
168        let main_lo = Layout::default()
169            .direction(Direction::Horizontal)
170            .constraints(
171                [Constraint::Percentage(33),
172                 Constraint::Percentage(33),
173                 Constraint::Percentage(34)].as_ref(),
174            )
175            .split(*main_window);
176        
177        let packet_lo = Layout::default()
178            .direction(Direction::Vertical)
179            .constraints(
180                [Constraint::Percentage(30), Constraint::Percentage(70)].as_ref(),
181            )
182            .split(main_lo[0]);
183  
184        // Create header_string safely
185        let header_string = if let Some(header) = self.header_queue.back() {
186            format!("{}", header)
187        } else {
188            String::from("No header available")
189        };
190        
191        let header_view = Paragraph::new(header_string)
192            .style(self.theme.style())
193            .alignment(Alignment::Left)
194            .block(
195                Block::default()
196                    .borders(Borders::ALL)
197                    .border_type(BorderType::Rounded)
198                    .title("Last Header from Telemetry stream")
199            );
200  
201        frame.render_widget(header_view, packet_lo[0]);
202  
203        // Create merged_string safely
204        let merged_string = if let Some(ev) = self.merged_queue.back() {
205            format!("{}", ev)
206        } else {
207            String::from("No merged event available")
208        };
209        
210        let merged_view = Paragraph::new(merged_string)
211            .style(self.theme.style())
212            .alignment(Alignment::Left)
213            .block(
214                Block::default()
215                    .borders(Borders::ALL)
216                    .border_type(BorderType::Rounded)
217                    .title("Last MergedEvent from Telemetry stream")
218            );
219  
220        frame.render_widget(merged_view, packet_lo[1]);
221  
222        // packet overview table, similar to home tab 
223        let mut rows   = Vec::<Row>::new();
224        let mut sum_pack = 0;
225        let passed_time = self.start_time.elapsed().as_secs_f64();
226        for k in self.pack_map.keys() {
227          //stat_string_render += "  -- -- -- -- -- -- -- -- -- --\n";
228          if self.pack_map[k] != 0 {
229            sum_pack += self.pack_map[k];
230            if k.contains("Heart"){
231              rows.push(Row::new(vec![format!("  \u{1f493} {:.1}", self.pack_map[k]),
232                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
233                                      format!("[{}]", k)]));
234            } else {
235              rows.push(Row::new(vec![format!("  \u{279f} {:.1}", self.pack_map[k]),
236                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
237                                      format!("[{}]", k)]));
238            }
239          }
240        }
241        rows.push(Row::new(vec!["  \u{FE4C}\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}"])); 
242        rows.push(Row::new(vec![format!("  \u{279f}{}", sum_pack),
243                           format!("{:.1}/s", (sum_pack as f64)/passed_time),
244                           format!("[TOTAL]")]));
245        
246        let widths = [Constraint::Percentage(30),
247                      Constraint::Percentage(20),
248                      Constraint::Percentage(50)];
249        let table  = Table::new(rows, widths)
250          .column_spacing(1)
251          .header(
252            Row::new(vec!["  N", "\u{1f4e6}/s", "Type"])
253            .bottom_margin(1)
254            .top_margin(1)
255            .style(Style::new().add_modifier(Modifier::UNDERLINED))
256          )
257          .block(Block::new()
258                 .title("Telemetry Packet summary \u{1f4e6}")
259                 .borders(Borders::ALL)
260                 //.border_type(BorderType::Rounded)
261                 )
262          .style(self.theme.style());
263        frame.render_widget(table, main_lo[2])
264      }
265      TelemetryTabView::MergedEvents => {
266      }
267    }
268  }
269}