liftof_cc/threads/
global_data_sink.rs

1//! Global data sink - a 'funnel' for all packets
2//! generated through the liftof system.
3//!
4//! Each thread of liftof-cc can connect to the 
5//! data sink through a channel and it will 
6//! forward the tof packets to the designated
7//! zmq socket.
8//!
9
10use std::time::{
11  Instant,
12  Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16  Arc,
17  Mutex,
18};
19
20use crossbeam_channel::Receiver; 
21
22use gondola_core::prelude::*;
23
24/// Manages "outgoing" 0MQ PUB socket and writing
25/// data to disk
26///
27/// All received packets will be either forwarded
28/// over zmq or saved to disk
29///
30/// # Arguments
31///
32///     * incoming       : incoming connection for TofPackets
33///                        from any source
34///     * thread_control : inter-thread communications,
35///                        start/stop signals.
36///                        Keeps global settings.
37pub fn global_data_sink(incoming       : &Receiver<TofPacket>,
38                        incoming_ev    : &Receiver<TofEvent>,
39                        thread_control : Arc<Mutex<ThreadControl>>) {
40  // when the thread starts, we need to wait a bit
41  // till thread_control becomes usable
42  sleep(Duration::from_secs(10));
43  let mut flight_address           = String::from("");
44  let mut mbytes_per_file          = 420usize;
45  let mut write_stream_path        = String::from("");
46  let mut send_tof_summary_packets = false;
47  let mut send_rbwaveform_packets  = false;
48  //let mut send_mtb_event_packets   = false;
49  let mut write_stream             = false;
50  let mut send_rbwf_every_x_event  = 1;
51  // fixme - smaller hb interfal
52  let mut hb_interval              = Duration::from_secs(20u64);
53  let mut only_save_interesting    = false;
54  //let mut thr_n_hits_umb           = 0u8;         
55  let mut thr_n_hits_cbe           = 0u8;
56  let mut thr_n_hits_outer         = 0u8;
57  //let mut thr_n_hits_cor           = 0u8;
58  //let mut thr_tot_edep_umb         = 0.0f32;
59  let mut thr_tot_edep_cbe         = 0.0f32;
60  let mut thr_tot_edep_outer       = 0.0f32;
61  //let mut thr_tot_edep_cor         = 0.0f32;
62  match thread_control.lock() {
63    Ok(mut tc) => {
64      tc.thread_data_sink_active = true; 
65      flight_address             = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
66      mbytes_per_file            = tc.liftof_settings.data_publisher_settings.mbytes_per_file; 
67      write_stream_path          = tc.liftof_settings.data_publisher_settings.data_dir.clone();
68      write_stream               = tc.write_data_to_disk;
69      send_tof_summary_packets   = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
70      send_rbwaveform_packets    = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
71      send_rbwf_every_x_event    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
72      only_save_interesting      = tc.liftof_settings.event_builder_settings.only_save_interesting;
73      //thr_n_hits_umb             = tc.liftof_settings.event_builder_settings.thr_n_hits_umb  ;         
74      thr_n_hits_cbe             = tc.liftof_settings.event_builder_settings.thr_n_hits_cbe.unwrap_or(0)  ;
75      thr_n_hits_outer           = tc.liftof_settings.event_builder_settings.thr_n_hits_outer.unwrap_or(0);
76      //thr_n_hits_cor             = tc.liftof_settings.event_builder_settings.thr_n_hits_cor  ;
77      //thr_tot_edep_umb           = tc.liftof_settings.event_builder_settings.thr_tot_edep_umb;
78      thr_tot_edep_outer         = tc.liftof_settings.event_builder_settings.thr_tot_edep_outer.unwrap_or(0.0);
79      thr_tot_edep_cbe           = tc.liftof_settings.event_builder_settings.thr_tot_edep_cbe.unwrap_or(0.0);
80      //thr_tot_edep_cor           = tc.liftof_settings.event_builder_settings.thr_tot_edep_cor;
81      hb_interval                = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
82    },
83    Err(err) => {
84      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
85    },
86  }
87  
88  if send_rbwf_every_x_event == 0 {
89    error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
90    send_rbwaveform_packets = false;
91  }
92
93  let mut evid_check        = Vec::<u32>::new();
94
95  let ctx = zmq::Context::new();
96  // FIXME - should we just move to another socket if that one is not working?
97  let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
98  let unlim : i32 = 1000000;
99  data_socket.set_sndhwm(unlim).unwrap();
100  //println!("==> Will bind zmq socket to address {}", flight_address);
101  match data_socket.bind(&flight_address) {
102    // FIXEM - this panic is no good! What we want to do instead is
103    // 1) set the flag in thread_control that we are running 
104    // to false, 
105    // 2) enter an eternal loop where we try to restart it
106    Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
107    Ok(_)    => ()
108  }
109  info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
110
111  //let mut event_cache = Vec::<TofPacket>::with_capacity(100); 
112
113  // for debugging/profiling
114  let mut timer                = Instant::now();
115  let mut check_settings_timer = Instant::now();
116
117  // run settings 
118  let mut writer : Option<TofPacketWriter> = None;
119  let mut runid : u32   = 0;
120  let mut new_run_start = false;
121  let mut retire        = false;
122  let mut heartbeat     = DataSinkHB::new();
123  let mut hb_timer      = Instant::now(); 
124  let mut rbwf_ctr      = 0u32;
125  loop {
126    if retire {
127      // take a long nap to give other threads 
128      // a chance to finish first
129      warn!("Will end data sink thread in 25 seconds!");
130      println!("= =>Will end data sink thread in 25 seconds!");
131      sleep(Duration::from_secs(25));
132      break;
133    }
134    // even though this is called kill timer, check
135    // the settings in general, since they might have
136    // changed due to remote access.
137    if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
138      match thread_control.try_lock() {
139        Ok(mut tc) => {
140          send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
141          send_rbwaveform_packets  = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets; 
142          if tc.stop_flag {
143            tc.thread_data_sink_active = false;
144            // we want to make sure that data sink ends the latest
145            retire = true;
146          } 
147          if tc.new_run_start_flag {
148            new_run_start         = true;
149            write_stream          = tc.write_data_to_disk;
150            write_stream_path     = tc.liftof_settings.data_publisher_settings.data_dir.clone(); 
151            runid                 = tc.run_id;
152            write_stream_path     += &(format!("/{}/", runid));
153            tc.new_run_start_flag = false;
154          }
155        },
156        Err(err) => {
157          error!("Can't acquire lock for ThreadControl! {err}");
158        },
159      }
160      check_settings_timer = Instant::now();
161    }
162    if write_stream && new_run_start {
163      let file_type = FileType::RunFile(runid as u32);
164      //println!("==> Writing stream to file with prefix {}", streamfile_name);
165      writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
166      writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
167      new_run_start = false;
168    } else if !write_stream {
169      writer = None;
170    }
171
172    match incoming.try_recv() {
173      Err(err) => trace!("No new packet, err {err}"),
174      Ok(pack) => {
175        debug!("Got new tof packet {}", pack.packet_type);
176        // if it is not some event type, just write it to disk
177        if writer.is_some() {
178          if !pack.no_write_to_disk {
179            writer.as_mut().unwrap().add_tof_packet(&pack);
180            heartbeat.n_pack_write_disk += 1;
181            heartbeat.n_bytes_written += pack.payload.len() as u64;   
182          }
183        }
184        match data_socket.send(pack.to_bytestream(),0) {
185          Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
186          Ok(_)    => {
187            trace!("TofPacket sent");
188            heartbeat.n_packets_sent += 1;
189          }
190        } // end match
191      } 
192    } 
193    match incoming_ev.recv() {
194      Err(err) => trace!("Unable to receive event packet! {err}"),
195      Ok(mut ev_)  => { 
196        // FIXME - we need to work on the system to 
197        // select how to send waveforms or not
198        ev_.calc_gcu_variables();
199        let mut write_to_disk = true; 
200        if only_save_interesting {
201          //if ev_.n_hits_umb   < thr_n_hits_umb 
202          if ev_.n_hits_cbe     < thr_n_hits_cbe
203          && ev_.n_hits_cor + ev_.n_hits_umb   < thr_n_hits_outer
204          //&& ev_.n_hits_cor   < thr_n_hits_cor
205          //&& ev_.tot_edep_umb < thr_tot_edep_umb
206          && ev_.tot_edep_cbe   < thr_tot_edep_cbe
207          && ev_.tot_edep_cor + ev_.tot_edep_umb  < thr_tot_edep_outer { 
208          //&& ev_.tot_edep_cor < thr_tot_edep_cor {
209            write_to_disk = false;
210          }
211        }
212        let mut pack = ev_.pack();
213        if writer.is_some() && write_to_disk {
214          writer.as_mut().unwrap().add_tof_packet(&pack);
215          heartbeat.n_pack_write_disk += 1;
216          heartbeat.n_bytes_written += pack.payload.len() as u64;   
217        }
218
219        if send_rbwaveform_packets {
220          if rbwf_ctr % send_rbwf_every_x_event == 0 {
221            for wf in ev_.get_waveforms() { 
222              match data_socket.send(wf.pack().to_bytestream(),0) {
223                Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
224                Ok(_)    => {
225                  trace!("TofPacket sent");
226                  heartbeat.n_packets_sent += 1;
227                }
228              } // end match
229            }
230          }
231          rbwf_ctr += 1;
232        }
233
234        // Now move hits, strip waveforms and calculate gcu variables 
235        ev_.move_hits();
236        
237        //--------------------------------
238        // DEBUG - have the cake and eat it too!
239        // This will not send waveforms over 
240        // the gcu (because of protocolversion :: V3
241        // but also not strip the RBEvents
242        //ev_.strip_rbevents();
243        //ev_.version = ProtocolVersion::V3;
244        //--------------------------------
245       
246        // this is that the events go over the gcu
247        ev_.strip_rbevents();
248        ev_.version = ProtocolVersion::V1;
249        pack = ev_.pack();
250        if send_tof_summary_packets {
251          match data_socket.send(pack.to_bytestream(),0) {
252            Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
253            Ok(_)    => {
254              trace!("TofPacket sent");
255              heartbeat.n_packets_sent += 1;
256            }
257          } // end match
258        }
259      } // end if pk == event packet
260    } // end incoming.recv
261
262    let evid_check_len = evid_check.len();
263    if timer.elapsed().as_secs() > 10 {
264      // FIXME - might be too slow?
265      if evid_check_len > 0 {
266        let mut evid = evid_check[0];
267        for _ in 0..evid_check_len {
268          if !evid_check.contains(&evid) {
269            heartbeat.n_evid_missing += 1;
270            heartbeat.n_evid_chunksize = evid_check_len as u64;
271          }
272          evid += 1;
273        }
274      }
275      timer = Instant::now();
276    }
277    if hb_timer.elapsed() >= hb_interval {
278      heartbeat.met += hb_timer.elapsed().as_secs();
279      
280      match data_socket.send(heartbeat.pack().to_bytestream(),0) {
281        Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
282        Ok(_)    => {
283          trace!("Heartbeat sent");
284        }
285      } 
286      evid_check.clear();
287      hb_timer = Instant::now();
288    }
289  } //end loop
290} //end function