liftof_cc/threads/
global_data_sink.rs

1//! Global data sink - a 'funnel' for all packets
2//! generated through the liftof system.
3//!
4//! Each thread of liftof-cc can connect to the 
5//! data sink through a channel and it will 
6//! forward the tof packets to the designated
7//! zmq socket.
8//!
9
10use std::time::{
11  Instant,
12  Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16  Arc,
17  Mutex,
18};
19
20use crossbeam_channel::Receiver; 
21
22use gondola_core::prelude::*;
23
24/// Manages "outgoing" 0MQ PUB socket and writing
25/// data to disk
26///
27/// All received packets will be either forwarded
28/// over zmq or saved to disk
29///
30/// # Arguments:
31///   * incoming       : incoming connection for TofPackets
32///                      from any source
33///   * thread_control : inter-thread communications,
34///                      start/stop signals.
35///                      Keeps global settings.
36pub fn global_data_sink(incoming       : &Receiver<TofPacket>,
37                        incoming_ev    : &Receiver<TofEvent>,
38                        thread_control : Arc<Mutex<ThreadControl>>) {
39  // when the thread starts, we need to wait a bit
40  // till thread_control becomes usable
41  sleep(Duration::from_secs(10));
42  let mut flight_address           = String::from("");
43  let mut mbytes_per_file          = 420usize;
44  let mut write_stream_path        = String::from("");
45  let mut send_tof_summary_packets = false;
46  let mut send_rbwaveform_packets  = false;
47  //let mut send_mtb_event_packets   = false;
48  let mut write_stream             = false;
49  let mut send_rbwf_every_x_event  = 1;
50  // fixme - smaller hb interfal
51  let mut hb_interval              = Duration::from_secs(20u64);
52  let mut only_save_interesting    = false;
53  //let mut thr_n_hits_umb           = 0u8;         
54  let mut thr_n_hits_cbe           = 0u8;
55  let mut thr_n_hits_outer         = 0u8;
56  //let mut thr_n_hits_cor           = 0u8;
57  //let mut thr_tot_edep_umb         = 0.0f32;
58  let mut thr_tot_edep_cbe         = 0.0f32;
59  let mut thr_tot_edep_outer       = 0.0f32;
60  //let mut thr_tot_edep_cor         = 0.0f32;
61  match thread_control.lock() {
62    Ok(mut tc) => {
63      tc.thread_data_sink_active = true; 
64      flight_address             = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
65      mbytes_per_file            = tc.liftof_settings.data_publisher_settings.mbytes_per_file; 
66      write_stream_path          = tc.liftof_settings.data_publisher_settings.data_dir.clone();
67      write_stream               = tc.write_data_to_disk;
68      send_tof_summary_packets   = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
69      send_rbwaveform_packets    = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
70      send_rbwf_every_x_event    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
71      only_save_interesting      = tc.liftof_settings.event_builder_settings.only_save_interesting;
72      //thr_n_hits_umb             = tc.liftof_settings.event_builder_settings.thr_n_hits_umb  ;         
73      thr_n_hits_cbe             = tc.liftof_settings.event_builder_settings.thr_n_hits_cbe.unwrap_or(0)  ;
74      thr_n_hits_outer           = tc.liftof_settings.event_builder_settings.thr_n_hits_outer.unwrap_or(0);
75      //thr_n_hits_cor             = tc.liftof_settings.event_builder_settings.thr_n_hits_cor  ;
76      //thr_tot_edep_umb           = tc.liftof_settings.event_builder_settings.thr_tot_edep_umb;
77      thr_tot_edep_outer         = tc.liftof_settings.event_builder_settings.thr_tot_edep_outer.unwrap_or(0.0);
78      thr_tot_edep_cbe           = tc.liftof_settings.event_builder_settings.thr_tot_edep_cbe.unwrap_or(0.0);
79      //thr_tot_edep_cor           = tc.liftof_settings.event_builder_settings.thr_tot_edep_cor;
80      hb_interval                = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
81    },
82    Err(err) => {
83      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
84    },
85  }
86  
87  if send_rbwf_every_x_event == 0 {
88    error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
89    send_rbwaveform_packets = false;
90  }
91
92  let mut evid_check        = Vec::<u32>::new();
93
94  let ctx = zmq::Context::new();
95  // FIXME - should we just move to another socket if that one is not working?
96  let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
97  let unlim : i32 = 1000000;
98  data_socket.set_sndhwm(unlim).unwrap();
99  //println!("==> Will bind zmq socket to address {}", flight_address);
100  match data_socket.bind(&flight_address) {
101    // FIXEM - this panic is no good! What we want to do instead is
102    // 1) set the flag in thread_control that we are running 
103    // to false, 
104    // 2) enter an eternal loop where we try to restart it
105    Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
106    Ok(_)    => ()
107  }
108  info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
109
110  //let mut event_cache = Vec::<TofPacket>::with_capacity(100); 
111
112  // for debugging/profiling
113  let mut timer                = Instant::now();
114  let mut check_settings_timer = Instant::now();
115
116  // run settings 
117  let mut writer : Option<TofPacketWriter> = None;
118  let mut runid : u32   = 0;
119  let mut new_run_start = false;
120  let mut retire        = false;
121  let mut heartbeat     = DataSinkHB::new();
122  let mut hb_timer      = Instant::now(); 
123  let mut rbwf_ctr      = 0u32;
124  loop {
125    if retire {
126      // take a long nap to give other threads 
127      // a chance to finish first
128      warn!("Will end data sink thread in 25 seconds!");
129      println!("= =>Will end data sink thread in 25 seconds!");
130      sleep(Duration::from_secs(25));
131      break;
132    }
133    // even though this is called kill timer, check
134    // the settings in general, since they might have
135    // changed due to remote access.
136    if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
137      match thread_control.try_lock() {
138        Ok(mut tc) => {
139          send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
140          send_rbwaveform_packets  = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets; 
141          if tc.stop_flag {
142            tc.thread_data_sink_active = false;
143            // we want to make sure that data sink ends the latest
144            retire = true;
145          } 
146          if tc.new_run_start_flag {
147            new_run_start         = true;
148            write_stream          = tc.write_data_to_disk;
149            write_stream_path     = tc.liftof_settings.data_publisher_settings.data_dir.clone(); 
150            runid                 = tc.run_id;
151            write_stream_path     += &(format!("/{}/", runid));
152            tc.new_run_start_flag = false;
153          }
154        },
155        Err(err) => {
156          error!("Can't acquire lock for ThreadControl! {err}");
157        },
158      }
159      check_settings_timer = Instant::now();
160    }
161    if write_stream && new_run_start {
162      let file_type = FileType::RunFile(runid as u32);
163      //println!("==> Writing stream to file with prefix {}", streamfile_name);
164      writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
165      writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
166      new_run_start = false;
167    } else if !write_stream {
168      writer = None;
169    }
170
171    match incoming.try_recv() {
172      Err(err) => trace!("No new packet, err {err}"),
173      Ok(pack) => {
174        debug!("Got new tof packet {}", pack.packet_type);
175        // if it is not some event type, just write it to disk
176        if writer.is_some() {
177          if !pack.no_write_to_disk {
178            writer.as_mut().unwrap().add_tof_packet(&pack);
179            heartbeat.n_pack_write_disk += 1;
180            heartbeat.n_bytes_written += pack.payload.len() as u64;   
181          }
182        }
183        match data_socket.send(pack.to_bytestream(),0) {
184          Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
185          Ok(_)    => {
186            trace!("TofPacket sent");
187            heartbeat.n_packets_sent += 1;
188          }
189        } // end match
190      } 
191    } 
192    match incoming_ev.recv() {
193      Err(err) => trace!("Unable to receive event packet! {err}"),
194      Ok(mut ev_)  => { 
195        // FIXME - we need to work on the system to 
196        // select how to send waveforms or not
197        ev_.calc_gcu_variables();
198        let mut write_to_disk = true; 
199        if only_save_interesting {
200          //if ev_.n_hits_umb   < thr_n_hits_umb 
201          if ev_.n_hits_cbe     < thr_n_hits_cbe
202          && ev_.n_hits_cor + ev_.n_hits_umb   < thr_n_hits_outer
203          //&& ev_.n_hits_cor   < thr_n_hits_cor
204          //&& ev_.tot_edep_umb < thr_tot_edep_umb
205          && ev_.tot_edep_cbe   < thr_tot_edep_cbe
206          && ev_.tot_edep_cor + ev_.tot_edep_umb  < thr_tot_edep_outer { 
207          //&& ev_.tot_edep_cor < thr_tot_edep_cor {
208            write_to_disk = false;
209          }
210        }
211        let mut pack = ev_.pack();
212        if writer.is_some() && write_to_disk {
213          writer.as_mut().unwrap().add_tof_packet(&pack);
214          heartbeat.n_pack_write_disk += 1;
215          heartbeat.n_bytes_written += pack.payload.len() as u64;   
216        }
217
218        if send_rbwaveform_packets {
219          if rbwf_ctr % send_rbwf_every_x_event == 0 {
220            for wf in ev_.get_waveforms() { 
221              match data_socket.send(wf.pack().to_bytestream(),0) {
222                Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
223                Ok(_)    => {
224                  trace!("TofPacket sent");
225                  heartbeat.n_packets_sent += 1;
226                }
227              } // end match
228            }
229          }
230          rbwf_ctr += 1;
231        }
232
233        // Now move hits, strip waveforms and calculate gcu variables 
234        ev_.move_hits();
235        
236        //--------------------------------
237        // DEBUG - have the cake and eat it too!
238        // This will not send waveforms over 
239        // the gcu (because of protocolversion :: V3
240        // but also not strip the RBEvents
241        //ev_.strip_rbevents();
242        //ev_.version = ProtocolVersion::V3;
243        //--------------------------------
244       
245        // this is that the events go over the gcu
246        ev_.strip_rbevents();
247        ev_.version = ProtocolVersion::V1;
248        pack = ev_.pack();
249        if send_tof_summary_packets {
250          match data_socket.send(pack.to_bytestream(),0) {
251            Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
252            Ok(_)    => {
253              trace!("TofPacket sent");
254              heartbeat.n_packets_sent += 1;
255            }
256          } // end match
257        }
258      } // end if pk == event packet
259    } // end incoming.recv
260
261    let evid_check_len = evid_check.len();
262    if timer.elapsed().as_secs() > 10 {
263      // FIXME - might be too slow?
264      if evid_check_len > 0 {
265        let mut evid = evid_check[0];
266        for _ in 0..evid_check_len {
267          if !evid_check.contains(&evid) {
268            heartbeat.n_evid_missing += 1;
269            heartbeat.n_evid_chunksize = evid_check_len as u64;
270          }
271          evid += 1;
272        }
273      }
274      timer = Instant::now();
275    }
276    if hb_timer.elapsed() >= hb_interval {
277      heartbeat.met += hb_timer.elapsed().as_secs();
278      
279      match data_socket.send(heartbeat.pack().to_bytestream(),0) {
280        Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
281        Ok(_)    => {
282          trace!("Heartbeat sent");
283        }
284      } 
285      evid_check.clear();
286      hb_timer = Instant::now();
287    }
288  } //end loop
289} //end function