liftof_tui/tabs/
tab_telemetry.rs

1use std::collections::{
2  HashMap,
3  VecDeque,
4};
5
6use std::time::Instant;
7
8//use std::sync::{
9//  Arc,
10//  Mutex,
11//};
12
13use crossbeam_channel::{
14  Receiver,
15  Sender,
16};
17
18use gondola_core::prelude::*;
19
20//use telemetry_dataclasses::packets::{
21//  TelemetryPacketHeader,
22//  TelemetryPacket,
23//  MergedEvent,
24////  TrackerPacket,
25//};
26
27use ratatui::prelude::*;
28
29//use ratatui::terminal::Frame;
30
31
32use ratatui::Frame;
33use ratatui::layout::Rect;
34use ratatui::widgets::{
35  Block,
36  BorderType,
37  Borders,
38  Paragraph,
39  Table,
40  Row,
41};
42
43//use tof_dataclasses::serialization::{
44//    Serialization,
45////    search_for_u16
46//};
47//use tof_dataclasses::errors::SerializationError;
48//use tof_dataclasses::packets::{
49//  TofPacket,
50//};
51//use tof_dataclasses::serialization::Packable;
52//
53//use telemetry_dataclasses::packets::TelemetryPacketType;
54
55use crate::colors::ColorTheme;
56use crate::telly_packet_counter;
57
/// Selects which view of the telemetry tab is drawn.
///
/// `PartialEq`/`Eq` are derived so the current view can be compared
/// directly (`view == TelemetryTabView::Stream`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TelemetryTabView {
  /// Latest header, latest merged event and the packet summary table.
  Stream,
  /// View dedicated to merged events.
  MergedEvents,
}
63
64// no clone ore debug implemented for 
65// zmq socket
66pub struct TelemetryTab<'a> {
67  pub theme         : ColorTheme, 
68  pub tele_recv     : Receiver<TelemetryPacket>,
69  pub queue_size    : usize, //8
70  pub merged_queue  : VecDeque<TelemetryEvent>,
71  pub header_queue  : VecDeque<TelemetryPacketHeader>,
72  // when we decide to use ONLY the telemetry stream,
73  // we have to pass on the decoded TOF packets
74  pub tp_sender     : Option<Sender<TofPacket>>,
75  pub view          : TelemetryTabView, 
76  pub pack_map      : HashMap<&'a str, usize>,
77  start_time        : Instant,
78}
79
80impl TelemetryTab<'_> {
81  pub fn new(tp_sender   : Option<Sender<TofPacket>>,
82             tele_recv   : Receiver<TelemetryPacket>,
83             theme       : ColorTheme) -> Self {
84    Self {
85      theme,
86      tele_recv    : tele_recv,
87      queue_size   : 20000,
88      merged_queue : VecDeque::<TelemetryEvent>::new(),
89      header_queue : VecDeque::<TelemetryPacketHeader>::new(),
90      tp_sender    : tp_sender,
91      view         : TelemetryTabView::Stream, 
92      pack_map     : HashMap::<&str, usize>::new(),
93      start_time   : Instant::now(),
94    }
95  }
96 
97  pub fn receive_packet(&mut self) -> Result<(), SerializationError> {  
98    match self.tele_recv.try_recv() {
99      Err(crossbeam_channel::TryRecvError::Empty) => {
100        trace!("No data available yet.");
101        // Handle the empty case, possibly by doing nothing or logging.
102      },
103      Err(crossbeam_channel::TryRecvError::Disconnected) => {
104        error!("Telemetry channel disconnected.");
105        // Handle the disconnection, possibly by stopping processing or returning an error.
106        return Err(SerializationError::Disconnected);
107      },
108      Ok(packet) => {
109        // Process the received packet as before
110        let telly_ptype = TelemetryPacketType::from(packet.header.packet_type);
111        telly_packet_counter(&mut self.pack_map, &telly_ptype);
112
113        self.header_queue.push_back(packet.header.clone());
114        if self.header_queue.len() > self.queue_size {
115          let _ = self.header_queue.pop_front();
116        }
117        if packet.header.packet_type == TelemetryPacketType::AnyTofHK {
118          match TofPacket::from_bytestream(&packet.payload, &mut 0) {
119            Err(err) => {
120              error!("Unable to decode AnyHKpacket! {err}");
121            }
122            Ok(tpack) => {
123              if let Some(sender) = &self.tp_sender {
124                if let Err(err) = sender.send(tpack) {
125                  error!("Unable to send TP over channel! {err}");
126                }
127              }
128            }
129          }
130        }
131        
132        if packet.header.packet_type == TelemetryPacketType::BoringEvent 
133          || packet.header.packet_type == TelemetryPacketType::NoGapsTriggerEvent  {
134          let expected_size = (packet.header.length as usize) - TelemetryPacketHeader::SIZE;
135          if expected_size > packet.payload.len() {
136            error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}", packet.header.packet_type, expected_size - TelemetryPacketHeader::SIZE, packet.payload.len());
137            return Err(SerializationError::StreamTooShort);
138          }
139          match TelemetryEvent::from_bytestream(&packet.payload, &mut 0) {
140            Err(err) => {
141              error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}! {err}", packet.header.packet_type, expected_size - TelemetryPacketHeader::SIZE, packet.payload.len());
142            }
143            Ok(me) => {
144              if let Some(sender) = &self.tp_sender {
145                let ts = me.tof_event.clone();
146                // FIXME - we need to send TofEventSummary instead of TofPacket
147                let tp = ts.pack();
148                if let Err(err) = sender.send(tp) {
149                  error!("Unable to send TP over channel! {err}");
150                }
151              }
152              self.merged_queue.push_back(me);
153              if self.merged_queue.len() > self.queue_size {
154                let _ = self.merged_queue.pop_front();
155              }
156            }
157          }
158        }
159      }
160    }
161    Ok(())
162  }
163
164  pub fn render(&mut self, main_window: &Rect, frame: &mut Frame) {
165    match self.view {
166      TelemetryTabView::Stream => {
167        // Layout first
168        let main_lo = Layout::default()
169            .direction(Direction::Horizontal)
170            .constraints(
171                [Constraint::Percentage(33),
172                 Constraint::Percentage(33),
173                 Constraint::Percentage(34)].as_ref(),
174            )
175            .split(*main_window);
176        
177        let packet_lo = Layout::default()
178            .direction(Direction::Vertical)
179            .constraints(
180                [Constraint::Percentage(30), Constraint::Percentage(70)].as_ref(),
181            )
182            .split(main_lo[0]);
183  
184        // Create header_string safely
185        let header_string = if let Some(header) = self.header_queue.back() {
186            format!("{}", header)
187        } else {
188            String::from("No header available")
189        };
190        
191        let header_view = Paragraph::new(header_string)
192            .style(self.theme.style())
193            .alignment(Alignment::Left)
194            .block(
195                Block::default()
196                    .borders(Borders::ALL)
197                    .border_type(BorderType::Rounded)
198                    .title("Last Header from Telemetry stream")
199            );
200  
201        frame.render_widget(header_view, packet_lo[0]);
202  
203        // Create merged_string safely
204        let merged_string = if let Some(ev) = self.merged_queue.back() {
205            format!("{}", ev)
206        } else {
207            String::from("No merged event available")
208        };
209        
210        let merged_view = Paragraph::new(merged_string)
211            .style(self.theme.style())
212            .alignment(Alignment::Left)
213            .block(
214                Block::default()
215                    .borders(Borders::ALL)
216                    .border_type(BorderType::Rounded)
217                    .title("Last MergedEvent from Telemetry stream")
218            );
219  
220        frame.render_widget(merged_view, packet_lo[1]);
221  
222        // packet overview table, similar to home tab 
223        let mut rows   = Vec::<Row>::new();
224        let mut sum_pack = 0;
225        let passed_time = self.start_time.elapsed().as_secs_f64();
226        for k in self.pack_map.keys() {
227          //stat_string_render += "  -- -- -- -- -- -- -- -- -- --\n";
228          if self.pack_map[k] != 0 {
229            sum_pack += self.pack_map[k];
230            if k.contains("Heart"){
231              rows.push(Row::new(vec![format!("  \u{1f493} {:.1}", self.pack_map[k]),
232                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
233                                      format!("[{}]", k)]));
234            } else {
235              rows.push(Row::new(vec![format!("  \u{279f} {:.1}", self.pack_map[k]),
236                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
237                                      format!("[{}]", k)]));
238            }
239          }
240        }
241        rows.push(Row::new(vec!["  \u{FE4C}\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}"])); 
242        rows.push(Row::new(vec![format!("  \u{279f}{}", sum_pack),
243                           format!("{:.1}/s", (sum_pack as f64)/passed_time),
244                           format!("[TOTAL]")]));
245        
246        let widths = [Constraint::Percentage(30),
247                      Constraint::Percentage(20),
248                      Constraint::Percentage(50)];
249        let table  = Table::new(rows, widths)
250          .column_spacing(1)
251          .header(
252            Row::new(vec!["  N", "\u{1f4e6}/s", "Type"])
253            .bottom_margin(1)
254            .top_margin(1)
255            .style(Style::new().add_modifier(Modifier::UNDERLINED))
256          )
257          .block(Block::new()
258                 .title("Telemetry Packet summary \u{1f4e6}")
259                 .borders(Borders::ALL)
260                 //.border_type(BorderType::Rounded)
261                 )
262          .style(self.theme.style());
263        frame.render_widget(table, main_lo[2])
264      }
265      TelemetryTabView::MergedEvents => {
266      }
267    }
268  }
269}