liftof_tui/tabs/
tab_telemetry.rs

1use std::collections::{
2  HashMap,
3  VecDeque,
4};
5
6use std::time::Instant;
7
8//use std::sync::{
9//  Arc,
10//  Mutex,
11//};
12
13use crossbeam_channel::{
14  Receiver,
15  Sender,
16};
17
18use telemetry_dataclasses::packets::{
19  TelemetryHeader,
20  TelemetryPacket,
21  MergedEvent,
22//  TrackerPacket,
23};
24
25use ratatui::prelude::*;
26
27//use ratatui::terminal::Frame;
28
29
30use ratatui::Frame;
31use ratatui::layout::Rect;
32use ratatui::widgets::{
33  Block,
34  BorderType,
35  Borders,
36  Paragraph,
37  Table,
38  Row,
39};
40
41use tof_dataclasses::serialization::{
42    Serialization,
43//    search_for_u16
44};
45use tof_dataclasses::errors::SerializationError;
46use tof_dataclasses::packets::{
47  TofPacket,
48};
49use tof_dataclasses::serialization::Packable;
50
51use telemetry_dataclasses::packets::TelemetryPacketType;
52
53use crate::colors::ColorTheme;
54use crate::telly_packet_counter;
55
/// Selects which view of the telemetry tab is drawn by `TelemetryTab::render`.
#[derive(Debug, Copy, Clone)]
pub enum TelemetryTabView {
  /// Overview of the incoming stream: last header, last merged event
  /// and a per-packet-type summary table.
  Stream,
  /// Dedicated merged event view. `TelemetryTab::render` currently
  /// draws nothing for it.
  MergedEvents,
}
61
62// no clone ore debug implemented for 
63// zmq socket
64pub struct TelemetryTab<'a> {
65  pub theme         : ColorTheme, 
66  pub tele_recv     : Receiver<TelemetryPacket>,
67  pub queue_size    : usize, //8
68  pub merged_queue  : VecDeque<MergedEvent>,
69  pub header_queue  : VecDeque<TelemetryHeader>,
70  // when we decide to use ONLY the telemetry stream,
71  // we have to pass on the decoded TOF packets
72  pub tp_sender     : Option<Sender<TofPacket>>,
73  pub view          : TelemetryTabView, 
74  pub pack_map      : HashMap<&'a str, usize>,
75  start_time        : Instant,
76}
77
78impl TelemetryTab<'_> {
79  pub fn new(tp_sender   : Option<Sender<TofPacket>>,
80             tele_recv   : Receiver<TelemetryPacket>,
81             theme       : ColorTheme) -> Self {
82    Self {
83      theme,
84      tele_recv    : tele_recv,
85      queue_size   : 20000,
86      merged_queue : VecDeque::<MergedEvent>::new(),
87      header_queue : VecDeque::<TelemetryHeader>::new(),
88      tp_sender    : tp_sender,
89      view         : TelemetryTabView::Stream, 
90      pack_map     : HashMap::<&str, usize>::new(),
91      start_time   : Instant::now(),
92    }
93  }
94 
95  pub fn receive_packet(&mut self) -> Result<(), SerializationError> {  
96    match self.tele_recv.try_recv() {
97      Err(crossbeam_channel::TryRecvError::Empty) => {
98        trace!("No data available yet.");
99        // Handle the empty case, possibly by doing nothing or logging.
100      },
101      Err(crossbeam_channel::TryRecvError::Disconnected) => {
102        error!("Telemetry channel disconnected.");
103        // Handle the disconnection, possibly by stopping processing or returning an error.
104        return Err(SerializationError::Disconnected);
105      },
106      Ok(packet) => {
107        // Process the received packet as before
108        let telly_ptype = TelemetryPacketType::from(packet.header.ptype);
109        telly_packet_counter(&mut self.pack_map, &telly_ptype);
110
111        self.header_queue.push_back(packet.header.clone());
112        if self.header_queue.len() > self.queue_size {
113          let _ = self.header_queue.pop_front();
114        }
115        if packet.header.ptype == 92 {
116          match TofPacket::from_bytestream(&packet.payload, &mut 0) {
117            Err(err) => {
118              error!("Unable to decode AnyHKpacket! {err}");
119            }
120            Ok(tpack) => {
121              if let Some(sender) = &self.tp_sender {
122                if let Err(err) = sender.send(tpack) {
123                  error!("Unable to send TP over channel! {err}");
124                }
125              }
126            }
127          }
128        }
129        
130        if packet.header.ptype == 90 || packet.header.ptype == 191  {
131          let expected_size = (packet.header.length as usize) - TelemetryHeader::SIZE;
132          if expected_size > packet.payload.len() {
133            error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
134            return Err(SerializationError::StreamTooShort);
135          }
136          match MergedEvent::from_bytestream(&packet.payload, &mut 0) {
137            Err(err) => {
138              error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}! {err}", packet.header.ptype, expected_size - TelemetryHeader::SIZE, packet.payload.len());
139            }
140            Ok(me) => {
141              if let Some(sender) = &self.tp_sender {
142                let ts = me.tof_event.clone();
143                // FIXME - we need to send TofEventSummary instead of TofPacket
144                let tp = ts.pack();
145                if let Err(err) = sender.send(tp) {
146                  error!("Unable to send TP over channel! {err}");
147                }
148              }
149              self.merged_queue.push_back(me);
150              if self.merged_queue.len() > self.queue_size {
151                let _ = self.merged_queue.pop_front();
152              }
153            }
154          }
155        }
156      }
157    }
158    Ok(())
159  }
160
161  pub fn render(&mut self, main_window: &Rect, frame: &mut Frame) {
162    match self.view {
163      TelemetryTabView::Stream => {
164        // Layout first
165        let main_lo = Layout::default()
166            .direction(Direction::Horizontal)
167            .constraints(
168                [Constraint::Percentage(33),
169                 Constraint::Percentage(33),
170                 Constraint::Percentage(34)].as_ref(),
171            )
172            .split(*main_window);
173        
174        let packet_lo = Layout::default()
175            .direction(Direction::Vertical)
176            .constraints(
177                [Constraint::Percentage(30), Constraint::Percentage(70)].as_ref(),
178            )
179            .split(main_lo[0]);
180  
181        // Create header_string safely
182        let header_string = if let Some(header) = self.header_queue.back() {
183            format!("{}", header)
184        } else {
185            String::from("No header available")
186        };
187        
188        let header_view = Paragraph::new(header_string)
189            .style(self.theme.style())
190            .alignment(Alignment::Left)
191            .block(
192                Block::default()
193                    .borders(Borders::ALL)
194                    .border_type(BorderType::Rounded)
195                    .title("Last Header from Telemetry stream")
196            );
197  
198        frame.render_widget(header_view, packet_lo[0]);
199  
200        // Create merged_string safely
201        let merged_string = if let Some(ev) = self.merged_queue.back() {
202            format!("{}", ev)
203        } else {
204            String::from("No merged event available")
205        };
206        
207        let merged_view = Paragraph::new(merged_string)
208            .style(self.theme.style())
209            .alignment(Alignment::Left)
210            .block(
211                Block::default()
212                    .borders(Borders::ALL)
213                    .border_type(BorderType::Rounded)
214                    .title("Last MergedEvent from Telemetry stream")
215            );
216  
217        frame.render_widget(merged_view, packet_lo[1]);
218  
219        // packet overview table, similar to home tab 
220        let mut rows   = Vec::<Row>::new();
221        let mut sum_pack = 0;
222        let passed_time = self.start_time.elapsed().as_secs_f64();
223        for k in self.pack_map.keys() {
224          //stat_string_render += "  -- -- -- -- -- -- -- -- -- --\n";
225          if self.pack_map[k] != 0 {
226            sum_pack += self.pack_map[k];
227            if k.contains("Heart"){
228              rows.push(Row::new(vec![format!("  \u{1f493} {:.1}", self.pack_map[k]),
229                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
230                                      format!("[{}]", k)]));
231            } else {
232              rows.push(Row::new(vec![format!("  \u{279f} {:.1}", self.pack_map[k]),
233                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
234                                      format!("[{}]", k)]));
235            }
236          }
237        }
238        rows.push(Row::new(vec!["  \u{FE4C}\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}"])); 
239        rows.push(Row::new(vec![format!("  \u{279f}{}", sum_pack),
240                           format!("{:.1}/s", (sum_pack as f64)/passed_time),
241                           format!("[TOTAL]")]));
242        
243        let widths = [Constraint::Percentage(30),
244                      Constraint::Percentage(20),
245                      Constraint::Percentage(50)];
246        let table  = Table::new(rows, widths)
247          .column_spacing(1)
248          .header(
249            Row::new(vec!["  N", "\u{1f4e6}/s", "Type"])
250            .bottom_margin(1)
251            .top_margin(1)
252            .style(Style::new().add_modifier(Modifier::UNDERLINED))
253          )
254          .block(Block::new()
255                 .title("Telemetry Packet summary \u{1f4e6}")
256                 .borders(Borders::ALL)
257                 //.border_type(BorderType::Rounded)
258                 )
259          .style(self.theme.style());
260        frame.render_widget(table, main_lo[2])
261      }
262      TelemetryTabView::MergedEvents => {
263      }
264    }
265  }
266}