liftof_cc/threads/
event_builder.rs1use std::thread;
5use std::time::{
6 Instant,
7 Duration
8};
9
10use std::sync::{
11 Arc,
12 Mutex
13};
14
15use std::collections::VecDeque;
16use std::collections::HashMap;
17
18use crossbeam_channel::{
19 Receiver,
20 Sender,
21};
22
23use tof_dataclasses::events::{
24 MasterTriggerEvent,
25 TofEvent,
26 RBEvent,
27 EventStatus,
28};
29
30use tof_dataclasses::serialization::Packable;
31use tof_dataclasses::packets::TofPacket;
32use tof_dataclasses::commands::config::BuildStrategy;
33use tof_dataclasses::heartbeats::EVTBLDRHeartbeat;
34
35use liftof_lib::settings::{
36 TofEventBuilderSettings,
37};
38use liftof_lib::thread_control::ThreadControl;
39
40use crate::constants::EVENT_BUILDER_EVID_CACHE_SIZE;
41
42pub fn event_builder (m_trig_ev : &Receiver<MasterTriggerEvent>,
75 ev_from_rb : &Receiver<RBEvent>,
76 data_sink : &Sender<TofPacket>,
77 mtb_link_map : HashMap<u8,u8>,
78 thread_control : Arc<Mutex<ThreadControl>>) {
79 let mut send_tev_sum : bool;
88 let mut send_rbwaveform : bool;
89 let mut send_rbwf_freq : u32;
90 let mut rbwf_ctr = 0u64;
91 let mut settings : TofEventBuilderSettings;
92 let mut run_id : u32;
93 let mut cali_active : bool;
96 loop {
97 match thread_control.lock() {
98 Ok(tc) => {
99 send_rbwaveform = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
100 send_tev_sum = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
101 send_rbwf_freq = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
102 settings = tc.liftof_settings.event_builder_settings.clone();
103 run_id = tc.run_id;
104 cali_active = tc.calibration_active;
105 }
106 Err(err) => {
107 error!("Can't acquire lock for ThreadControl! {err}");
108 error!("CRITICAL: Unable to configure event builder thread! Aborting!");
109 return;
110 }
111 }
112 if !cali_active {
113 break;
114 } else {
115 thread::sleep(Duration::from_secs(4));
116 }
117 }
118 info!("Will assign run id {} to events!", run_id);
119
120 let mut heartbeat = EVTBLDRHeartbeat::new();
122 let mut event_cache = HashMap::<u32, TofEvent>::new();
123 let mut event_id_cache = VecDeque::<u32>::with_capacity(EVENT_BUILDER_EVID_CACHE_SIZE);
124 let mut n_received : usize;
125 let mut last_evid = 0;
126 let mut n_sent = 0usize;
127 let mut last_rb_evid : u32;
129 let mut n_rbe_per_te = 0usize;
130 let mut debug_timer = Instant::now();
131 let mut check_tc_update = Instant::now();
132 let daq_reset_cooldown = Instant::now();
133 let reset_daq_flag = false;
134 let mut retire = false;
135 let mut hb_timer = Instant::now();
136 let hb_interval = Duration::from_secs(settings.hb_send_interval as u64);
137
138 loop {
139 if check_tc_update.elapsed().as_secs() > 2 {
140 let mut cali_still_active = false;
143 match thread_control.try_lock() {
144 Ok(mut tc) => {
145 if tc.thread_event_bldr_active {
146 println!("= => [evt_builder] shutting down...");
147 break;
148 }
149 if tc.stop_flag {
151 println!("= => [evt_builder] shutting down...");
153 retire = true;
154 }
155 if tc.calibration_active {
157 cali_still_active = true;
158 } else {
159 cali_still_active = false;
160 }
161 if daq_reset_cooldown.elapsed().as_secs_f32() > 120.0 && reset_daq_flag {
162 warn!("Resetttign MTB DAQ queue!");
163 tc.reset_mtb_daq = true;
164 }
165 },
166 Err(err) => {
167 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
168 },
169 }
170 check_tc_update = Instant::now();
171 if cali_still_active {
172 thread::sleep(Duration::from_secs(1));
173 continue;
174 }
175 }
176 if retire {
177 break;
179 }
180 n_received = 0;
181 while n_received < settings.n_mte_per_loop as usize {
182 match m_trig_ev.try_recv() {
184 Err(_) => {
185 trace!("No new event ready yet!");
186 continue;
188 }
189 Ok(mt) => {
190 debug!("Received MasterTriggerEvent {}!", mt);
191 let mut event = TofEvent::from(mt);
192 event.header.run_id = run_id;
193 if last_evid != 0 {
194 if event.mt_event.event_id != last_evid + 1 {
195 if event.mt_event.event_id > last_evid {
196 heartbeat.n_mte_skipped += (event.mt_event.event_id - last_evid - 1) as usize;
197 }
198 }
199 }
200 last_evid = event.mt_event.event_id;
201 event_cache.insert(last_evid, event);
202 event_id_cache.push_back(last_evid);
205 n_received += 1;
206 heartbeat.n_mte_received_tot += 1;
207 }
208 } } trace!("Debug timer MTE received! {:?}", debug_timer.elapsed());
211 n_received = 0;
213 'main: while !ev_from_rb.is_empty() && n_received < settings.n_rbe_per_loop as usize {
218 match ev_from_rb.try_recv() {
219 Err(err) => {
220 error!("Can't receive RBEvent! Err {err}");
221 },
222 Ok(rb_ev) => {
223 heartbeat.n_rbe_received_tot += 1;
224 n_received += 1;
225 last_rb_evid = rb_ev.header.event_id;
235 if last_rb_evid < event_id_cache[0] {
240 n_received -= 1;
244 debug!("The received RBEvent {} is from the ancient past! Currently, we don't have a way to deal with that and this event will be DISCARDED! The RBEvent queue will be re-synchronized...", last_rb_evid);
245 heartbeat.n_rbe_discarded_tot += 1;
246 heartbeat.n_rbe_from_past += 1;
247 continue;
249 }
250 match event_cache.get_mut(&last_rb_evid) {
253 None => {
254 if last_rb_evid < *event_id_cache.back().unwrap() {
257 heartbeat.rbe_wo_mte += 1;
259 }
260 heartbeat.n_rbe_discarded_tot += 1;
261 heartbeat.n_rbe_orphan += 1;
262 let delta_evid = last_rb_evid - *event_id_cache.back().unwrap();
263 debug!("We can't associate event id {} from RB {} with a MTEvent in range {} .. {}. It is {} event ids ahead !", last_rb_evid, rb_ev.header.rb_id, event_id_cache[0], event_id_cache.back().unwrap(), delta_evid);
264 debug!("{}", rb_ev);
265 continue 'main;
268 },
269 Some(ev) => {
270 if settings.build_strategy == BuildStrategy::AdaptiveThorough {
271 match mtb_link_map.get(&rb_ev.header.rb_id) {
272 None => {
273 error!("Don't know MTB Link ID for {}", rb_ev.header.rb_id);
274 error!("This RBEvent gets discarded!");
275 }
276 Some(link_id) => {
277 if ev.mt_event.get_rb_link_ids().contains(link_id) {
278 ev.rb_events.push(rb_ev);
279 } else {
280 error!("MT Event {}", ev.mt_event);
281 error!("RBEvent {} has the same event id, but does not show up in MTB Link ID mask!", rb_ev);
282 }
283 }
284 }
285 } else {
286 ev.rb_events.push(rb_ev);
288 }
290 }
292 }
293 }
294 }
295 }
296 let debug_timer_elapsed = debug_timer.elapsed().as_secs_f64();
298 if debug_timer_elapsed > 35.0 {
300 debug_timer = Instant::now();
301 }
302 trace!("Debug timer RBE received! {:?}", debug_timer.elapsed());
303
304 let av_rb_ev = n_rbe_per_te as f64 / n_sent as f64;
310 if settings.build_strategy == BuildStrategy::Adaptive ||
311 settings.build_strategy == BuildStrategy::AdaptiveThorough {
312 settings.n_rbe_per_loop = av_rb_ev.ceil() as u32;
313 if ev_from_rb.len() > 1000 {
316 settings.n_rbe_per_loop = ev_from_rb.len() as u32 - 500;
317 }
318 if settings.n_rbe_per_loop == 0 {
319 settings.n_rbe_per_loop = 40;
321 }
322 }
323 if let BuildStrategy::AdaptiveGreedy = settings.build_strategy {
324 settings.n_rbe_per_loop = av_rb_ev.ceil() as u32 + settings.greediness as u32;
325 if settings.n_rbe_per_loop == 0 {
326 settings.n_rbe_per_loop = 40;
328 }
329 }
330 heartbeat.n_rbe_per_loop = settings.n_rbe_per_loop as usize;
331
332 let mut prior_ev_sent = 0u32;
339 let mut first_ev_sent = false;
340
341 for idx in 0..event_id_cache.len() {
342 let evid = event_id_cache.pop_front().unwrap();
344 match event_cache.get(&evid) {
345 None => {
346 error!("Event id and event caches are misaligned for event id {}, idx {} and sizes {} {}! This is BAD and most likely a BUG!", evid, idx, event_cache.len(), event_id_cache.len());
347 continue;
348 },
349 Some(ev) => {
350 let ev_timed_out = ev.age() >= settings.te_timeout_sec as u64;
351 let mut ready_to_send = ev_timed_out;
353 if ev_timed_out {
354 if !ev.is_complete() {
355 heartbeat.n_timed_out += 1;
356 }
357 } else {
358 match settings.build_strategy {
361 BuildStrategy::WaitForNBoards => {
362 if ev.rb_events.len() as u8 == settings.wait_nrb {
365 ready_to_send = true;
366 } },
368 BuildStrategy::Adaptive
369 | BuildStrategy::AdaptiveThorough
370 | BuildStrategy::AdaptiveGreedy
371 | BuildStrategy::Smart
372 | BuildStrategy::Unknown => {
373 if ev.is_complete() {
374 ready_to_send = true;
375 }
376 }
377 }
378 }
379 if settings.sort_events {
382 if ready_to_send && !ev_timed_out {
383 if idx == 0 {
384 first_ev_sent = true;
385 prior_ev_sent = ev.header.event_id;
386 } else {
387 if idx == 1 {
388 if !first_ev_sent {
389 ready_to_send = false;
392 }
393 }
394 if ev.header.event_id != (prior_ev_sent + 1) {
395 ready_to_send = false;
398 }
399 prior_ev_sent = ev.header.event_id;
400 }
401 }
402 }
403
404 if ready_to_send {
405 let mut ev_to_send = event_cache.remove(&evid).unwrap();
414 if ev_timed_out {
415 ev_to_send.mt_event.event_status = EventStatus::EventTimeOut;
416 }
417 n_rbe_per_te += ev_to_send.rb_events.len();
421 if ev_to_send.has_any_mangling() {
422 heartbeat.data_mangled_ev += 1;
423 }
424 heartbeat.drs_bsy_lost_hg_hits += ev_to_send.get_lost_hits() as usize;
426
427 let mut save_to_disk = true;
428 n_sent += 1;
429 heartbeat.n_sent += 1;
430 if send_tev_sum {
431 let tes = ev_to_send.get_summary();
432 if settings.only_save_interesting {
433 save_to_disk = false;
434 if tes.n_hits_umb >= settings.thr_n_hits_umb
435 && tes.n_hits_cbe >= settings.thr_n_hits_cbe
436 && tes.n_hits_cor >= settings.thr_n_hits_cor
437 && tes.tot_edep_umb >= settings.thr_tot_edep_umb
438 && tes.tot_edep_cbe >= settings.thr_tot_edep_cbe
439 && tes.tot_edep_cor >= settings.thr_tot_edep_cor {
440 save_to_disk = true;
441 }
442 }
443 let pack = tes.pack();
444 match data_sink.send(pack) {
445 Err(err) => {
446 error!("Packet sending failed! Err {}", err);
447 }
448 Ok(_) => {
449 debug!("Event with id {} sent!", evid);
450 }
451 }
452 }
453
454 if send_rbwaveform {
456 if rbwf_ctr == send_rbwf_freq as u64 {
457 for wf in ev_to_send.get_rbwaveforms() {
458 let pack = wf.pack();
459 match data_sink.send(pack) {
460 Err(err) => {
461 error!("Packet sending failed! Err {}", err);
462 }
463 Ok(_) => {
464 debug!("Event with id {} sent!", evid);
465 }
466 }
467 }
468 rbwf_ctr = 0;
469 }
470 rbwf_ctr += 1; }
472
473 if save_to_disk {
478 let pack = ev_to_send.pack();
479 match data_sink.send(pack) {
480 Err(err) => {
481 error!("Packet sending failed! Err {}", err);
482 }
483 Ok(_) => {
484 debug!("Event with id {} sent!", evid);
485 }
486 }
487 }
488 } else {
490 event_id_cache.push_front(evid);
491 }
492 }
493 }
494 } if hb_timer.elapsed() >= hb_interval {
496 heartbeat.met_seconds += hb_timer.elapsed().as_secs_f64() as usize;
498 heartbeat.event_cache_size = event_cache.len();
500 heartbeat.event_id_cache_size = event_id_cache.len();
501 heartbeat.mte_receiver_cbc_len = m_trig_ev.len();
502 heartbeat.rbe_receiver_cbc_len = ev_from_rb.len();
503 heartbeat.tp_sender_cbc_len = data_sink.len();
504
505 let pack = heartbeat.pack();
506 match data_sink.send(pack) {
507 Err(err) => {
508 error!("Packet sending failed! Err {}", err);
509 }
510 Ok(_) => {
511 }
512 }
513 hb_timer = Instant::now();
514 }
515 } }
517