liftof_cc/threads/
global_data_sink.rs

1//! Global data sink - a 'funnel' for all packets
2//! generated through the liftof system.
3//!
4//! Each thread of liftof-cc can connect to the 
5//! data sink through a channel and it will 
6//! forward the tof packets to the designated
7//! zmq socket.
8//!
9
10use std::time::{
11  Instant,
12  Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16  Arc,
17  Mutex,
18};
19
20use crossbeam_channel::Receiver; 
21
22use gondola_core::prelude::*;
23
24/// Manages "outgoing" 0MQ PUB socket and writing
25/// data to disk
26///
27/// All received packets will be either forwarded
28/// over zmq or saved to disk
29///
30/// # Arguments
31///
32///     * incoming       : incoming connection for TofPackets
33///                        from any source
34///     * thread_control : inter-thread communications,
35///                        start/stop signals.
36///                        Keeps global settings.
37pub fn global_data_sink(incoming       : &Receiver<TofPacket>,
38                        thread_control : Arc<Mutex<ThreadControl>>) {
39  // when the thread starts, we need to wait a bit
40  // till thread_control becomes usable
41  sleep(Duration::from_secs(10));
42  let mut flight_address           = String::from("");
43  let mut mbytes_per_file          = 420usize;
44  let mut write_stream_path        = String::from("");
45  let mut send_tof_summary_packets = false;
46  let mut send_rbwaveform_packets  = false;
47  //let mut send_mtb_event_packets   = false;
48  let mut send_tof_event_packets   = false;
49  let mut write_stream             = false;
50  let mut send_rbwf_every_x_event  = 1;
51  // fixme - smaller hb interfal
52  let mut hb_interval              = Duration::from_secs(20u64);
53  match thread_control.lock() {
54    Ok(mut tc) => {
55      tc.thread_data_sink_active = true; 
56      flight_address             = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
57      mbytes_per_file            = tc.liftof_settings.data_publisher_settings.mbytes_per_file; 
58      write_stream_path          = tc.liftof_settings.data_publisher_settings.data_dir.clone();
59      write_stream               = tc.write_data_to_disk;
60      send_tof_summary_packets   = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
61      send_rbwaveform_packets    = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
62      send_tof_event_packets     = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
63      send_rbwf_every_x_event    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
64      hb_interval                = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
65    },
66    Err(err) => {
67      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
68    },
69  }
70  
71  if send_rbwf_every_x_event == 0 {
72    error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
73    send_rbwaveform_packets = false;
74  }
75
76  let mut evid_check        = Vec::<u32>::new();
77
78  let ctx = zmq::Context::new();
79  // FIXME - should we just move to another socket if that one is not working?
80  let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
81  let unlim : i32 = 1000000;
82  data_socket.set_sndhwm(unlim).unwrap();
83  //println!("==> Will bind zmq socket to address {}", flight_address);
84  match data_socket.bind(&flight_address) {
85    // FIXEM - this panic is no good! What we want to do instead is
86    // 1) set the flag in thread_control that we are running 
87    // to false, 
88    // 2) enter an eternal loop where we try to restart it
89    Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
90    Ok(_)    => ()
91  }
92  info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
93
94  //let mut event_cache = Vec::<TofPacket>::with_capacity(100); 
95
96  // for debugging/profiling
97  let mut timer                = Instant::now();
98  let mut check_settings_timer = Instant::now();
99
100  // run settings 
101  let mut writer : Option<TofPacketWriter> = None;
102  let mut runid : u32   = 0;
103  let mut new_run_start = false;
104  let mut retire        = false;
105  let mut heartbeat     = DataSinkHB::new();
106  let mut hb_timer      = Instant::now(); 
107  //let mut rbwf_ctr      = 0u32;
108  loop {
109    if retire {
110      // take a long nap to give other threads 
111      // a chance to finish first
112      warn!("Will end data sink thread in 25 seconds!");
113      println!("= =>Will end data sink thread in 25 seconds!");
114      sleep(Duration::from_secs(25));
115      break;
116    }
117    // even though this is called kill timer, check
118    // the settings in general, since they might have
119    // changed due to remote access.
120    if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
121      match thread_control.try_lock() {
122        Ok(mut tc) => {
123          send_tof_event_packets   = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;      
124          send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
125          send_rbwaveform_packets  = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
126    
127          if tc.stop_flag {
128            tc.thread_data_sink_active = false;
129            // we want to make sure that data sink ends the latest
130            retire = true;
131          } 
132          if tc.new_run_start_flag {
133            new_run_start         = true;
134            write_stream          = tc.write_data_to_disk;
135            write_stream_path     = tc.liftof_settings.data_publisher_settings.data_dir.clone(); 
136            runid                 = tc.run_id;
137            write_stream_path     += &(format!("/{}/", runid));
138            tc.new_run_start_flag = false;
139          }
140        },
141        Err(err) => {
142          error!("Can't acquire lock for ThreadControl! {err}");
143        },
144      }
145      check_settings_timer = Instant::now();
146    }
147    if write_stream && new_run_start {
148      let file_type = FileType::RunFile(runid as u32);
149      //println!("==> Writing stream to file with prefix {}", streamfile_name);
150      writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
151      writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
152      new_run_start = false;
153    } else if !write_stream {
154      writer = None;
155    }
156    let mut send_this_packet = true;
157    match incoming.recv() {
158      Err(err) => trace!("No new packet, err {err}"),
159      Ok(pack) => {
160        debug!("Got new tof packet {}", pack.packet_type);
161        // if it is not some event type, just write it to disk
162        if writer.is_some() {
163          match pack.packet_type {
164            TofPacketType::TofEvent 
165            | TofPacketType::RBWaveform => (),
166            _ => {
167              writer.as_mut().unwrap().add_tof_packet(&pack);
168              heartbeat.n_pack_write_disk += 1;
169              heartbeat.n_bytes_written += pack.payload.len() as u64;   
170            }
171          }
172        }
173       
174        // FIXME - we need to work on the system to 
175        // select how to send waveforms or not
176        match pack.packet_type {
177          TofPacketType::TofEventDeprecated =>  {
178            send_this_packet = send_tof_event_packets; 
179          }
180          TofPacketType::RBWaveform => {
181            send_this_packet = send_rbwaveform_packets;
182          }
183          TofPacketType::TofEvent => {
184            send_this_packet = send_tof_summary_packets;
185          }
186          _ => ()
187        }
188        if send_this_packet {
189          match data_socket.send(pack.to_bytestream(),0) {
190            Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
191            Ok(_)    => {
192              trace!("TofPacket sent");
193              heartbeat.n_packets_sent += 1;
194            }
195          } // end match
196        }
197      } // end if pk == event packet
198    } // end incoming.recv
199      //
200      //
201
202    let evid_check_len = evid_check.len();
203    if timer.elapsed().as_secs() > 10 {
204      // FIXME - might be too slow?
205      if evid_check_len > 0 {
206        let mut evid = evid_check[0];
207        for _ in 0..evid_check_len {
208          if !evid_check.contains(&evid) {
209            heartbeat.n_evid_missing += 1;
210            heartbeat.n_evid_chunksize = evid_check_len as u64;
211          }
212          evid += 1;
213        }
214      }
215      timer = Instant::now();
216    }
217    if hb_timer.elapsed() >= hb_interval {
218      heartbeat.met += hb_timer.elapsed().as_secs();
219      
220      match data_socket.send(heartbeat.pack().to_bytestream(),0) {
221        Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
222        Ok(_)    => {
223          trace!("Heartbeat sent");
224        }
225      } 
226      evid_check.clear();
227      hb_timer = Instant::now();
228    }
229  } //end loop
230} //end function