liftof_cc/threads/event_builder.rs
//! The heart of liftof-cc. The event builder assembles all
//! events coming from the ReadoutBoards into a single event

use std::thread;
use std::time::{
  Instant,
  Duration
};

use std::sync::{
  Arc,
  Mutex
};

use std::collections::VecDeque;
use std::collections::HashMap;

use crossbeam_channel::{
  Receiver,
  Sender,
};

//use tof_dataclasses::events::{
//  MasterTriggerEvent,
//  TofEvent,
//  RBEvent,
//  EventStatus,
//};
//
//use tof_dataclasses::serialization::Packable;
//use tof_dataclasses::packets::TofPacket;
//use tof_dataclasses::commands::config::BuildStrategy;
//use tof_dataclasses::heartbeats::EVTBLDRHeartbeat;
//
//use liftof_lib::settings::{
//  TofEventBuilderSettings,
//};
//use liftof_lib::thread_control::ThreadControl;

use gondola_core::prelude::*;

use crate::constants::EVENT_BUILDER_EVID_CACHE_SIZE;

// just for debugging
//use tof_dataclasses::io::{
//  TofPacketWriter,
//  FileType
//};

/// Events ... assemble!
///
/// The event_builder collects all available event information,
/// beginning with the MasterTriggerEvent which defines the event
/// id. It then collects the requested number of RBEvents.
/// The final product is a TofEvent.
///
/// The event_builder is the heart of this software and crucial
/// to all operations.
///
/// # Arguments
///
/// * m_trig_ev      : Receive the `TofEvent` seeded from the MasterTrigger
///                    over this channel. The event will either be built
///                    immediately or cached.
/// * ev_from_rb     : Receive a number of `RBEvents` over this channel.
///                    The events here shall be associated with the
///                    MasterTriggerEvent.
/// * data_sink      : Send assembled events (and everything else in
///                    the form of TofPackets) to the data sink.
/// * mtb_link_map   : Map of RB ID -> MTB Link ID. Maybe in the future
///                    RBs will know their link id themselves?
///                    This is currently only needed for the build strategy
///                    "AdaptiveThorough".
/// * thread_control : Shared `ThreadControl` instance. It provides the
///                    `TofEventBuilderSettings` which configure the event
///                    builder, as well as the run id and stop/calibration flags.
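///
/// # Example
///
/// A minimal sketch of how this thread might be spawned (not taken from the
/// original code base - channel names, the `ThreadControl` construction and
/// the link map contents are illustrative assumptions only):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::{Arc, Mutex};
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// // channels connecting master trigger thread, readout boards and data sink
/// let (_mte_tx,  mte_rx)  = unbounded::<TofEvent>();
/// let (_rbe_tx,  rbe_rx)  = unbounded::<RBEvent>();
/// let (sink_tx, _sink_rx) = unbounded::<TofPacket>();
///
/// // hypothetical mapping: RB ID 11 -> MTB Link ID 1
/// let mut mtb_link_map = HashMap::<u8, u8>::new();
/// mtb_link_map.insert(11, 1);
///
/// // assumes ThreadControl can be default-constructed for this sketch
/// let thread_control = Arc::new(Mutex::new(ThreadControl::default()));
///
/// let tc = Arc::clone(&thread_control);
/// let builder = thread::spawn(move || {
///   event_builder(&mte_rx, &rbe_rx, &sink_tx, mtb_link_map, tc);
/// });
/// ```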
pub fn event_builder(m_trig_ev      : &Receiver<TofEvent>,
                     ev_from_rb     : &Receiver<RBEvent>,
                     data_sink      : &Sender<TofPacket>,
                     mtb_link_map   : HashMap<u8,u8>,
                     thread_control : Arc<Mutex<ThreadControl>>) {
  // deleteme
  //let file_type = FileType::RunFile(12345);
  //let mut writer = TofPacketWriter::new(String::from("."), file_type);
  //writer.mbytes_per_file = 420;

  // Set up the event builder. Since settings are only applied at run
  // start, it is fine to do this outside of the loop.
  let mut send_tev_sum    : bool;
  let mut send_rbwaveform : bool;
  let mut send_rbwf_freq  : u32;
  let mut rbwf_ctr        = 0u64;
  let mut settings        : TofEventBuilderSettings;
  let mut run_id          : u32;
  // this can block; that is fine because it only
  // happens once at init
  let mut cali_active : bool;
  loop {
    match thread_control.lock() {
      Ok(tc) => {
        send_rbwaveform = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
        send_tev_sum    = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
        send_rbwf_freq  = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
        settings        = tc.liftof_settings.event_builder_settings.clone();
        run_id          = tc.run_id;
        cali_active     = tc.calibration_active;
      }
      Err(err) => {
        error!("Can't acquire lock for ThreadControl! {err}");
        error!("CRITICAL: Unable to configure event builder thread! Aborting!");
        return;
      }
    }
    if !cali_active {
      break;
    } else {
      thread::sleep(Duration::from_secs(4));
    }
  }
  info!("Will assign run id {} to events!", run_id);

  // event caches for assembled events
  let mut heartbeat      = EventBuilderHB::new();
  let mut event_cache    = HashMap::<u32, TofEvent>::new();
  let mut event_id_cache = VecDeque::<u32>::with_capacity(EVENT_BUILDER_EVID_CACHE_SIZE);
  let mut n_received : usize;
  let mut last_evid  = 0;
  let mut n_sent     = 0usize;
  // debug
  let mut last_rb_evid : u32;
  let mut n_rbe_per_te    = 0usize;
  let mut debug_timer     = Instant::now();
  let mut check_tc_update = Instant::now();
  let daq_reset_cooldown  = Instant::now();
  let reset_daq_flag      = false;
  let mut retire          = false;
  let mut hb_timer        = Instant::now();
  let hb_interval         = Duration::from_secs(settings.hb_send_interval as u64);

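  // Main loop: (1) poll ThreadControl for stop/calibration flags, (2) collect
  // master trigger events and cache them by event id, (3) drain RBEvents and
  // attach them to their cached events, (4) adapt n_rbe_per_loop for the
  // adaptive build strategies, (5) ship events that are complete or timed out,
  // (6) publish a heartbeat every hb_interval.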
  loop {
    if check_tc_update.elapsed().as_secs() > 2 {
      //println!("= => [evt_builder] checking tc..");
      let mut cali_still_active = false;
      match thread_control.try_lock() {
        Ok(mut tc) => {
          if !tc.thread_event_bldr_active {
            println!("= => [evt_builder] (thread_event_bldr_active == false) shutting down...");
            break;
          }
          //println!("= => [evt_builder] {}", tc);
          if tc.stop_flag {
            // end myself
            println!("= => [evt_builder] (stop_flag == true) shutting down...");
            retire = true;
          }
          //println!("== ==> [evt_builder] tc lock acquired!");
          cali_still_active = tc.calibration_active;
          if daq_reset_cooldown.elapsed().as_secs_f32() > 120.0 && reset_daq_flag {
            warn!("Resetting MTB DAQ queue!");
            tc.reset_mtb_daq = true;
          }
        },
        Err(err) => {
          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
        },
      }
      check_tc_update = Instant::now();
      if cali_still_active {
        thread::sleep(Duration::from_secs(1));
        continue;
      }
    }
    if retire {
      //thread::sleep(Duration::from_secs(2));
      break;
    }
    n_received = 0;
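    // NOTE: try_recv() errors just hit `continue`, so this inner loop spins
    // until n_mte_per_loop master trigger events have actually been received.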
    while n_received < settings.n_mte_per_loop as usize {
      // every iteration, we welcome a new master event
      match m_trig_ev.try_recv() {
        Err(_) => {
          trace!("No new event ready yet!");
          //n_receiving_errors += 1;
          continue;
        }
        Ok(mt) => {
          debug!("Received MasterTriggerEvent {}!", mt);
          let mut event = TofEvent::from(mt);
          event.run_id  = run_id as u16; // FIXME - might be too big
          if last_evid != 0 && event.event_id > last_evid + 1 {
            heartbeat.n_mte_skipped += (event.event_id - last_evid - 1) as u64;
          }
          last_evid = event.event_id;
          event_cache.insert(last_evid, event);
          // use this to keep track of the order
          // of events
          event_id_cache.push_back(last_evid);
          n_received += 1;
          heartbeat.n_mte_received_tot += 1;
        }
      } // end match Ok(mt)
    } // end getting MTEvents
    trace!("Debug timer MTE received! {:?}", debug_timer.elapsed());
    // recycle that variable for the RB events as well
    n_received = 0;
    // The second receiver gets RBEvents from all ReadoutBoards. ReadoutBoard events are
    // NOT cached by design. The assumption here is that due to caching on the RBs, the
    // longer pathway (Harting cable + ethernet cables) and DRS and user time, RBEvents
    // are ALWAYS later than the MTEvents.
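    // Each received RBEvent ends up in one of three places: discarded as
    // "from the past" (its id is older than the oldest cached event id),
    // discarded as an orphan (no matching event in the cache), or attached
    // to its cached TofEvent.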
    'main: while !ev_from_rb.is_empty() && n_received < settings.n_rbe_per_loop as usize {
      match ev_from_rb.try_recv() {
        Err(err) => {
          error!("Can't receive RBEvent! Err {err}");
        },
        Ok(rb_ev) => {
          heartbeat.n_rbe_received_tot += 1;
          n_received += 1;
          //match seen_rbevents.get_mut(&rb_ev.header.rb_id) {
          //  Some(value) => {
          //    *value += 1;
          //  }
          //  None => {
          //    warn!("Unable to do bookkeeping for RB {}", rb_ev.header.rb_id);
          //  }
          //}
          //iter_ev = 0;
          last_rb_evid = rb_ev.header.event_id;
          // Try to associate the RB events with the MTB events -
          // the event ids from the RBEvents have to be in the
          // range of the cached MTB event ids.
          // The event_id_cache is sorted, that is why this works.
          if last_rb_evid < event_id_cache[0] {
            // This is the first check. If it fails, the event is for
            // sure not in the event_cache and we can dismiss it right away,
            // knowing that it is from the past.
            n_received -= 1;
            debug!("The received RBEvent {} is from the ancient past! Currently, we don't have a way to deal with that and this event will be DISCARDED! The RBEvent queue will be re-synchronized...", last_rb_evid);
            heartbeat.n_rbe_discarded_tot += 1;
            heartbeat.n_rbe_from_past     += 1;
            //*too_early_rbevents.get_mut(&rb_ev.header.rb_id).unwrap() += 1;
            continue;
          }
          // Now try to get the master trigger event for
          // this RBEvent
          match event_cache.get_mut(&last_rb_evid) {
            None => {
              // This means that either we dropped the MTB event,
              // or the RBEvent is from the future
              if last_rb_evid < *event_id_cache.back().unwrap() {
                // we know that this is neither too late nor too early!
                heartbeat.rbe_wo_mte += 1;
              }
              heartbeat.n_rbe_discarded_tot += 1;
              heartbeat.n_rbe_orphan        += 1;
              // signed difference - the orphan can still be within the cached
              // range, in which case an unsigned subtraction would underflow
              let delta_evid = last_rb_evid as i64 - *event_id_cache.back().unwrap() as i64;
              debug!("We can't associate event id {} from RB {} with a MTEvent in range {} .. {}. It is {} event ids ahead!", last_rb_evid, rb_ev.header.rb_id, event_id_cache[0], event_id_cache.back().unwrap(), delta_evid);
              debug!("{}", rb_ev);
              //let orphan_pack = rb_ev.pack();
              //writer.add_tof_packet(&orphan_pack);
              continue 'main;
            },
            Some(ev) => {
              if settings.build_strategy == BuildStrategy::AdaptiveThorough {
                match mtb_link_map.get(&rb_ev.header.rb_id) {
                  None => {
                    error!("Don't know MTB Link ID for {}", rb_ev.header.rb_id);
                    error!("This RBEvent gets discarded!");
                  }
                  Some(link_id) => {
                    if ev.get_rb_link_ids().contains(link_id) {
                      ev.rb_events.push(rb_ev);
                    } else {
                      error!("RBEvent {} has the same event id, but does not show up in the MTB Link ID mask!", rb_ev);
                    }
                  }
                }
              } else {
                // just add it without questioning
                ev.rb_events.push(rb_ev);
                //println!("[EVTBUILDER] DEBUG n rb expected : {}, n rbs {}",ev.mt_event.get_n_rbs_expected(), ev.rb_events.len());
              }
              //break;
            }
          }
        }
      }
    }
    // FIXME - timing debugging
    let debug_timer_elapsed = debug_timer.elapsed().as_secs_f64();
    //println!("Debug timer elapsed {}", debug_timer_elapsed);
    if debug_timer_elapsed > 35.0 {
      debug_timer = Instant::now();
    }
    trace!("Debug timer RBE received! {:?}", debug_timer.elapsed());

    // -----------------------------------------------------
    // This concludes the actual "event building" part
    // -----------------------------------------------------

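    // NOTE: while n_sent is still 0 this division yields NaN; NaN.ceil() stays
    // NaN and the `as u32` cast saturates to 0, which is then caught by the
    // failsafe below that resets n_rbe_per_loop to 40.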
    let av_rb_ev = n_rbe_per_te as f64 / n_sent as f64;
    if settings.build_strategy == BuildStrategy::Adaptive ||
       settings.build_strategy == BuildStrategy::AdaptiveThorough {
      settings.n_rbe_per_loop = av_rb_ev.ceil() as u32;
      // if the RB pipeline gets too long, catch up
      // and drain it a bit
      if ev_from_rb.len() > 1000 {
        settings.n_rbe_per_loop = ev_from_rb.len() as u32 - 500;
      }
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }
    if let BuildStrategy::AdaptiveGreedy = settings.build_strategy {
      settings.n_rbe_per_loop = av_rb_ev.ceil() as u32 + settings.greediness as u32;
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }
    heartbeat.n_rbe_per_loop = settings.n_rbe_per_loop as u64;

    //-----------------------------------------
    // From here on, we are checking all events
    // in the cache, deciding which ones are
    // ready to be passed on
    //-----------------------------------------

    let mut prior_ev_sent = 0u32;
    let mut first_ev_sent = false;

    for idx in 0..event_id_cache.len() {
      // if there wasn't a first element, the size would be 0
      let evid = event_id_cache.pop_front().unwrap();
      match event_cache.get(&evid) {
        None => {
          error!("Event id and event caches are misaligned for event id {}, idx {} and sizes {} {}! This is BAD and most likely a BUG!", evid, idx, event_cache.len(), event_id_cache.len());
          continue;
        },
        Some(ev) => {
          let ev_timed_out = ev.age() >= settings.te_timeout_sec as u64;
          // timed out events should be sent in any case
          let mut ready_to_send = ev_timed_out;
          if ev_timed_out {
            if !ev.is_complete() {
              heartbeat.n_timed_out += 1;
            }
          } else {
            // we are earlier than our timeout, maybe the
            // event is already complete
            match settings.build_strategy {
              BuildStrategy::WaitForNBoards => {
                // we will always wait for the expected number of boards,
                // unless the event times out
                if ev.rb_events.len() as u8 == settings.wait_nrb {
                  ready_to_send = true;
                } // else ready_to_send stays false
              },
              BuildStrategy::Adaptive
              | BuildStrategy::AdaptiveThorough
              | BuildStrategy::AdaptiveGreedy
              | BuildStrategy::Smart
              | BuildStrategy::Unknown => {
                if ev.is_complete() {
                  ready_to_send = true;
                }
              }
            }
          }
          // This feature tries to sort the events which get sent out
          // by event id. It might lead to more timed out events and
          // higher resource usage.
          if settings.sort_events {
            if ready_to_send && !ev_timed_out {
              if idx == 0 {
                first_ev_sent = true;
                prior_ev_sent = ev.event_id;
              } else {
                if idx == 1 && !first_ev_sent {
                  // we wait and check the others to see if something
                  // else timed out
                  ready_to_send = false;
                }
                if ev.event_id != (prior_ev_sent + 1) {
                  // we wait and check the others to see if something
                  // else timed out
                  ready_to_send = false;
                }
                prior_ev_sent = ev.event_id;
              }
            }
          }

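          // A ready event is removed from the cache and forwarded to the data
          // sink, depending on the publisher settings as a summary-path packet,
          // as individual RBWaveforms and/or as the full TofEvent destined for
          // disk (the latter can be vetoed by the "interesting event" cuts).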
          if ready_to_send {
            // if we don't cache it, we have to send it.
            //let ev_to_send = ev.clone();
            // The idea here is that this happens far less
            // often (a few times per main loop iteration)
            // than the "cache it" case, so we rather do something
            // here even if it might require re-allocating memory.
            // We should keep an eye on performance though.
            //idx_to_remove.push(idx);
            let mut ev_to_send = event_cache.remove(&evid).unwrap();
            // update the event status, so that we will also see in an
            // (optionally) produced tof event summary if the
            // event has issues
            if ev_timed_out {
              ev_to_send.status = EventStatus::EventTimeOut;
            }
            n_rbe_per_te += ev_to_send.rb_events.len();
            if ev_to_send.has_any_mangling() {
              heartbeat.data_mangled_ev += 1;
            }
            // sum up lost hits due to DRS4 deadtime
            heartbeat.drs_bsy_lost_hg_hits += ev_to_send.get_lost_hits() as u64;

            let mut save_to_disk = true;
            n_sent += 1;
            heartbeat.n_sent += 1;
            if send_tev_sum {
              //let tes = ev_to_send.get_summary();
              // FIXME - these might be all zero!
              if settings.only_save_interesting {
                save_to_disk = false;
                if ev_to_send.n_hits_umb      >= settings.thr_n_hits_umb
                   && ev_to_send.n_hits_cbe   >= settings.thr_n_hits_cbe
                   && ev_to_send.n_hits_cor   >= settings.thr_n_hits_cor
                   && ev_to_send.tot_edep_umb >= settings.thr_tot_edep_umb
                   && ev_to_send.tot_edep_cbe >= settings.thr_tot_edep_cbe
                   && ev_to_send.tot_edep_cor >= settings.thr_tot_edep_cor {
                  save_to_disk = true;
                }
              }
              let pack = ev_to_send.pack();
              match data_sink.send(pack) {
                Err(err) => {
                  error!("Packet sending failed! Err {}", err);
                }
                Ok(_) => {
                  debug!("Event with id {} sent!", evid);
                }
              }
            }

            if send_rbwaveform {
              if rbwf_ctr == send_rbwf_freq as u64 {
                for wf in ev_to_send.get_waveforms() {
                  let pack = wf.pack();
                  match data_sink.send(pack) {
                    Err(err) => {
                      error!("Packet sending failed! Err {}", err);
                    }
                    Ok(_) => {
                      debug!("Waveform for event id {} sent!", evid);
                    }
                  }
                }
                rbwf_ctr = 0;
              }
              rbwf_ctr += 1; // increase for every event, not for every waveform
            }

            // Always send TofEvents, so they get written to disk.
            // There is one exception though: in case we have
            // "interesting" event cuts in place, this can
            // be restricted.
            if save_to_disk {
              let pack = ev_to_send.pack();
              match data_sink.send(pack) {
                Err(err) => {
                  error!("Packet sending failed! Err {}", err);
                }
                Ok(_) => {
                  debug!("Event with id {} sent!", evid);
                }
              }
            }
          } else {
            // we are NOT ready to send -> put it back and keep it cached
            event_id_cache.push_front(evid);
          }
        }
      }
    } // end loop over event_id_cache
    if hb_timer.elapsed() >= hb_interval {
      // make sure the heartbeat has the latest mission elapsed time
      heartbeat.met_seconds += hb_timer.elapsed().as_secs_f64() as u64;
      // get the length of the various caches at this moment in time
      heartbeat.event_cache_size     = event_cache.len() as u64;
      heartbeat.event_id_cache_size  = event_id_cache.len() as u64;
      heartbeat.mte_receiver_cbc_len = m_trig_ev.len() as u64;
      heartbeat.rbe_receiver_cbc_len = ev_from_rb.len() as u64;
      heartbeat.tp_sender_cbc_len    = data_sink.len() as u64;

      let pack = heartbeat.pack();
      match data_sink.send(pack) {
        Err(err) => {
          error!("Packet sending failed! Err {}", err);
        }
        Ok(_) => ()
      }
      hb_timer = Instant::now();
    }
  } // end loop
}