liftof_cc/threads/
global_data_sink.rs

1//! Global data sink - a 'funnel' for all packets
2//! generated through the liftof system.
3//!
4//! Each thread of liftof-cc can connect to the 
5//! data sink through a channel and it will 
6//! forward the tof packets to the designated
7//! zmq socket.
8//!
9
10use std::time::{
11  Instant,
12  Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16  Arc,
17  Mutex,
18};
19
20use crossbeam_channel::Receiver; 
21
22use tof_dataclasses::packets::{
23  TofPacket,
24  PacketType
25};
26
27use tof_dataclasses::serialization::{
28  Serialization,
29  Packable,
30};
31
32use tof_dataclasses::io::{
33  TofPacketWriter,
34  FileType,
35};
36
37use tof_dataclasses::heartbeats::HeartBeatDataSink;
38use liftof_lib::thread_control::ThreadControl;
39
40/// Manages "outgoing" 0MQ PUB socket and writing
41/// data to disk
42///
43/// All received packets will be either forwarded
44/// over zmq or saved to disk
45///
46/// # Arguments
47///
48///     * incoming       : incoming connection for TofPackets
49///                        from any source
50///     * thread_control : inter-thread communications,
51///                        start/stop signals.
52///                        Keeps global settings.
53pub fn global_data_sink(incoming       : &Receiver<TofPacket>,
54                        thread_control : Arc<Mutex<ThreadControl>>) {
55  // when the thread starts, we need to wait a bit
56  // till thread_control becomes usable
57  sleep(Duration::from_secs(10));
58  let mut flight_address           = String::from("");
59  let mut mbytes_per_file          = 420usize;
60  let mut write_stream_path        = String::from("");
61  let mut send_tof_summary_packets = false;
62  let mut send_rbwaveform_packets  = false;
63  //let mut send_mtb_event_packets   = false;
64  let mut send_tof_event_packets   = false;
65  let mut write_stream             = false;
66  let mut send_rbwf_every_x_event  = 1;
67  // fixme - smaller hb interfal
68  let mut hb_interval              = Duration::from_secs(20u64);
69  match thread_control.lock() {
70    Ok(mut tc) => {
71      tc.thread_data_sink_active = true; 
72      flight_address             = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
73      mbytes_per_file            = tc.liftof_settings.data_publisher_settings.mbytes_per_file; 
74      write_stream_path          = tc.liftof_settings.data_publisher_settings.data_dir.clone();
75      write_stream               = tc.write_data_to_disk;
76      send_tof_summary_packets   = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
77      send_rbwaveform_packets    = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
78      send_tof_event_packets     = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
79      send_rbwf_every_x_event    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
80      hb_interval                = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
81    },
82    Err(err) => {
83      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
84    },
85  }
86  
87  if send_rbwf_every_x_event == 0 {
88    error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
89    send_rbwaveform_packets = false;
90  }
91
92  let mut evid_check        = Vec::<u32>::new();
93
94  let ctx = zmq::Context::new();
95  // FIXME - should we just move to another socket if that one is not working?
96  let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
97  let unlim : i32 = 1000000;
98  data_socket.set_sndhwm(unlim).unwrap();
99  //println!("==> Will bind zmq socket to address {}", flight_address);
100  match data_socket.bind(&flight_address) {
101    // FIXEM - this panic is no good! What we want to do instead is
102    // 1) set the flag in thread_control that we are running 
103    // to false, 
104    // 2) enter an eternal loop where we try to restart it
105    Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
106    Ok(_)    => ()
107  }
108  info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
109
110  //let mut event_cache = Vec::<TofPacket>::with_capacity(100); 
111
112  // for debugging/profiling
113  let mut timer                = Instant::now();
114  let mut check_settings_timer = Instant::now();
115
116  // run settings 
117  let mut writer : Option<TofPacketWriter> = None;
118  let mut runid : u32   = 0;
119  let mut new_run_start = false;
120  let mut retire        = false;
121  let mut heartbeat     = HeartBeatDataSink::new();
122  let mut hb_timer      = Instant::now(); 
123  //let mut rbwf_ctr      = 0u32;
124  loop {
125    if retire {
126      // take a long nap to give other threads 
127      // a chance to finish first
128      warn!("Will end data sink thread in 25 seconds!");
129      println!("= =>Will end data sink thread in 25 seconds!");
130      sleep(Duration::from_secs(25));
131      break;
132    }
133    // even though this is called kill timer, check
134    // the settings in general, since they might have
135    // changed due to remote access.
136    if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
137      match thread_control.try_lock() {
138        Ok(mut tc) => {
139          send_tof_event_packets   = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;      
140          send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
141          send_rbwaveform_packets  = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
142    
143          if tc.stop_flag {
144            tc.thread_data_sink_active = false;
145            // we want to make sure that data sink ends the latest
146            retire = true;
147          } 
148          if tc.new_run_start_flag {
149            new_run_start         = true;
150            write_stream          = tc.write_data_to_disk;
151            write_stream_path     = tc.liftof_settings.data_publisher_settings.data_dir.clone(); 
152            runid                 = tc.run_id;
153            write_stream_path     += &(format!("/{}/", runid));
154            tc.new_run_start_flag = false;
155          }
156        },
157        Err(err) => {
158          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
159        },
160      }
161      check_settings_timer = Instant::now();
162    }
163    if write_stream && new_run_start {
164      let file_type = FileType::RunFile(runid as u32);
165      //println!("==> Writing stream to file with prefix {}", streamfile_name);
166      writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
167      writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
168      new_run_start = false;
169    } else if !write_stream {
170      writer = None;
171    }
172    let mut send_this_packet = true;
173    match incoming.recv() {
174      Err(err) => trace!("No new packet, err {err}"),
175      Ok(pack) => {
176        debug!("Got new tof packet {}", pack.packet_type);
177        if writer.is_some() {
178          match pack.packet_type {
179            PacketType::TofEventSummary 
180            | PacketType::RBWaveform => (),
181            _ => {
182              writer.as_mut().unwrap().add_tof_packet(&pack);
183              heartbeat.n_pack_write_disk += 1;
184              heartbeat.n_bytes_written += pack.payload.len() as u64;   
185            }
186          }
187        }
188        
189        match pack.packet_type {
190          PacketType::TofEvent =>  {
191            send_this_packet = send_tof_event_packets; 
192          }
193          PacketType::RBWaveform => {
194            send_this_packet = send_rbwaveform_packets;
195          }
196          PacketType::TofEventSummary => {
197            send_this_packet = send_tof_summary_packets;
198          }
199          _ => ()
200        }
201        if send_this_packet {
202          match data_socket.send(pack.to_bytestream(),0) {
203            Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
204            Ok(_)    => {
205              trace!("TofPacket sent");
206              heartbeat.n_packets_sent += 1;
207            }
208          } // end match
209        }
210      } // end if pk == event packet
211    } // end incoming.recv
212      //
213      //
214
215    let evid_check_len = evid_check.len();
216    if timer.elapsed().as_secs() > 10 {
217      // FIXME - might be too slow?
218      if evid_check_len > 0 {
219        let mut evid = evid_check[0];
220        for _ in 0..evid_check_len {
221          if !evid_check.contains(&evid) {
222            heartbeat.n_evid_missing += 1;
223            heartbeat.n_evid_chunksize = evid_check_len as u64;
224          }
225          evid += 1;
226        }
227      }
228      timer = Instant::now();
229    }
230    if hb_timer.elapsed() >= hb_interval {
231      heartbeat.met += hb_timer.elapsed().as_secs();
232      
233      match data_socket.send(heartbeat.pack().to_bytestream(),0) {
234        Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
235        Ok(_)    => {
236          trace!("Heartbeat sent");
237        }
238      } 
239      evid_check.clear();
240      hb_timer = Instant::now();
241    }
242  } //end loop
243} //end function