liftof_cc/threads/
global_data_sink.rs1use std::time::{
11 Instant,
12 Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16 Arc,
17 Mutex,
18};
19
20use crossbeam_channel::Receiver;
21
22use gondola_core::prelude::*;
23
24pub fn global_data_sink(incoming : &Receiver<TofPacket>,
37 incoming_ev : &Receiver<TofEvent>,
38 thread_control : Arc<Mutex<ThreadControl>>) {
39 sleep(Duration::from_secs(10));
42 let mut flight_address = String::from("");
43 let mut mbytes_per_file = 420usize;
44 let mut write_stream_path = String::from("");
45 let mut send_tof_summary_packets = false;
46 let mut send_rbwaveform_packets = false;
47 let mut write_stream = false;
49 let mut send_rbwf_every_x_event = 1;
50 let mut hb_interval = Duration::from_secs(20u64);
52 let mut only_save_interesting = false;
53 let mut thr_n_hits_cbe = 0u8;
55 let mut thr_n_hits_outer = 0u8;
56 let mut thr_tot_edep_cbe = 0.0f32;
59 let mut thr_tot_edep_outer = 0.0f32;
60 match thread_control.lock() {
62 Ok(mut tc) => {
63 tc.thread_data_sink_active = true;
64 flight_address = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
65 mbytes_per_file = tc.liftof_settings.data_publisher_settings.mbytes_per_file;
66 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
67 write_stream = tc.write_data_to_disk;
68 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
69 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
70 send_rbwf_every_x_event = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
71 only_save_interesting = tc.liftof_settings.event_builder_settings.only_save_interesting;
72 thr_n_hits_cbe = tc.liftof_settings.event_builder_settings.thr_n_hits_cbe.unwrap_or(0) ;
74 thr_n_hits_outer = tc.liftof_settings.event_builder_settings.thr_n_hits_outer.unwrap_or(0);
75 thr_tot_edep_outer = tc.liftof_settings.event_builder_settings.thr_tot_edep_outer.unwrap_or(0.0);
78 thr_tot_edep_cbe = tc.liftof_settings.event_builder_settings.thr_tot_edep_cbe.unwrap_or(0.0);
79 hb_interval = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
81 },
82 Err(err) => {
83 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
84 },
85 }
86
87 if send_rbwf_every_x_event == 0 {
88 error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
89 send_rbwaveform_packets = false;
90 }
91
92 let mut evid_check = Vec::<u32>::new();
93
94 let ctx = zmq::Context::new();
95 let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
97 let unlim : i32 = 1000000;
98 data_socket.set_sndhwm(unlim).unwrap();
99 match data_socket.bind(&flight_address) {
101 Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
106 Ok(_) => ()
107 }
108 info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
109
110 let mut timer = Instant::now();
114 let mut check_settings_timer = Instant::now();
115
116 let mut writer : Option<TofPacketWriter> = None;
118 let mut runid : u32 = 0;
119 let mut new_run_start = false;
120 let mut retire = false;
121 let mut heartbeat = DataSinkHB::new();
122 let mut hb_timer = Instant::now();
123 let mut rbwf_ctr = 0u32;
124 loop {
125 if retire {
126 warn!("Will end data sink thread in 25 seconds!");
129 println!("= =>Will end data sink thread in 25 seconds!");
130 sleep(Duration::from_secs(25));
131 break;
132 }
133 if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
137 match thread_control.try_lock() {
138 Ok(mut tc) => {
139 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
140 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
141 if tc.stop_flag {
142 tc.thread_data_sink_active = false;
143 retire = true;
145 }
146 if tc.new_run_start_flag {
147 new_run_start = true;
148 write_stream = tc.write_data_to_disk;
149 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
150 runid = tc.run_id;
151 write_stream_path += &(format!("/{}/", runid));
152 tc.new_run_start_flag = false;
153 }
154 },
155 Err(err) => {
156 error!("Can't acquire lock for ThreadControl! {err}");
157 },
158 }
159 check_settings_timer = Instant::now();
160 }
161 if write_stream && new_run_start {
162 let file_type = FileType::RunFile(runid as u32);
163 writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
165 writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
166 new_run_start = false;
167 } else if !write_stream {
168 writer = None;
169 }
170
171 match incoming.try_recv() {
172 Err(err) => trace!("No new packet, err {err}"),
173 Ok(pack) => {
174 debug!("Got new tof packet {}", pack.packet_type);
175 if writer.is_some() {
177 if !pack.no_write_to_disk {
178 writer.as_mut().unwrap().add_tof_packet(&pack);
179 heartbeat.n_pack_write_disk += 1;
180 heartbeat.n_bytes_written += pack.payload.len() as u64;
181 }
182 }
183 match data_socket.send(pack.to_bytestream(),0) {
184 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
185 Ok(_) => {
186 trace!("TofPacket sent");
187 heartbeat.n_packets_sent += 1;
188 }
189 } }
191 }
192 match incoming_ev.recv() {
193 Err(err) => trace!("Unable to receive event packet! {err}"),
194 Ok(mut ev_) => {
195 ev_.calc_gcu_variables();
198 let mut write_to_disk = true;
199 if only_save_interesting {
200 if ev_.n_hits_cbe < thr_n_hits_cbe
202 && ev_.n_hits_cor + ev_.n_hits_umb < thr_n_hits_outer
203 && ev_.tot_edep_cbe < thr_tot_edep_cbe
206 && ev_.tot_edep_cor + ev_.tot_edep_umb < thr_tot_edep_outer {
207 write_to_disk = false;
209 }
210 }
211 let mut pack = ev_.pack();
212 if writer.is_some() && write_to_disk {
213 writer.as_mut().unwrap().add_tof_packet(&pack);
214 heartbeat.n_pack_write_disk += 1;
215 heartbeat.n_bytes_written += pack.payload.len() as u64;
216 }
217
218 if send_rbwaveform_packets {
219 if rbwf_ctr % send_rbwf_every_x_event == 0 {
220 for wf in ev_.get_waveforms() {
221 match data_socket.send(wf.pack().to_bytestream(),0) {
222 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
223 Ok(_) => {
224 trace!("TofPacket sent");
225 heartbeat.n_packets_sent += 1;
226 }
227 } }
229 }
230 rbwf_ctr += 1;
231 }
232
233 ev_.move_hits();
235
236 ev_.strip_rbevents();
247 ev_.version = ProtocolVersion::V1;
248 pack = ev_.pack();
249 if send_tof_summary_packets {
250 match data_socket.send(pack.to_bytestream(),0) {
251 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
252 Ok(_) => {
253 trace!("TofPacket sent");
254 heartbeat.n_packets_sent += 1;
255 }
256 } }
258 } } let evid_check_len = evid_check.len();
262 if timer.elapsed().as_secs() > 10 {
263 if evid_check_len > 0 {
265 let mut evid = evid_check[0];
266 for _ in 0..evid_check_len {
267 if !evid_check.contains(&evid) {
268 heartbeat.n_evid_missing += 1;
269 heartbeat.n_evid_chunksize = evid_check_len as u64;
270 }
271 evid += 1;
272 }
273 }
274 timer = Instant::now();
275 }
276 if hb_timer.elapsed() >= hb_interval {
277 heartbeat.met += hb_timer.elapsed().as_secs();
278
279 match data_socket.send(heartbeat.pack().to_bytestream(),0) {
280 Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
281 Ok(_) => {
282 trace!("Heartbeat sent");
283 }
284 }
285 evid_check.clear();
286 hb_timer = Instant::now();
287 }
288 } }