liftof_cc/threads/event_builder.rs

//! The heart of liftof-cc. The event builder assembles all
//! events coming from the ReadoutBoards into a single event

use std::thread;
use std::time::{
  Instant,
  Duration
};

use std::sync::{
  Arc,
  Mutex
};

use std::collections::VecDeque;
use std::collections::HashMap;

use crossbeam_channel::{
  Receiver,
  Sender,
};

use tof_dataclasses::events::{
  MasterTriggerEvent,
  TofEvent,
  RBEvent,
  EventStatus,
};

use tof_dataclasses::serialization::Packable;
use tof_dataclasses::packets::TofPacket;
use tof_dataclasses::commands::config::BuildStrategy;
use tof_dataclasses::heartbeats::EVTBLDRHeartbeat;

use liftof_lib::settings::{
  TofEventBuilderSettings,
};
use liftof_lib::thread_control::ThreadControl;

use crate::constants::EVENT_BUILDER_EVID_CACHE_SIZE;

// just for debugging
//use tof_dataclasses::io::{
//  TofPacketWriter,
//  FileType
//};


/// Events ... assemble!
///
/// The event_builder collects all available event information,
/// beginning with the MasterTriggerEvent, which defines the event
/// id. It then collects the requested number of RBEvents.
/// The final product is a TofEvent.
///
/// The event_builder is the heart of this software and crucial
/// to all operations.
///
/// # Arguments
///
/// * m_trig_ev      : Receive `MasterTriggerEvent`s over this
///                    channel. Each event will either be built
///                    immediately or cached.
/// * ev_from_rb     : Receive a number of `RBEvents` over this channel.
///                    These events get associated with their
///                    MasterTriggerEvent by event id.
/// * data_sink      : Send assembled events (and everything else) in
///                    the form of TofPackets to the data sink.
/// * mtb_link_map   : Map of RB ID -> MTB Link ID. Maybe in the future
///                    RBs will know their link id themselves?
///                    This is currently only needed for the build strategy
///                    "AdaptiveThorough".
/// * thread_control : Shared thread control. Used to read the event
///                    builder settings and the global run state.
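///
/// # Example
///
/// A minimal sketch of how this thread might be wired up. The channel
/// setup and the way `ThreadControl` is constructed here are illustrative
/// assumptions and not taken from the actual caller code.
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::{Arc, Mutex};
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// // channels connecting the event builder to upstream/downstream threads
/// let (_mte_send, mte_recv) = unbounded::<MasterTriggerEvent>();
/// let (_rbe_send, rbe_recv) = unbounded::<RBEvent>();
/// let (tp_send,  _tp_recv)  = unbounded::<TofPacket>();
/// // map of RB ID -> MTB Link ID (left empty here for illustration)
/// let mtb_link_map   = HashMap::<u8, u8>::new();
/// // assumption: ThreadControl can be constructed like this
/// let thread_control = Arc::new(Mutex::new(ThreadControl::new()));
///
/// let tc = Arc::clone(&thread_control);
/// thread::spawn(move || {
///   event_builder(&mte_recv, &rbe_recv, &tp_send, mtb_link_map, tc);
/// });
/// ```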
pub fn event_builder (m_trig_ev      : &Receiver<MasterTriggerEvent>,
                      ev_from_rb     : &Receiver<RBEvent>,
                      data_sink      : &Sender<TofPacket>,
                      mtb_link_map   : HashMap<u8,u8>,
                      thread_control : Arc<Mutex<ThreadControl>>) { 
  // deleteme
  //let file_type = FileType::RunFile(12345);
  //let mut writer = TofPacketWriter::new(String::from("."), file_type);
  //writer.mbytes_per_file = 420;


  // set up the event builder. Since settings are only applied at run
  // start, it is fine to do this outside of the main loop
  let mut send_tev_sum    : bool;
  let mut send_rbwaveform : bool;
  let mut send_rbwf_freq  : u32;
  let mut rbwf_ctr        = 0u64;
  let mut settings        : TofEventBuilderSettings;
  let mut run_id          : u32;
  // this can block; that is fine because it only
  // happens once at init
  let mut cali_active : bool;
  loop {
    match thread_control.lock() {
      Ok(tc) => {
        send_rbwaveform   = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
        send_tev_sum      = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
        send_rbwf_freq    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
        settings          = tc.liftof_settings.event_builder_settings.clone();
        run_id            = tc.run_id;
        cali_active       = tc.calibration_active;
      }
      Err(err) => {
        error!("Can't acquire lock for ThreadControl! {err}");
        error!("CRITICAL: Unable to configure event builder thread! Aborting!");
        return;
      }
    }
    if !cali_active {
      break;
    } else {
      thread::sleep(Duration::from_secs(4));
    }
  }
  info!("Will assign run id {} to events!", run_id);

  // event caches for assembled events
  let mut heartbeat            = EVTBLDRHeartbeat::new();
  let mut event_cache          = HashMap::<u32, TofEvent>::new();
  let mut event_id_cache       = VecDeque::<u32>::with_capacity(EVENT_BUILDER_EVID_CACHE_SIZE);
  let mut n_received           : usize;
  let mut last_evid            = 0;
  let mut n_sent               = 0usize;
  // debug
  let mut last_rb_evid         : u32;
  let mut n_rbe_per_te         = 0usize;
  let mut debug_timer          = Instant::now();
  let mut check_tc_update      = Instant::now();
  let daq_reset_cooldown       = Instant::now();
  let reset_daq_flag           = false;
  let mut retire               = false;
  let mut hb_timer             = Instant::now();
  let hb_interval              = Duration::from_secs(settings.hb_send_interval as u64);

  loop {
    if check_tc_update.elapsed().as_secs() > 2 {
      //println!("= => [evt_builder] checking tc..");

      let mut cali_still_active = false;
      match thread_control.try_lock() {
        Ok(mut tc) => {
          if tc.thread_event_bldr_active {
            println!("= => [evt_builder] shutting down...");
            break; 
          }
          //println!("= => [evt_builder] {}", tc);
          if tc.stop_flag {
            // end myself
            println!("= => [evt_builder] shutting down...");
            retire = true;
          }
          //println!("== ==> [evt_builder] tc lock acquired!");
          cali_still_active = tc.calibration_active;
          if daq_reset_cooldown.elapsed().as_secs_f32() > 120.0 && reset_daq_flag {
            warn!("Resetting MTB DAQ queue!");
            tc.reset_mtb_daq = true;
          }
        },
        Err(err) => {
          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
        },
      }
      check_tc_update = Instant::now();
      if cali_still_active {
        thread::sleep(Duration::from_secs(1));
        continue;
      }
    }
    if retire {
      //thread::sleep(Duration::from_secs(2));
      break;
    }
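    // Stage 1: pull up to n_mte_per_loop MasterTriggerEvents from the
    // receiver and open a fresh TofEvent in the cache for each of them.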
    n_received = 0;
    while n_received < settings.n_mte_per_loop as usize {
      // every iteration, we welcome a new master event
      match m_trig_ev.try_recv() {
        Err(_) => {
          trace!("No new event ready yet!");
          //n_receiving_errors += 1;
          continue;
        }
        Ok(mt) => {
          debug!("Received MasterTriggerEvent {}!", mt);
          let mut event       = TofEvent::from(mt);
          event.header.run_id = run_id;
          if last_evid != 0 {
            if event.mt_event.event_id != last_evid + 1 {
              if event.mt_event.event_id > last_evid {
                heartbeat.n_mte_skipped += (event.mt_event.event_id - last_evid - 1) as usize;
              }
            }
          }
          last_evid = event.mt_event.event_id;
          event_cache.insert(last_evid, event);
          // use this to keep track of the order
          // of events
          event_id_cache.push_back(last_evid);
          n_received  += 1;
          heartbeat.n_mte_received_tot += 1;
        }
      } // end match Ok(mt)
    } // end getting MTEvents
    trace!("Debug timer MTE received! {:?}", debug_timer.elapsed());
    // recycle that variable for the rb events as well
    n_received = 0;
    // The second receiver gets RBEvents from all ReadoutBoards. ReadoutBoard events are
    // NOT cached by design. The assumption here is that due to caching on the RBs and the
    // longer pathway (harting cable + ethernet cables) and DRS and user time, RBEvents are
    // ALWAYS later than the MTEvents.
    'main: while !ev_from_rb.is_empty() && n_received < settings.n_rbe_per_loop as usize {
      match ev_from_rb.try_recv() {
        Err(err) => {
          error!("Can't receive RBEvent! Err {err}");
        },
        Ok(rb_ev) => {
          heartbeat.n_rbe_received_tot += 1;
          n_received += 1;
          //match seen_rbevents.get_mut(&rb_ev.header.rb_id) {
          //  Some(value) => {
          //    *value += 1;
          //  }
          //  None => {
          //    warn!("Unable to do bookkeeping for RB {}", rb_ev.header.rb_id);
          //  }
          //}
          //iter_ev = 0;
          last_rb_evid = rb_ev.header.event_id;
          // try to associate the rb events with the mtb events:
          // the event ids from the RBEvents have to be in the
          // range of the cached MTB event ids.
          // The event_id_cache is sorted, that is why this works
          if last_rb_evid < event_id_cache[0] {
            // this is the first check. If this fails, then the event is for
            // sure not in the event_cache and we can dismiss it right away,
            // knowing that it is from the past
            n_received -= 1;
            debug!("The received RBEvent {} is from the ancient past! Currently, we don't have a way to deal with that and this event will be DISCARDED! The RBEvent queue will be re-synchronized...", last_rb_evid);
            heartbeat.n_rbe_discarded_tot += 1;
            heartbeat.n_rbe_from_past     += 1;
            //*too_early_rbevents.get_mut(&rb_ev.header.rb_id).unwrap() += 1;
            continue;
          }
          // Now try to get the master trigger event for
          // this RBEvent
          match event_cache.get_mut(&last_rb_evid) {
            None => {
              // This means that either we dropped the MTB event,
              // or the RBEvent is from the future
              if last_rb_evid < *event_id_cache.back().unwrap() {
                // we know that this is neither too late nor too early!
                heartbeat.rbe_wo_mte          += 1;
              }
              heartbeat.n_rbe_discarded_tot += 1;
              heartbeat.n_rbe_orphan        += 1;
              // saturating_sub avoids an underflow in case the MTB event was dropped
              let delta_evid = last_rb_evid.saturating_sub(*event_id_cache.back().unwrap());
              debug!("We can't associate event id {} from RB {} with a MTEvent in range {} .. {}. It is {} event ids ahead!", last_rb_evid, rb_ev.header.rb_id, event_id_cache[0], event_id_cache.back().unwrap(), delta_evid);
              debug!("{}", rb_ev);
              //let orphan_pack = rb_ev.pack();
              //writer.add_tof_packet(&orphan_pack);
              continue 'main;
            },
            Some(ev) => {
              if settings.build_strategy == BuildStrategy::AdaptiveThorough {
                match mtb_link_map.get(&rb_ev.header.rb_id) {
                  None => {
                    error!("Don't know MTB Link ID for {}", rb_ev.header.rb_id);
                    error!("This RBEvent gets discarded!");
                  }
                  Some(link_id) => {
                    if ev.mt_event.get_rb_link_ids().contains(link_id) {
                      ev.rb_events.push(rb_ev);
                    } else {
                      error!("MT Event {}", ev.mt_event);
                      error!("RBEvent {} has the same event id, but does not show up in the MTB Link ID mask!", rb_ev);
                    }
                  }
                }
              } else {
                // Just add it without questioning
                ev.rb_events.push(rb_ev);
                //println!("[EVTBUILDER] DEBUG n rb expected : {}, n rbs {}",ev.mt_event.get_n_rbs_expected(), ev.rb_events.len());
              }
              //break;
            }
          }
        }
      }
    }
    // FIXME - timing debugging
    let debug_timer_elapsed = debug_timer.elapsed().as_secs_f64();
    //println!("Debug timer elapsed {}", debug_timer_elapsed);
    if debug_timer_elapsed > 35.0  {
      debug_timer = Instant::now();
    }
    trace!("Debug timer RBE received! {:?}", debug_timer.elapsed());

    // -----------------------------------------------------
    // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    // This concludes the actual "event building" part
    // -----------------------------------------------------

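    // Adaptive build strategies throttle how many RBEvents get drained
    // from the receiver per main loop iteration: n_rbe_per_loop follows
    // the running average of RBEvents per sent TofEvent, with a failsafe
    // value and a drain mode in case the receiver queue grows too long.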
    let av_rb_ev = n_rbe_per_te as f64 / n_sent as f64;
    if settings.build_strategy == BuildStrategy::Adaptive || 
      settings.build_strategy  == BuildStrategy::AdaptiveThorough {
      settings.n_rbe_per_loop  = av_rb_ev.ceil() as u32;
      // if the RBEvent pipeline gets too long, catch up
      // and drain it a bit
      if ev_from_rb.len() > 1000 {
        settings.n_rbe_per_loop = ev_from_rb.len() as u32 - 500;
      }
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }
    if let BuildStrategy::AdaptiveGreedy = settings.build_strategy {
      settings.n_rbe_per_loop = av_rb_ev.ceil() as u32 + settings.greediness as u32;
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }
    heartbeat.n_rbe_per_loop = settings.n_rbe_per_loop as usize;

    //-----------------------------------------
    // From here on, we are checking all events
    // in the cache, deciding which ones are 
    // ready to be passed on
    // ----------------------------------------

    let mut prior_ev_sent = 0u32;
    let mut first_ev_sent = false;

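    // Note: events are popped from the front of event_id_cache (oldest
    // first). An event that is not ready to send is pushed back to the
    // front, so this pass effectively stops at the first event that is
    // not ready yet; everything behind it stays cached for the next pass.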
    for idx in 0..event_id_cache.len() {
      // if there wasn't a first element, size would be 0
      let evid = event_id_cache.pop_front().unwrap();
      match event_cache.get(&evid) {
        None => {
          error!("Event id and event caches are misaligned for event id {}, idx {} and sizes {} {}! This is BAD and most likely a BUG!", evid, idx, event_cache.len(), event_id_cache.len());
          continue;
        },
        Some(ev) => {
          let ev_timed_out = ev.age() >= settings.te_timeout_sec as u64;
          // timed out events should be sent in any case
          let mut ready_to_send = ev_timed_out;
          if ev_timed_out {
            if !ev.is_complete() {
              heartbeat.n_timed_out += 1;
            }
          } else {
            // we are earlier than our time out, maybe the
            // event is already complete
            match settings.build_strategy {
              BuildStrategy::WaitForNBoards => {
                // we will always wait for the expected number of boards,
                // unless the event times out
                if ev.rb_events.len() as u8 == settings.wait_nrb {
                  ready_to_send = true;
                } // else ready_to_send is still false
              },
              BuildStrategy::Adaptive 
              | BuildStrategy::AdaptiveThorough
              | BuildStrategy::AdaptiveGreedy
              | BuildStrategy::Smart 
              | BuildStrategy::Unknown => {
                if ev.is_complete() {
                  ready_to_send = true;
                }
              }
            }
          }
          // this feature tries to sort the events which get sent by id.
          // This may lead to more timed-out events and higher resource usage.
          if settings.sort_events {
            if ready_to_send && !ev_timed_out {
              if idx == 0 {
                first_ev_sent = true;
                prior_ev_sent = ev.header.event_id;
              } else {
                if idx == 1 {
                  if !first_ev_sent {
                    // we wait and check the others to see if something
                    // else timed out
                    ready_to_send = false;
                  }
                }
                if ev.header.event_id != (prior_ev_sent + 1) {
                  // we wait and check the others to see if something
                  // else timed out
                  ready_to_send = false;
                }
                prior_ev_sent = ev.header.event_id;
              }
            }
          }

          if ready_to_send {
            // if we don't cache it, we have to send it.
            //let ev_to_send = ev.clone();
            // so the idea here is that this happens way less
            // often (a few times per main loop iteration)
            // than the cache-it case, so we'd rather do something
            // here even if it might require re-allocating memory.
            // we should keep an eye on performance though
            //idx_to_remove.push(idx);
            let mut ev_to_send = event_cache.remove(&evid).unwrap();
            if ev_timed_out {
              ev_to_send.mt_event.event_status = EventStatus::EventTimeOut;
            }
            // update the event status, so that an (optionally) produced
            // tof event summary will also show if the event has issues
            n_rbe_per_te  += ev_to_send.rb_events.len();
            if ev_to_send.has_any_mangling() {
              heartbeat.data_mangled_ev += 1;
            }
            // sum up lost hits due to drs4 deadtime
            heartbeat.drs_bsy_lost_hg_hits += ev_to_send.get_lost_hits() as usize;

            let mut save_to_disk = true;
            n_sent += 1;
            heartbeat.n_sent += 1;
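            // In 'only_save_interesting' mode (checked only when summaries
            // are enabled), the full TofEvent goes to disk only if all
            // hit-multiplicity and energy-deposit thresholds (umb/cbe/cor)
            // are met; the TofEventSummary itself is sent regardless.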
            if send_tev_sum {
              let tes  = ev_to_send.get_summary();
              if settings.only_save_interesting {
                save_to_disk = false;
                if tes.n_hits_umb   >= settings.thr_n_hits_umb 
                && tes.n_hits_cbe   >= settings.thr_n_hits_cbe
                && tes.n_hits_cor   >= settings.thr_n_hits_cor
                && tes.tot_edep_umb >= settings.thr_tot_edep_umb
                && tes.tot_edep_cbe >= settings.thr_tot_edep_cbe
                && tes.tot_edep_cor >= settings.thr_tot_edep_cor {
                  save_to_disk = true;
                }
              }
              let pack = tes.pack();
              match data_sink.send(pack) {
                Err(err) => {
                  error!("Packet sending failed! Err {}", err);
                }
                Ok(_)    => {
                  debug!("Event with id {} sent!", evid);
                }
              }
            }

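            // RBWaveform packets are prescaled: only roughly every
            // 'send_rbwf_every_x_event'-th built event has its waveforms
            // sent downstream.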
            if send_rbwaveform {
              if rbwf_ctr == send_rbwf_freq as u64 {
                for wf in ev_to_send.get_rbwaveforms() {
                  let pack = wf.pack();
                  match data_sink.send(pack) {
                    Err(err) => {
                      error!("Packet sending failed! Err {}", err);
                    }
                    Ok(_)    => {
                      debug!("Event with id {} sent!", evid);
                    }
                  }
                }
                rbwf_ctr = 0;
              }
              rbwf_ctr += 1; // increase for every event, not for every waveform
            }

            // Always send TofEvents, so they get written to disk.
            // There is one exception though: if "interesting" event
            // cuts are in place, this can be restricted.
            if save_to_disk {
              let pack = ev_to_send.pack();
              match data_sink.send(pack) {
                Err(err) => {
                  error!("Packet sending failed! Err {}", err);
                }
                Ok(_)    => {
                  debug!("Event with id {} sent!", evid);
                }
              }
            }
          // this happens when we are NOT ready to send -> cache it!
          } else { 
            event_id_cache.push_front(evid);
          }
        }
      }
    } // end loop over event_id_cache
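    // Periodically publish an EVTBLDRHeartbeat with the current cache
    // and channel occupancies, so the state of the event builder can be
    // monitored downstream.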
    if hb_timer.elapsed() >= hb_interval {
      // make sure the heartbeat has the latest mission elapsed time
      heartbeat.met_seconds         += hb_timer.elapsed().as_secs_f64() as usize;
      // get the length of the various caches at this moment in time
      heartbeat.event_cache_size     = event_cache.len();
      heartbeat.event_id_cache_size  = event_id_cache.len();
      heartbeat.mte_receiver_cbc_len = m_trig_ev.len();
      heartbeat.rbe_receiver_cbc_len = ev_from_rb.len();
      heartbeat.tp_sender_cbc_len    = data_sink.len();

      let pack         = heartbeat.pack();
      match data_sink.send(pack) {
        Err(err) => {
          error!("Packet sending failed! Err {}", err);
        }
        Ok(_)    => {
        }
      }
      hb_timer = Instant::now();
    }
  } // end loop
}