liftof_cc/threads/
global_data_sink.rs1use std::time::{
11 Instant,
12 Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16 Arc,
17 Mutex,
18};
19
20use crossbeam_channel::Receiver;
21
22use gondola_core::prelude::*;
23
24pub fn global_data_sink(incoming : &Receiver<TofPacket>,
38 incoming_ev : &Receiver<TofEvent>,
39 thread_control : Arc<Mutex<ThreadControl>>) {
40 sleep(Duration::from_secs(10));
43 let mut flight_address = String::from("");
44 let mut mbytes_per_file = 420usize;
45 let mut write_stream_path = String::from("");
46 let mut send_tof_summary_packets = false;
47 let mut send_rbwaveform_packets = false;
48 let mut write_stream = false;
50 let mut send_rbwf_every_x_event = 1;
51 let mut hb_interval = Duration::from_secs(20u64);
53 let mut only_save_interesting = false;
54 let mut thr_n_hits_cbe = 0u8;
56 let mut thr_n_hits_outer = 0u8;
57 let mut thr_tot_edep_cbe = 0.0f32;
60 let mut thr_tot_edep_outer = 0.0f32;
61 match thread_control.lock() {
63 Ok(mut tc) => {
64 tc.thread_data_sink_active = true;
65 flight_address = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
66 mbytes_per_file = tc.liftof_settings.data_publisher_settings.mbytes_per_file;
67 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
68 write_stream = tc.write_data_to_disk;
69 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
70 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
71 send_rbwf_every_x_event = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
72 only_save_interesting = tc.liftof_settings.event_builder_settings.only_save_interesting;
73 thr_n_hits_cbe = tc.liftof_settings.event_builder_settings.thr_n_hits_cbe.unwrap_or(0) ;
75 thr_n_hits_outer = tc.liftof_settings.event_builder_settings.thr_n_hits_outer.unwrap_or(0);
76 thr_tot_edep_outer = tc.liftof_settings.event_builder_settings.thr_tot_edep_outer.unwrap_or(0.0);
79 thr_tot_edep_cbe = tc.liftof_settings.event_builder_settings.thr_tot_edep_cbe.unwrap_or(0.0);
80 hb_interval = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
82 },
83 Err(err) => {
84 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
85 },
86 }
87
88 if send_rbwf_every_x_event == 0 {
89 error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
90 send_rbwaveform_packets = false;
91 }
92
93 let mut evid_check = Vec::<u32>::new();
94
95 let ctx = zmq::Context::new();
96 let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
98 let unlim : i32 = 1000000;
99 data_socket.set_sndhwm(unlim).unwrap();
100 match data_socket.bind(&flight_address) {
102 Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
107 Ok(_) => ()
108 }
109 info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
110
111 let mut timer = Instant::now();
115 let mut check_settings_timer = Instant::now();
116
117 let mut writer : Option<TofPacketWriter> = None;
119 let mut runid : u32 = 0;
120 let mut new_run_start = false;
121 let mut retire = false;
122 let mut heartbeat = DataSinkHB::new();
123 let mut hb_timer = Instant::now();
124 let mut rbwf_ctr = 0u32;
125 loop {
126 if retire {
127 warn!("Will end data sink thread in 25 seconds!");
130 println!("= =>Will end data sink thread in 25 seconds!");
131 sleep(Duration::from_secs(25));
132 break;
133 }
134 if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
138 match thread_control.try_lock() {
139 Ok(mut tc) => {
140 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
141 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
142 if tc.stop_flag {
143 tc.thread_data_sink_active = false;
144 retire = true;
146 }
147 if tc.new_run_start_flag {
148 new_run_start = true;
149 write_stream = tc.write_data_to_disk;
150 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
151 runid = tc.run_id;
152 write_stream_path += &(format!("/{}/", runid));
153 tc.new_run_start_flag = false;
154 }
155 },
156 Err(err) => {
157 error!("Can't acquire lock for ThreadControl! {err}");
158 },
159 }
160 check_settings_timer = Instant::now();
161 }
162 if write_stream && new_run_start {
163 let file_type = FileType::RunFile(runid as u32);
164 writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
166 writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
167 new_run_start = false;
168 } else if !write_stream {
169 writer = None;
170 }
171
172 match incoming.try_recv() {
173 Err(err) => trace!("No new packet, err {err}"),
174 Ok(pack) => {
175 debug!("Got new tof packet {}", pack.packet_type);
176 if writer.is_some() {
178 if !pack.no_write_to_disk {
179 writer.as_mut().unwrap().add_tof_packet(&pack);
180 heartbeat.n_pack_write_disk += 1;
181 heartbeat.n_bytes_written += pack.payload.len() as u64;
182 }
183 }
184 match data_socket.send(pack.to_bytestream(),0) {
185 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
186 Ok(_) => {
187 trace!("TofPacket sent");
188 heartbeat.n_packets_sent += 1;
189 }
190 } }
192 }
193 match incoming_ev.recv() {
194 Err(err) => trace!("Unable to receive event packet! {err}"),
195 Ok(mut ev_) => {
196 ev_.calc_gcu_variables();
199 let mut write_to_disk = true;
200 if only_save_interesting {
201 if ev_.n_hits_cbe < thr_n_hits_cbe
203 && ev_.n_hits_cor + ev_.n_hits_umb < thr_n_hits_outer
204 && ev_.tot_edep_cbe < thr_tot_edep_cbe
207 && ev_.tot_edep_cor + ev_.tot_edep_umb < thr_tot_edep_outer {
208 write_to_disk = false;
210 }
211 }
212 let mut pack = ev_.pack();
213 if writer.is_some() && write_to_disk {
214 writer.as_mut().unwrap().add_tof_packet(&pack);
215 heartbeat.n_pack_write_disk += 1;
216 heartbeat.n_bytes_written += pack.payload.len() as u64;
217 }
218
219 if send_rbwaveform_packets {
220 if rbwf_ctr % send_rbwf_every_x_event == 0 {
221 for wf in ev_.get_waveforms() {
222 match data_socket.send(wf.pack().to_bytestream(),0) {
223 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
224 Ok(_) => {
225 trace!("TofPacket sent");
226 heartbeat.n_packets_sent += 1;
227 }
228 } }
230 }
231 rbwf_ctr += 1;
232 }
233
234 ev_.move_hits();
236
237 ev_.strip_rbevents();
248 ev_.version = ProtocolVersion::V1;
249 pack = ev_.pack();
250 if send_tof_summary_packets {
251 match data_socket.send(pack.to_bytestream(),0) {
252 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
253 Ok(_) => {
254 trace!("TofPacket sent");
255 heartbeat.n_packets_sent += 1;
256 }
257 } }
259 } } let evid_check_len = evid_check.len();
263 if timer.elapsed().as_secs() > 10 {
264 if evid_check_len > 0 {
266 let mut evid = evid_check[0];
267 for _ in 0..evid_check_len {
268 if !evid_check.contains(&evid) {
269 heartbeat.n_evid_missing += 1;
270 heartbeat.n_evid_chunksize = evid_check_len as u64;
271 }
272 evid += 1;
273 }
274 }
275 timer = Instant::now();
276 }
277 if hb_timer.elapsed() >= hb_interval {
278 heartbeat.met += hb_timer.elapsed().as_secs();
279
280 match data_socket.send(heartbeat.pack().to_bytestream(),0) {
281 Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
282 Ok(_) => {
283 trace!("Heartbeat sent");
284 }
285 }
286 evid_check.clear();
287 hb_timer = Instant::now();
288 }
289 } }