liftof_tui/tabs/
tab_telemetry.rs

1use std::collections::{
2  HashMap,
3  VecDeque,
4};
5
6use std::time::Instant;
7
8//use std::sync::{
9//  Arc,
10//  Mutex,
11//};
12
13use crossbeam_channel::{
14  Receiver,
15  Sender,
16};
17
18use gondola_core::prelude::*;
19
20use ratatui::prelude::*;
21
22use ratatui::Frame;
23use ratatui::layout::Rect;
24use ratatui::widgets::{
25  Block,
26  BorderType,
27  Borders,
28  Paragraph,
29  Table,
30  Row,
31};
32
33use crate::colors::ColorTheme;
34use crate::telly_packet_counter;
35
/// Selects which sub-view of the telemetry tab is drawn.
///
/// `PartialEq`/`Eq` are derived so that callers can compare the
/// active view directly (e.g. `tab.view == TelemetryTabView::Stream`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TelemetryTabView {
  /// Last header, last merged event and the packet summary table.
  Stream,
  /// View for merged events.
  MergedEvents,
}
41
42// no clone ore debug implemented for 
43// zmq socket
44pub struct TelemetryTab<'a> {
45  pub theme         : ColorTheme, 
46  pub tele_recv     : Receiver<TelemetryPacket>,
47  pub queue_size    : usize, //8
48  pub merged_queue  : VecDeque<TelemetryEvent>,
49  pub header_queue  : VecDeque<TelemetryPacketHeader>,
50  // when we decide to use ONLY the telemetry stream,
51  // we have to pass on the decoded TOF packets
52  pub tp_sender     : Option<Sender<TofPacket>>,
53  pub view          : TelemetryTabView, 
54  pub pack_map      : HashMap<&'a str, usize>,
55  start_time        : Instant,
56}
57
58impl TelemetryTab<'_> {
59  pub fn new(tp_sender   : Option<Sender<TofPacket>>,
60             tele_recv   : Receiver<TelemetryPacket>,
61             theme       : ColorTheme) -> Self {
62    Self {
63      theme,
64      tele_recv    : tele_recv,
65      queue_size   : 20000,
66      merged_queue : VecDeque::<TelemetryEvent>::new(),
67      header_queue : VecDeque::<TelemetryPacketHeader>::new(),
68      tp_sender    : tp_sender,
69      view         : TelemetryTabView::Stream, 
70      pack_map     : HashMap::<&str, usize>::new(),
71      start_time   : Instant::now(),
72    }
73  }
74 
75  pub fn receive_packet(&mut self) -> Result<(), SerializationError> {  
76    match self.tele_recv.try_recv() {
77      Err(crossbeam_channel::TryRecvError::Empty) => {
78        trace!("No data available yet.");
79        // Handle the empty case, possibly by doing nothing or logging.
80      },
81      Err(crossbeam_channel::TryRecvError::Disconnected) => {
82        error!("Telemetry channel disconnected.");
83        // Handle the disconnection, possibly by stopping processing or returning an error.
84        return Err(SerializationError::Disconnected);
85      },
86      Ok(packet) => {
87        // Process the received packet as before
88        let telly_ptype = TelemetryPacketType::from(packet.header.packet_type);
89        telly_packet_counter(&mut self.pack_map, &telly_ptype);
90
91        self.header_queue.push_back(packet.header.clone());
92        if self.header_queue.len() > self.queue_size {
93          let _ = self.header_queue.pop_front();
94        }
95        if packet.header.packet_type == TelemetryPacketType::RBWaveform {
96          match TofPacket::from_bytestream(&packet.payload, &mut 0) {
97            Err(err) => {
98              error!("Unable to decode AnyHKpacket! {err}");
99            }
100            Ok(tpack) => {
101              if let Some(sender) = &self.tp_sender {
102                if let Err(err) = sender.send(tpack) {
103                  error!("Unable to send TP over channel! {err}");
104                }
105              }
106            }
107          }
108        }
109        if packet.header.packet_type == TelemetryPacketType::AnyTofHK {
110          match TofPacket::from_bytestream(&packet.payload, &mut 0) {
111            Err(err) => {
112              error!("Unable to decode AnyHKpacket! {err}");
113            }
114            Ok(tpack) => {
115              if let Some(sender) = &self.tp_sender {
116                if let Err(err) = sender.send(tpack) {
117                  error!("Unable to send TP over channel! {err}");
118                }
119              }
120            }
121          }
122        }
123        
124        if packet.header.packet_type == TelemetryPacketType::BoringEvent 
125          || packet.header.packet_type == TelemetryPacketType::NoGapsTriggerEvent  
126          || packet.header.packet_type == TelemetryPacketType::InterestingEvent {
127          let expected_size = (packet.header.length as usize) - TelemetryPacketHeader::SIZE;
128          if expected_size > packet.payload.len() {
129            error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}", packet.header.packet_type, expected_size - TelemetryPacketHeader::SIZE, packet.payload.len());
130            return Err(SerializationError::StreamTooShort);
131          }
132          match TelemetryEvent::from_bytestream(&packet.payload, &mut 0) {
133            Err(err) => {
134              error!("Unable to decode MergedEvent Telemetry packet of type {}! The expected size is {}, but the payload len is {}! {err}", packet.header.packet_type, expected_size - TelemetryPacketHeader::SIZE, packet.payload.len());
135            }
136            Ok(me) => {
137              if let Some(sender) = &self.tp_sender {
138                let ts = me.tof_event.clone();
139                // FIXME - we need to send TofEventSummary instead of TofPacket
140                let tp = ts.pack();
141                if let Err(err) = sender.send(tp) {
142                  error!("Unable to send TP over channel! {err}");
143                }
144              }
145              self.merged_queue.push_back(me);
146              if self.merged_queue.len() > self.queue_size {
147                let _ = self.merged_queue.pop_front();
148              }
149            }
150          }
151        }
152      }
153    }
154    Ok(())
155  }
156
157  pub fn render(&mut self, main_window: &Rect, frame: &mut Frame) {
158    match self.view {
159      TelemetryTabView::Stream => {
160        // Layout first
161        let main_lo = Layout::default()
162            .direction(Direction::Horizontal)
163            .constraints(
164                [Constraint::Percentage(33),
165                 Constraint::Percentage(33),
166                 Constraint::Percentage(34)].as_ref(),
167            )
168            .split(*main_window);
169        
170        let packet_lo = Layout::default()
171            .direction(Direction::Vertical)
172            .constraints(
173                [Constraint::Percentage(30), Constraint::Percentage(70)].as_ref(),
174            )
175            .split(main_lo[0]);
176  
177        // Create header_string safely
178        let header_string = if let Some(header) = self.header_queue.back() {
179            format!("{}", header)
180        } else {
181            String::from("No header available")
182        };
183        
184        let header_view = Paragraph::new(header_string)
185            .style(self.theme.style())
186            .alignment(Alignment::Left)
187            .block(
188                Block::default()
189                    .borders(Borders::ALL)
190                    .border_type(BorderType::Rounded)
191                    .title("Last Header from Telemetry stream")
192            );
193  
194        frame.render_widget(header_view, packet_lo[0]);
195  
196        // Create merged_string safely
197        let merged_string = if let Some(ev) = self.merged_queue.back() {
198            format!("{}", ev)
199        } else {
200            String::from("No merged event available")
201        };
202        
203        let merged_view = Paragraph::new(merged_string)
204            .style(self.theme.style())
205            .alignment(Alignment::Left)
206            .block(
207                Block::default()
208                    .borders(Borders::ALL)
209                    .border_type(BorderType::Rounded)
210                    .title("Last MergedEvent from Telemetry stream")
211            );
212  
213        frame.render_widget(merged_view, packet_lo[1]);
214  
215        // packet overview table, similar to home tab 
216        let mut rows   = Vec::<Row>::new();
217        let mut sum_pack = 0;
218        let passed_time = self.start_time.elapsed().as_secs_f64();
219        for k in self.pack_map.keys() {
220          //stat_string_render += "  -- -- -- -- -- -- -- -- -- --\n";
221          if self.pack_map[k] != 0 {
222            sum_pack += self.pack_map[k];
223            if k.contains("Heart"){
224              rows.push(Row::new(vec![format!("  \u{1f493} {:.1}", self.pack_map[k]),
225                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
226                                      format!("[{}]", k)]));
227            } else {
228              rows.push(Row::new(vec![format!("  \u{279f} {:.1}", self.pack_map[k]),
229                                      format!("{:.1}", (self.pack_map[k] as f64)/passed_time,),
230                                      format!("[{}]", k)]));
231            }
232          }
233        }
234        rows.push(Row::new(vec!["  \u{FE4C}\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}","\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}\u{FE4C}"])); 
235        rows.push(Row::new(vec![format!("  \u{279f}{}", sum_pack),
236                           format!("{:.1}/s", (sum_pack as f64)/passed_time),
237                           format!("[TOTAL]")]));
238        
239        let widths = [Constraint::Percentage(30),
240                      Constraint::Percentage(20),
241                      Constraint::Percentage(50)];
242        let table  = Table::new(rows, widths)
243          .column_spacing(1)
244          .header(
245            Row::new(vec!["  N", "\u{1f4e6}/s", "Type"])
246            .bottom_margin(1)
247            .top_margin(1)
248            .style(Style::new().add_modifier(Modifier::UNDERLINED))
249          )
250          .block(Block::new()
251                 .title("Telemetry Packet summary \u{1f4e6}")
252                 .borders(Borders::ALL)
253                 //.border_type(BorderType::Rounded)
254                 )
255          .style(self.theme.style());
256        frame.render_widget(table, main_lo[2])
257      }
258      TelemetryTabView::MergedEvents => {
259      }
260    }
261  }
262}