liftof_tui/
lib.rs

1#[macro_use] extern crate log;
2
3pub mod menu;
4pub mod colors;
5pub mod widgets;
6pub mod tabs;
7pub mod layout;
8
9use std::sync::Mutex;
10use std::sync::Arc;
11
12use std::collections::HashMap;
13use std::collections::VecDeque;
14
15use tui_logger::TuiLoggerWidget;
16use ratatui::{
17  style::{
18    Color,
19    Style,
20  },
21  widgets::{
22    Block,
23    Borders,
24  },
25};
26
27pub use crate::tabs::*;
28pub use crate::layout::*;
29
30use crate::colors::ColorTheme;
31
32#[cfg(feature = "telemetry")]
33use telemetry_dataclasses::packets::{
34  TelemetryPacketType,
35  TelemetryPacket,
36  TelemetryHeader
37};
38
39use tof_dataclasses::packets::TofPacket;
40use tof_dataclasses::packets::PacketType;
41use tof_dataclasses::events::MasterTriggerEvent;
42use tof_dataclasses::events::TofEventSummary;
43use tof_dataclasses::events::TofHit;
44use tof_dataclasses::serialization::Packable;
45use tof_dataclasses::serialization::Serialization;
46use tof_dataclasses::io::TofPacketWriter;
47
48use crossbeam_channel::{
49  Sender,
50  Receiver
51};
52
53#[cfg(feature = "telemetry")]
54/// A map which keeps track of the types of telemetry packets 
55/// received
56pub fn telly_packet_counter(pack_map : &mut HashMap<&str, usize>, packet_type : &TelemetryPacketType) {
57  let pack_key : &str;
58  match packet_type {
59    TelemetryPacketType::Unknown            => pack_key = "Unknown",
60    TelemetryPacketType::CardHKP            => pack_key = "CardHKP",
61    TelemetryPacketType::CoolingHK          => pack_key = "CoolingHKP",
62    TelemetryPacketType::PDUHK              => pack_key = "PDUHK",
63    TelemetryPacketType::Tracker            => pack_key = "Tracker",
64    TelemetryPacketType::TrackerDAQCntr     => pack_key = "TrakcerDAQCntr",
65    TelemetryPacketType::GPS                => pack_key = "GPS",
66    TelemetryPacketType::TrkTempLeak        => pack_key = "TrkTempLeak",
67    TelemetryPacketType::BoringEvent        => pack_key = "BoringEvent",
68    TelemetryPacketType::RBWaveform         => pack_key = "RBWaveform",
69    TelemetryPacketType::AnyTofHK           => pack_key = "AnyTofHK",
70    TelemetryPacketType::GcuEvtBldSettings  => pack_key = "GcuEvtBldSettings",
71    TelemetryPacketType::LabJackHK          => pack_key = "LabJackHK",
72    TelemetryPacketType::MagHK              => pack_key = "MagHK",
73    TelemetryPacketType::GcuMon             => pack_key = "GcuMon",
74    TelemetryPacketType::InterestingEvent   => pack_key = "InterestingEvent",
75    TelemetryPacketType::NoGapsTriggerEvent => pack_key = "NoGapsTriggerEvent",
76    TelemetryPacketType::NoTofDataEvent     => pack_key = "NoTofDataEvent",
77    TelemetryPacketType::Ack                => pack_key = "Ack",     
78    TelemetryPacketType::AnyTrackerHK       => pack_key = "AnyTrackerHK",
79    TelemetryPacketType::TmP33              => pack_key = "TmP33",
80    TelemetryPacketType::TmP34              => pack_key = "TmP34",
81    TelemetryPacketType::TmP37              => pack_key = "TmP37",
82    TelemetryPacketType::TmP38              => pack_key = "TmP38",
83    TelemetryPacketType::TmP55              => pack_key = "TmP55",
84    TelemetryPacketType::TmP64              => pack_key = "TmP64",
85    TelemetryPacketType::TmP96              => pack_key = "TmP96",
86    TelemetryPacketType::TmP214             => pack_key = "TmP214",
87  //_                              => pack_key = "Unknown",
88  }
89  if pack_map.get(pack_key).is_some() {
90    *pack_map.get_mut(pack_key).unwrap() += 1;
91  } else {
92    pack_map.insert(pack_key, 0);
93  }
94}
95
96/// Use the TuiLoggerWidget to display 
97/// the most recent log messages
98///
99///
100pub fn render_logs<'a>(theme : ColorTheme) -> TuiLoggerWidget<'a> {
101  TuiLoggerWidget::default()
102    .style_error(Style::default().fg(Color::Red))
103    .style_debug(Style::default().fg(Color::Green))
104    .style_warn(Style::default().fg(Color::Yellow))
105    .style_trace(Style::default().fg(Color::Gray))
106    .style_info(Style::default().fg(Color::Blue))
107    .block(
108      Block::default()
109        .title("Logs")
110        .border_style(theme.style())
111        .borders(Borders::ALL),
112    )   
113    .style(theme.style())
114}
115
116/// Count the different types of tofpackets and store the result 
117/// in a HashMap
118///
119/// # Arguments:
120///
121///   * packet_type : TofPacket type to llokup it's position in 
122///                   the map
123///   * packet_map  : An arc/mutex to the HashMap we use to store
124///                   the counted values in.
125fn packet_sorter(packet_type : &PacketType,
126                 packet_map  : &Arc<Mutex<HashMap<&str,usize>>>) {
127  match packet_map.lock() {
128    Ok(mut pm) => {
129      let pack_key : &str;
130      match packet_type {
131        PacketType::Unknown               => pack_key = "Unknown", 
132        PacketType::RBEvent               => pack_key = "RBEvent",
133        PacketType::TofEvent              => pack_key = "TofEvent",
134        PacketType::RBWaveform            => pack_key = "RBWaveform",
135        PacketType::TofEventSummary       => pack_key = "TofEventSummary",
136        PacketType::HeartBeatDataSink     => pack_key = "HeartBeatDataSink",    
137        PacketType::MasterTrigger         => pack_key = "MasterTrigger",
138        PacketType::TriggerConfig         => pack_key = "TriggerConfig",
139        PacketType::MTBHeartbeat          => pack_key = "MTBHeartbeat", 
140        PacketType::EVTBLDRHeartbeat      => pack_key = "EVTBLDRHeartbeat",
141        PacketType::RBChannelMaskConfig   => pack_key = "RBChannelMaskConfig",
142        PacketType::TofRBConfig           => pack_key = "TofRBConfig",
143        PacketType::AnalysisEngineConfig  => pack_key = "AnalysisEngineConfig",
144        PacketType::RBEventHeader         => pack_key = "RBEventHeader",    // needs to go away
145        PacketType::TOFEventBuilderConfig => pack_key = "TOFEventBuilderConfig",
146        PacketType::DataPublisherConfig   => pack_key = "DataPublisherConfig",
147        PacketType::TofRunConfig          => pack_key = "TofRunConfig",
148        PacketType::CPUMoniData           => pack_key = "CPUMoniData",
149        PacketType::MonitorMtb            => pack_key = "MonitorMtb",
150        PacketType::RBMoniData            => pack_key = "RBMoniData",
151        PacketType::PBMoniData            => pack_key = "PBMoniData",
152        PacketType::LTBMoniData           => pack_key = "LTBMoniData",
153        PacketType::PAMoniData            => pack_key = "PAMoniData",
154        PacketType::RBEventMemoryView     => pack_key = "RBEventMemoryView", // We'll keep it for now - indicates that the event
155        PacketType::RBCalibration         => pack_key = "RBCalibration",
156        PacketType::TofCommand            => pack_key = "TofCommand",
157        PacketType::TofCommandV2          => pack_key = "TofCommandV2",
158        PacketType::TofResponse           => pack_key = "TofResponse",
159        PacketType::RBCommand             => pack_key = "RBCommand",
160        PacketType::RBPing                => pack_key = "RBPing",
161        PacketType::PreampBiasConfig      => pack_key = "PreampBiasConfig",
162        PacketType::RunConfig             => pack_key = "RunConfig",
163        PacketType::LTBThresholdConfig    => pack_key = "LTBThresholdConfig",
164        PacketType::TofDetectorStatus     => pack_key = "TofDetectorStatus",
165        PacketType::ConfigBinary          => pack_key = "ConfigBinary",
166        PacketType::LiftofRBBinary        => pack_key = "LiftofRBBinary",
167        PacketType::LiftofBinaryService   => pack_key = "LiftofBinaryService",
168        PacketType::LiftofCCBinary        => pack_key = "LiftofCCBinary",
169        PacketType::RBCalibrationFlightV  => pack_key = "RBCalibrationFlightV",
170        PacketType::RBCalibrationFlightT  => pack_key = "RBCalibrationFlightT",
171        PacketType::BfswAckPacket         => pack_key = "BfswAckPacket",
172        PacketType::MultiPacket           => pack_key = "MultiPacket",
173      }
174      if pm.get(pack_key).is_some() {
175        *pm.get_mut(pack_key).unwrap() += 1;
176      } else {
177        pm.insert(pack_key, 0);
178      }
179    }
180    Err(err) => {
181      error!("Can't lock shared memory! {err}");
182    }
183  }
184}
185
186/// Receive packets from an incoming stream
187/// and distrubute them to their receivers
188/// while taking notes of everything
189///
190/// This is a Pablo Pubsub kind of persona
191/// (see a fantastic talk at RustConf 2023)
192pub fn packet_distributor(tp_from_sock : Receiver<TofPacket>,
193                          tp_sender_mt : Sender<TofPacket>,
194                          tp_sender_rb : Sender<TofPacket>,
195                          tp_sender_ev : Sender<TofPacket>,
196                          tp_sender_cp : Sender<TofPacket>,
197                          tp_sender_tr : Sender<TofPacket>,
198                          rbwf_sender  : Sender<TofPacket>,
199                          ts_send      : Sender<TofEventSummary>,
200                          th_send      : Sender<TofHit>,
201                          tp_sender_hb : Sender<TofPacket>,
202                          str_list     : Arc<Mutex<VecDeque<String>>>,
203                          pck_map      : Arc<Mutex<HashMap<&str, usize>>>,
204                          mut writer   : Option<TofPacketWriter>) {
205  let mut n_pack = 0usize;
206  // per default, we create master trigger packets from TofEventSummary, 
207  // except we have "real" mtb packets
208  let mut craft_mte_packets = true;
209
210  loop {
211    //match data_socket.recv_bytes(0) {
212    match tp_from_sock.recv() {
213      Err(err) => error!("Can't receive TofPacket! {err}"),
214      Ok(tp) => {
215        //println!("{:?}", pck_map);
216        packet_sorter(&tp.packet_type, &pck_map);
217        n_pack += 1;
218        //println!("Got TP {}", tp);
219        match str_list.lock() {
220          Err(err) => error!("Can't lock shared memory! {err}"),
221          Ok(mut _list)    => {
222            //let prefix  = String::from_utf8(payload[0..4].to_vec()).expect("Can't get prefix!");
223            //let message = format!("{}-{} {}", n_pack,prefix, tp.to_string());
224            let message = format!("{} : {}", n_pack, tp);
225            _list.push_back(message);
226          }
227        }
228        // if captured, write file
229        if writer.is_some() {
230          writer.as_mut().unwrap().add_tof_packet(&tp);
231        }
232        match tp.packet_type {
233          PacketType::TofResponse => { 
234            match tp_sender_tr.send(tp) {
235              Err(err) => error!("Can't send TP! {err}"),
236              Ok(_)    => (),
237            }
238          }
239          PacketType::MonitorMtb |
240          PacketType::MasterTrigger => {
241            // apparently, we are getting MasterTriggerEvents, 
242            // sow we won't be needing to craft them from 
243            // TofEventSummary packets
244            if tp.packet_type == PacketType::MasterTrigger {
245              craft_mte_packets = false;
246            }
247            match tp_sender_mt.send(tp) {
248              Err(err) => error!("Can't send TP! {err}"),
249              Ok(_)    => (),
250            }
251          },
252          PacketType::RBWaveform => {
253            match rbwf_sender.send(tp) {
254              Err(err) => error!("Can't send TP! {err}"),
255              Ok(_)    => (),
256            }
257          }
258          PacketType::TofEventSummary => {
259            match TofEventSummary::from_tofpacket(&tp) {
260              Err(err) => {
261                error!("Unable to unpack TofEventSummary! {err}");
262              }
263              Ok(ts) => {
264                if craft_mte_packets {
265                  let mte    = MasterTriggerEvent::from(&ts);
266                  let mte_tp = mte.pack();
267                  //error!("We are sending the following tp {}", mte_tp);
268                  match tp_sender_mt.send(mte_tp) {
269                    Err(err) => error!("Can't send MTE TP! {err}"),
270                    Ok(_)    => ()
271                  }
272                }
273                for h in &ts.hits {
274                  match th_send.send(*h) {
275                    Err(err) => error!("Can't send TP! {err}"),
276                    Ok(_)    => (),
277                  }
278                }
279                match ts_send.send(ts) {
280                  Err(err) => error!("Can't send TP! {err}"),
281                  Ok(_)    => (),
282                }
283              }
284            }
285          }
286          PacketType::TofEvent => {
287            // since the tof event contains MTEs, we don't need
288            // to craft them
289            craft_mte_packets = false;
290            match tp_sender_ev.send(tp) {
291              Err(err) => error!("Can't send TP! {err}"),
292              Ok(_)    => (),
293            }
294            // Disasemble the packets
295            //match TofEvent::from_bytestream(tp.payload, &mut 0) {
296            //  Err(err) => {
297            //    error!("Can't decode TofEvent");
298            //  },
299            //  Ok(ev) => {
300            //    //for rbev in ev.rb_events {
301            //    //  let 
302            //    //  match tp_sender_rb.send
303            //    //}
304            //  }
305            //}
306          }
307          PacketType::RBEvent |
308          PacketType::RBEventMemoryView | 
309          PacketType::LTBMoniData |
310          PacketType::PAMoniData  |
311          PacketType::PBMoniData  |
312          PacketType::RBMoniData => {
313            match tp_sender_rb.send(tp) {
314              Err(err) => error!("Can't send TP! {err}"),
315              Ok(_)    => (),
316            }
317          }
318          PacketType::CPUMoniData => {
319            match tp_sender_cp.send(tp) {
320              Err(err) => error!("Can't send TP! {err}"),
321              Ok(_)    => (),
322            }
323          }
324          PacketType::HeartBeatDataSink |
325          PacketType::EVTBLDRHeartbeat  | 
326          PacketType::MTBHeartbeat      => {
327            match tp_sender_hb.send(tp) {
328              Err(err) => error!("Can't send TP! {err}"),
329              Ok(_)    => {
330              },
331            }
332          }
333          _ => () 
334        }
335      }
336    } 
337  }
338}
339
340/// ZMQ socket wrapper for the zmq socket which is 
341/// supposed to receive data from the TOF system.
342pub fn socket_wrap_tofstream(address   : &str,
343                             tp_sender : Sender<TofPacket>) {
344  let ctx = zmq::Context::new();
345  // FIXME - don't hardcode this IP
346  let socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
347  socket.connect(address).expect("Unable to connect to data (PUB) socket {adress}");
348  socket.set_subscribe(b"").expect("Can't subscribe to any message on 0MQ socket!");
349  //let mut n_pack = 0usize;
350  info!("0MQ SUB socket connected to address {address}");
351  // per default, we create master trigger packets from TofEventSummary, 
352  // except we have "real" mtb packets
353  //let mut craft_mte_packets = true;
354  loop {
355    match socket.recv_bytes(0) {
356      Err(err) => error!("Can't receive TofPacket! {err}"),
357      Ok(payload)    => {
358        match TofPacket::from_bytestream(&payload, &mut 0) {
359          Ok(tp) => {
360            match tp_sender.send(tp) {
361              Ok(_) => (),
362              Err(err) => error!("Can't send TofPacket over channel! {err}")
363            }
364          }
365          Err(err) => {
366            debug!("Can't decode payload! {err}");
367            // that might have an RB prefix, forward 
368            // it 
369            match TofPacket::from_bytestream(&payload, &mut 4) {
370              Err(err) => {
371                error!("Don't understand bytestream! {err}"); 
372              },
373              Ok(tp) => {
374                match tp_sender.send(tp) {
375                  Ok(_) => (),
376                  Err(err) => error!("Can't send TofPacket over channel! {err}")
377                }
378              }
379            }
380          }  
381        }
382      }
383    }
384  }
385}
386
387cfg_if::cfg_if! {
388  if #[cfg(feature = "telemetry")]  {
389    //use telemetry_dataclasses::packets::{
390    //  TelemetryHeader,
391    //  TelemetryPacket,
392    //};
393
394    /// Get the GAPS merged event telemetry stream and 
395    /// broadcast it to the relevant tab
396    ///
397    /// # Arguments
398    ///
399    /// * address     : Address to susbscribe to for telemetry 
400    ///                 stream (must be zmq.PUB) on the Sender
401    ///                 side
402    /// * cachesize   : Getting the packets from the funneled stream leads
403    ///                 to duplicates. To eliminate these, we store the 
404    ///                 packet counter variable in a Dequee of a given 
405    ///                 size
406    /// * tele_sender : Channel to forward the received telemetry
407    ///                 packets
408    pub fn socket_wrap_telemetry(address     : &str,
409                                 cachesize   : usize,
410                                 tele_sender : Sender<TelemetryPacket>) {
411      let ctx = zmq::Context::new();
412      // FIXME - don't hardcode this IP
413      // typically how it is done is that this program runs either on a gse
414      // or there is a local forwarding of the port thrugh ssh
415      //let address : &str = "tcp://127.0.0.1:55555";
416      let socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
417      match socket.connect(&address) {
418        Err(err) => {
419          error!("Unable to connect to data (PUB) socket {address}! {err}");
420          panic!("Can not connect to zmq PUB socket!");
421        }
422        Ok(_) => ()
423      }
424      let mut cache = VecDeque::<u16>::with_capacity(cachesize);
425      socket.set_subscribe(b"") .expect("Can't subscribe to any message on 0MQ socket! {err}");
426      loop {
427        match socket.recv_bytes(0) {
428          Err(err)    => error!("Can't receive TofPacket! {err}"),
429          Ok(mut payload) => {
430            match TelemetryHeader::from_bytestream(&payload, &mut 0) {
431              Err(err) => {
432                error!("Can not decode telemtry header! {err}");
433                //for k in pos - 5 .. pos + 5 {
434                //  println!("{}",stream[k]);
435                //}
436              }
437              Ok(header) => {
438                let mut packet = TelemetryPacket::new();
439                if payload.len() > TelemetryHeader::SIZE {
440                  payload.drain(0..TelemetryHeader::SIZE);
441                }
442                if cache.contains(&header.counter) {
443                  // drop this packet
444                  continue;
445                } else {
446                  cache.push_back(header.counter); 
447                }
448                if cache.len() == cachesize {
449                  cache.pop_front();
450                }
451
452                packet.header  = header;
453                packet.payload = payload;
454                match tele_sender.send(packet) {
455                  Err(err) => error!("Can not send telemetry packet to downstream! {err}"),
456                  Ok(_)    => ()
457                }
458              }
459            }
460          }
461        }
462      }
463    }
464  }
465}
466