liftof_cc/threads/
global_data_sink.rs

1//! Global data sink - a 'funnel' for all packets
2//! generated through the liftof system.
3//!
4//! Each thread of liftof-cc can connect to the 
5//! data sink through a channel and it will 
6//! forward the tof packets to the designated
7//! zmq socket.
8//!
9
10use std::time::{
11  Instant,
12  Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16  Arc,
17  Mutex,
18};
19
20use crossbeam_channel::Receiver; 
21
22use gondola_core::prelude::*;
23
24/// Manages "outgoing" 0MQ PUB socket and writing
25/// data to disk
26///
27/// All received packets will be either forwarded
28/// over zmq or saved to disk
29///
30/// # Arguments:
31///   * incoming       : incoming connection for TofPackets
32///                      from any source
33///   * thread_control : inter-thread communications,
34///                      start/stop signals.
35///                      Keeps global settings.
36pub fn global_data_sink(incoming       : &Receiver<TofPacket>,
37                        thread_control : Arc<Mutex<ThreadControl>>) {
38  // when the thread starts, we need to wait a bit
39  // till thread_control becomes usable
40  sleep(Duration::from_secs(10));
41  let mut flight_address           = String::from("");
42  let mut mbytes_per_file          = 420usize;
43  let mut write_stream_path        = String::from("");
44  let mut send_tof_summary_packets = false;
45  let mut send_rbwaveform_packets  = false;
46  //let mut send_mtb_event_packets   = false;
47  let mut send_tof_event_packets   = false;
48  let mut write_stream             = false;
49  let mut send_rbwf_every_x_event  = 1;
50  // fixme - smaller hb interfal
51  let mut hb_interval              = Duration::from_secs(20u64);
52  match thread_control.lock() {
53    Ok(mut tc) => {
54      tc.thread_data_sink_active = true; 
55      flight_address             = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
56      mbytes_per_file            = tc.liftof_settings.data_publisher_settings.mbytes_per_file; 
57      write_stream_path          = tc.liftof_settings.data_publisher_settings.data_dir.clone();
58      write_stream               = tc.write_data_to_disk;
59      send_tof_summary_packets   = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
60      send_rbwaveform_packets    = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
61      send_tof_event_packets     = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
62      send_rbwf_every_x_event    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
63      hb_interval                = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
64    },
65    Err(err) => {
66      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
67    },
68  }
69  
70  if send_rbwf_every_x_event == 0 {
71    error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
72    send_rbwaveform_packets = false;
73  }
74
75  let mut evid_check        = Vec::<u32>::new();
76
77  let ctx = zmq::Context::new();
78  // FIXME - should we just move to another socket if that one is not working?
79  let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
80  let unlim : i32 = 1000000;
81  data_socket.set_sndhwm(unlim).unwrap();
82  //println!("==> Will bind zmq socket to address {}", flight_address);
83  match data_socket.bind(&flight_address) {
84    // FIXEM - this panic is no good! What we want to do instead is
85    // 1) set the flag in thread_control that we are running 
86    // to false, 
87    // 2) enter an eternal loop where we try to restart it
88    Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
89    Ok(_)    => ()
90  }
91  info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
92
93  //let mut event_cache = Vec::<TofPacket>::with_capacity(100); 
94
95  // for debugging/profiling
96  let mut timer                = Instant::now();
97  let mut check_settings_timer = Instant::now();
98
99  // run settings 
100  let mut writer : Option<TofPacketWriter> = None;
101  let mut runid : u32   = 0;
102  let mut new_run_start = false;
103  let mut retire        = false;
104  let mut heartbeat     = DataSinkHB::new();
105  let mut hb_timer      = Instant::now(); 
106  //let mut rbwf_ctr      = 0u32;
107  loop {
108    if retire {
109      // take a long nap to give other threads 
110      // a chance to finish first
111      warn!("Will end data sink thread in 25 seconds!");
112      println!("= =>Will end data sink thread in 25 seconds!");
113      sleep(Duration::from_secs(25));
114      break;
115    }
116    // even though this is called kill timer, check
117    // the settings in general, since they might have
118    // changed due to remote access.
119    if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
120      match thread_control.try_lock() {
121        Ok(mut tc) => {
122          send_tof_event_packets   = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;      
123          send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
124          send_rbwaveform_packets  = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
125    
126          if tc.stop_flag {
127            tc.thread_data_sink_active = false;
128            // we want to make sure that data sink ends the latest
129            retire = true;
130          } 
131          if tc.new_run_start_flag {
132            new_run_start         = true;
133            write_stream          = tc.write_data_to_disk;
134            write_stream_path     = tc.liftof_settings.data_publisher_settings.data_dir.clone(); 
135            runid                 = tc.run_id;
136            write_stream_path     += &(format!("/{}/", runid));
137            tc.new_run_start_flag = false;
138          }
139        },
140        Err(err) => {
141          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
142        },
143      }
144      check_settings_timer = Instant::now();
145    }
146    if write_stream && new_run_start {
147      let file_type = FileType::RunFile(runid as u32);
148      //println!("==> Writing stream to file with prefix {}", streamfile_name);
149      writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
150      writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
151      new_run_start = false;
152    } else if !write_stream {
153      writer = None;
154    }
155    let mut send_this_packet = true;
156    match incoming.recv() {
157      Err(err) => trace!("No new packet, err {err}"),
158      Ok(pack) => {
159        debug!("Got new tof packet {}", pack.packet_type);
160        // if it is not some event type, just write it to disk
161        if writer.is_some() {
162          match pack.packet_type {
163            TofPacketType::TofEvent 
164            | TofPacketType::RBWaveform => (),
165            _ => {
166              writer.as_mut().unwrap().add_tof_packet(&pack);
167              heartbeat.n_pack_write_disk += 1;
168              heartbeat.n_bytes_written += pack.payload.len() as u64;   
169            }
170          }
171        }
172       
173        // FIXME - we need to work on the system to 
174        // select how to send waveforms or not
175        match pack.packet_type {
176          TofPacketType::TofEventDeprecated =>  {
177            send_this_packet = send_tof_event_packets; 
178          }
179          TofPacketType::RBWaveform => {
180            send_this_packet = send_rbwaveform_packets;
181          }
182          TofPacketType::TofEvent => {
183            send_this_packet = send_tof_summary_packets;
184          }
185          _ => ()
186        }
187        if send_this_packet {
188          match data_socket.send(pack.to_bytestream(),0) {
189            Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
190            Ok(_)    => {
191              trace!("TofPacket sent");
192              heartbeat.n_packets_sent += 1;
193            }
194          } // end match
195        }
196      } // end if pk == event packet
197    } // end incoming.recv
198      //
199      //
200
201    let evid_check_len = evid_check.len();
202    if timer.elapsed().as_secs() > 10 {
203      // FIXME - might be too slow?
204      if evid_check_len > 0 {
205        let mut evid = evid_check[0];
206        for _ in 0..evid_check_len {
207          if !evid_check.contains(&evid) {
208            heartbeat.n_evid_missing += 1;
209            heartbeat.n_evid_chunksize = evid_check_len as u64;
210          }
211          evid += 1;
212        }
213      }
214      timer = Instant::now();
215    }
216    if hb_timer.elapsed() >= hb_interval {
217      heartbeat.met += hb_timer.elapsed().as_secs();
218      
219      match data_socket.send(heartbeat.pack().to_bytestream(),0) {
220        Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
221        Ok(_)    => {
222          trace!("Heartbeat sent");
223        }
224      } 
225      evid_check.clear();
226      hb_timer = Instant::now();
227    }
228  } //end loop
229} //end function