liftof_cc/threads/event_builder.rs

//! The heart of liftof-cc. The event builder assembles all
//! events coming from the ReadoutBoards into a single event.

use std::thread;
use std::time::{
  Instant,
  Duration
};

use std::sync::{
  Arc,
  Mutex
};

use std::collections::VecDeque;
use std::collections::HashMap;

use crossbeam_channel::{
  Receiver,
  Sender,
};

//use tof_dataclasses::events::{
//  MasterTriggerEvent,
//  TofEvent,
//  RBEvent,
//  EventStatus,
//};
//
//use tof_dataclasses::serialization::Packable;
//use tof_dataclasses::packets::TofPacket;
//use tof_dataclasses::commands::config::BuildStrategy;
//use tof_dataclasses::heartbeats::EVTBLDRHeartbeat;
//
//use liftof_lib::settings::{
//  TofEventBuilderSettings,
//};
//use liftof_lib::thread_control::ThreadControl;

use gondola_core::prelude::*;

use crate::constants::EVENT_BUILDER_EVID_CACHE_SIZE;

// just for debugging
//use tof_dataclasses::io::{
//  TofPacketWriter,
//  FileType
//};


/// Events ... assemble!
///
/// The event_builder collects all available event information,
/// beginning with the MasterTriggerEvent defining the event
/// id. It collects the requested number of RBEvents.
/// The final product will then be a TofEvent.
///
/// The event_builder is the heart of this software and crucial
/// to all operations.
///
/// # Arguments
///
/// * m_trig_ev      : Receive a `MasterTriggerEvent` over this
///                    channel. The event will either be built
///                    immediately, or cached.
/// * ev_from_rb     : Receive a number of `RBEvents` over this channel.
///                    The events here shall be associated with the
///                    MasterTriggerEvent.
/// * data_sink      : Send assembled events (and everything else in
///                    the form of TofPackets) to the data sink.
/// * mtb_link_map   : Map of MTB Link ID -> RB ID. Maybe in the future
///                    RBs will know their link id themselves?
///                    This is currently only needed for the build strategy
///                    "AdaptiveThorough".
/// * thread_control : Shared `ThreadControl`. The event builder reads its
///                    settings from it once at startup and polls it for
///                    stop/calibration flags while running.
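///
/// # Example
///
/// A minimal sketch of how this thread might be wired up. The channel
/// construction and `ThreadControl::default()` below are illustrative
/// assumptions, not the actual liftof-cc startup sequence:
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::{Arc, Mutex};
/// use crossbeam_channel::unbounded;
///
/// let (_mte_tx, mte_rx)   = unbounded::<TofEvent>();
/// let (_rbe_tx, rbe_rx)   = unbounded::<RBEvent>();
/// let (sink_tx, _sink_rx) = unbounded::<TofPacket>();
/// let mtb_link_map        = HashMap::<u8, u8>::new();
/// // ThreadControl::default() is assumed here for illustration only
/// let thread_control      = Arc::new(Mutex::new(ThreadControl::default()));
///
/// std::thread::spawn(move || {
///   event_builder(&mte_rx, &rbe_rx, &sink_tx, mtb_link_map, thread_control);
/// });
/// ```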
pub fn event_builder (m_trig_ev      : &Receiver<TofEvent>,
                      ev_from_rb     : &Receiver<RBEvent>,
                      data_sink      : &Sender<TofPacket>,
                      mtb_link_map   : HashMap<u8,u8>,
                      thread_control : Arc<Mutex<ThreadControl>>) {
  // deleteme
  //let file_type = FileType::RunFile(12345);
  //let mut writer = TofPacketWriter::new(String::from("."), file_type);
  //writer.mbytes_per_file = 420;


  // set up the event builder. Since we are now doing settings only at run
  // start, it is fine to do this outside of the loop
  let mut send_tev_sum    : bool;
  let mut send_rbwaveform : bool;
  let mut send_rbwf_freq  : u32;
  let mut rbwf_ctr        = 0u64;
  let mut settings        : TofEventBuilderSettings;
  let mut run_id          : u32;
  // this can block; that is fine because it only
  // happens once at init
  let mut cali_active : bool;
  loop {
    match thread_control.lock() {
      Ok(tc) => {
        send_rbwaveform   = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
        send_tev_sum      = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
        send_rbwf_freq    = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
        settings          = tc.liftof_settings.event_builder_settings.clone();
        run_id            = tc.run_id;
        cali_active       = tc.calibration_active;
      }
      Err(err) => {
        error!("Can't acquire lock for ThreadControl! {err}");
        error!("CRITICAL: Unable to configure event builder thread! Aborting!");
        return;
      }
    }
    if !cali_active {
      break;
    } else {
      thread::sleep(Duration::from_secs(4));
    }
  }
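  // From here on the builder runs with this fixed settings snapshot; the
  // ThreadControl mutex is only polled again in the main loop below for the
  // stop / active / calibration flags and the MTB DAQ reset request.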
  info!("Will assign run id {} to events!", run_id);

  // event caches for assembled events
  let mut heartbeat            = EventBuilderHB::new();
  let mut event_cache          = HashMap::<u32, TofEvent>::new();
  let mut event_id_cache       = VecDeque::<u32>::with_capacity(EVENT_BUILDER_EVID_CACHE_SIZE);
  let mut n_received           : usize;
  let mut last_evid            = 0;
  let mut n_sent               = 0usize;
  // debug
  let mut last_rb_evid         : u32;
  let mut n_rbe_per_te         = 0usize;
  let mut debug_timer          = Instant::now();
  let mut check_tc_update      = Instant::now();
  let daq_reset_cooldown       = Instant::now();
  let reset_daq_flag           = false;
  let mut retire               = false;
  let mut hb_timer             = Instant::now();
  let hb_interval              = Duration::from_secs(settings.hb_send_interval as u64);

  loop {
    if check_tc_update.elapsed().as_secs() > 2 {
      //println!("= => [evt_builder] checking tc..");

      let mut cali_still_active = false;
      match thread_control.try_lock() {
        Ok(mut tc) => {
          if !tc.thread_event_bldr_active {
            println!("= => [evt_builder] (thread_event_bldr_active == false) shutting down...");
            break;
          }
          //println!("= => [evt_builder] {}", tc);
          if tc.stop_flag {
            // end myself
            println!("= => [evt_builder] (stop_flag == true) shutting down...");
            retire = true;
          }
          //println!("== ==> [evt_builder] tc lock acquired!");
          if tc.calibration_active {
            cali_still_active = true;
          } else {
            cali_still_active = false;
          }
          if daq_reset_cooldown.elapsed().as_secs_f32() > 120.0 && reset_daq_flag {
            warn!("Resetting MTB DAQ queue!");
            tc.reset_mtb_daq = true;
          }
        },
        Err(err) => {
          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
        },
      }
      check_tc_update = Instant::now();
      if cali_still_active {
        thread::sleep(Duration::from_secs(1));
        continue;
      }
    }
    if retire {
      //thread::sleep(Duration::from_secs(2));
      break;
    }
    n_received = 0;
    while n_received < settings.n_mte_per_loop as usize {
      // every iteration, we welcome a new master event
      match m_trig_ev.try_recv() {
        Err(_) => {
          trace!("No new event ready yet!");
          //n_receiving_errors += 1;
          continue;
        }
        Ok(mt) => {
          debug!("Received MasterTriggerEvent {}!", mt);
          let mut event       = TofEvent::from(mt);
          event.run_id = run_id as u16; // FIXME - might be too big
          if last_evid != 0 {
            if event.event_id != last_evid + 1 {
              if event.event_id > last_evid {
                heartbeat.n_mte_skipped += (event.event_id - last_evid - 1) as u64;
              }
            }
          }
          last_evid = event.event_id;
          event_cache.insert(last_evid, event);
          // use this to keep track of the order
          // of events
          event_id_cache.push_back(last_evid);
          n_received  += 1;
          heartbeat.n_mte_received_tot += 1;
        }
      } // end match Ok(mt)
    } // end getting MTEvents
    trace!("Debug timer MTE received! {:?}", debug_timer.elapsed());
    // recycle that variable for the rb events as well
    n_received = 0;
    // The second receiver gets RBEvents from all ReadoutBoards. ReadoutBoard events are
    // NOT cached by design. The assumption here is that due to caching on the RBs, the
    // longer pathway (Harting cable + Ethernet cables) and DRS and user time, RBEvents are
    // ALWAYS later than the MTEvents.
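    // In other words: when an RBEvent arrives here, its matching
    // MasterTriggerEvent is expected to already be in event_cache, so an
    // RBEvent with an id below event_id_cache[0] can only be stale.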
    'main: while !ev_from_rb.is_empty() && n_received < settings.n_rbe_per_loop as usize {
      match ev_from_rb.try_recv() {
        Err(err) => {
          error!("Can't receive RBEvent! Err {err}");
        },
        Ok(rb_ev) => {
          heartbeat.n_rbe_received_tot += 1;
          n_received += 1;
          //match seen_rbevents.get_mut(&rb_ev.header.rb_id) {
          //  Some(value) => {
          //    *value += 1;
          //  }
          //  None => {
          //    warn!("Unable to do bookkeeping for RB {}", rb_ev.header.rb_id);
          //  }
          //}
          //iter_ev = 0;
          last_rb_evid = rb_ev.header.event_id;
          // try to associate the RB events with the MTB events;
          // the event ids from the RBEvents have to be in the
          // range of the MTB events.
          // The event_id_cache is sorted, which is why this works.
          if last_rb_evid < event_id_cache[0] {
            // this is the first check. If this fails, then the event is for
            // sure not in the event_cache and we can dismiss it right away,
            // knowing that it is from the past
            n_received -= 1;
            debug!("The received RBEvent {} is from the ancient past! Currently, we don't have a way to deal with that and this event will be DISCARDED! The RBEvent queue will be re-synchronized...", last_rb_evid);
            heartbeat.n_rbe_discarded_tot += 1;
            heartbeat.n_rbe_from_past     += 1;
            //*too_early_rbevents.get_mut(&rb_ev.header.rb_id).unwrap() += 1;
            continue;
          }
          // Now try to get the master trigger event for
          // this RBEvent
          match event_cache.get_mut(&last_rb_evid) {
            None => {
              // This means that either we dropped the MTB event,
              // or the RBEvent is from the future
              if last_rb_evid < *event_id_cache.back().unwrap() {
                // we know that this is neither too late nor too early!
                heartbeat.rbe_wo_mte          += 1;
              }
              heartbeat.n_rbe_discarded_tot += 1;
              heartbeat.n_rbe_orphan        += 1;
              let delta_evid = last_rb_evid - *event_id_cache.back().unwrap();
              debug!("We can't associate event id {} from RB {} with a MTEvent in range {} .. {}. It is {} event ids ahead !", last_rb_evid, rb_ev.header.rb_id, event_id_cache[0], event_id_cache.back().unwrap(), delta_evid);
              debug!("{}", rb_ev);
              //let orphan_pack = rb_ev.pack();
              //writer.add_tof_packet(&orphan_pack);
              continue 'main;
            },
            Some(ev) => {
              if settings.build_strategy == BuildStrategy::AdaptiveThorough {
                match mtb_link_map.get(&rb_ev.header.rb_id) {
                  None => {
                    error!("Don't know MTB Link ID for {}", rb_ev.header.rb_id);
                    error!("This RBEvent gets discarded!");
                  }
                  Some(link_id) => {
                    if ev.get_rb_link_ids().contains(link_id) {
                      ev.rb_events.push(rb_ev);
                    } else {
                      error!("RBEvent {} has the same event id, but does not show up in MTB Link ID mask!", rb_ev);
                    }
                  }
                }
              } else {
                // Just add it without questioning
                ev.rb_events.push(rb_ev);
                //println!("[EVTBUILDER] DEBUG n rb expected : {}, n rbs {}",ev.mt_event.get_n_rbs_expected(), ev.rb_events.len());
              }
              //break;
            }
          }
        }
      }
    }
    // FIXME - timing debugging
    let debug_timer_elapsed = debug_timer.elapsed().as_secs_f64();
    //println!("Debug timer elapsed {}", debug_timer_elapsed);
    if debug_timer_elapsed > 35.0  {
      debug_timer = Instant::now();
    }
    trace!("Debug timer RBE received! {:?}", debug_timer.elapsed());

    // -----------------------------------------------------
    // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    // This concludes the actual "event building" part
    // -----------------------------------------------------

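    // The adaptive build strategies throttle how many RBEvents we drain per
    // loop iteration to the average number of RBEvents per sent TofEvent,
    // so the RB receiver keeps pace with the trigger rate without flooding
    // or starving the cache.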
    let av_rb_ev = n_rbe_per_te as f64 / n_sent as f64;
    if settings.build_strategy == BuildStrategy::Adaptive ||
      settings.build_strategy  == BuildStrategy::AdaptiveThorough {
      settings.n_rbe_per_loop  = av_rb_ev.ceil() as u32;
      // if the RB event pipeline gets too long, catch up
      // and drain it a bit
      if ev_from_rb.len() > 1000 {
        settings.n_rbe_per_loop = ev_from_rb.len() as u32 - 500;
      }
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }
    if let BuildStrategy::AdaptiveGreedy = settings.build_strategy {
      settings.n_rbe_per_loop = av_rb_ev.ceil() as u32 + settings.greediness as u32;
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }
    heartbeat.n_rbe_per_loop = settings.n_rbe_per_loop as u64;

    //-----------------------------------------
    // From here on, we are checking all events
    // in the cache, deciding which ones are
    // ready to be passed on
    // ----------------------------------------
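    // Every id popped from event_id_cache is either shipped (its TofEvent
    // is complete or timed out) or pushed back onto the front of the queue
    // to be checked again on a later pass.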

    let mut prior_ev_sent = 0u32;
    let mut first_ev_sent = false;

    for idx in 0..event_id_cache.len() {
      // if there was no first element, the size would be 0
      let evid = event_id_cache.pop_front().unwrap();
      match event_cache.get(&evid) {
        None => {
          error!("Event id and event caches are misaligned for event id {}, idx {} and sizes {} {}! This is BAD and most likely a BUG!", evid, idx, event_cache.len(), event_id_cache.len());
          continue;
        },
        Some(ev) => {
          let ev_timed_out = ev.age() >= settings.te_timeout_sec as u64;
          // timed out events should be sent in any case
          let mut ready_to_send = ev_timed_out;
          if ev_timed_out {
            if !ev.is_complete() {
              heartbeat.n_timed_out += 1;
            }
          } else {
            // we are earlier than our timeout; maybe the
            // event is already complete
            match settings.build_strategy {
              BuildStrategy::WaitForNBoards => {
                // we will always wait for the expected number of boards,
                // unless the event times out
                if ev.rb_events.len() as u8 == settings.wait_nrb {
                  ready_to_send = true;
                } // else ready_to_send is still false
              },
              BuildStrategy::Adaptive
              | BuildStrategy::AdaptiveThorough
              | BuildStrategy::AdaptiveGreedy
              | BuildStrategy::Smart
              | BuildStrategy::Unknown => {
                if ev.is_complete() {
                  ready_to_send = true;
                }
              }
            }
          }
          // This feature tries to sort the events which get sent by id.
          // This might lead to timed-out events and higher resource usage.
          if settings.sort_events {
            if ready_to_send && !ev_timed_out {
              if idx == 0 {
                first_ev_sent = true;
                prior_ev_sent = ev.event_id;
              } else {
                if idx == 1 {
                  if !first_ev_sent {
                    // we wait and check the others to see if something
                    // else timed out
                    ready_to_send = false;
                  }
                }
                if ev.event_id != (prior_ev_sent + 1) {
                  // we wait and check the others to see if something
                  // else timed out
                  ready_to_send = false;
                }
                prior_ev_sent = ev.event_id;
              }
            }
          }

          if ready_to_send {
            // if we don't cache it, we have to send it.
            //let ev_to_send = ev.clone();
            // The idea here is that this happens far less often (a few
            // times per main loop iteration) than the "cache it" case,
            // so we would rather do something here even if it might
            // require re-allocating memory. We should keep an eye on
            // performance though.
            //idx_to_remove.push(idx);
            let mut ev_to_send = event_cache.remove(&evid).unwrap();
            if ev_timed_out {
              ev_to_send.status = EventStatus::EventTimeOut;
            }
            // update the event status, so that an (optionally) produced
            // tof event summary will also show if the
            // event has issues
            n_rbe_per_te  += ev_to_send.rb_events.len();
            if ev_to_send.has_any_mangling() {
              heartbeat.data_mangled_ev += 1;
            }
            // sum up lost hits due to DRS4 deadtime
            heartbeat.drs_bsy_lost_hg_hits += ev_to_send.get_lost_hits() as u64;

            let mut save_to_disk = true;
            n_sent += 1;
            heartbeat.n_sent += 1;
            if send_tev_sum {
              //let tes  = ev_to_send.get_summary();
              // FIXME - these might be all zero!
              if settings.only_save_interesting {
                save_to_disk = false;
                if ev_to_send.n_hits_umb   >= settings.thr_n_hits_umb
                && ev_to_send.n_hits_cbe   >= settings.thr_n_hits_cbe
                && ev_to_send.n_hits_cor   >= settings.thr_n_hits_cor
                && ev_to_send.tot_edep_umb >= settings.thr_tot_edep_umb
                && ev_to_send.tot_edep_cbe >= settings.thr_tot_edep_cbe
                && ev_to_send.tot_edep_cor >= settings.thr_tot_edep_cor {
                  save_to_disk = true;
                }
              }
              let pack = ev_to_send.pack();
              match data_sink.send(pack) {
                Err(err) => {
                  error!("Packet sending failed! Err {}", err);
                }
                Ok(_)    => {
                  debug!("Event with id {} sent!", evid);
                }
              }
            }

            //if
            if send_rbwaveform {
              if rbwf_ctr == send_rbwf_freq as u64 {
                for wf in ev_to_send.get_waveforms() {
                  let pack = wf.pack();
                  match data_sink.send(pack) {
                    Err(err) => {
                      error!("Packet sending failed! Err {}", err);
                    }
                    Ok(_)    => {
                      debug!("Waveform for event {} sent!", evid);
                    }
                  }
                }
                rbwf_ctr = 0;
              }
              rbwf_ctr += 1; // increase for every event, not wf
            }

            // Always send TofEvents, so they get written to disk.
            // There is one exception though: in case we have
            // "interesting" event cuts in place, this can
            // be restricted.
            if save_to_disk {
              let pack = ev_to_send.pack();
              match data_sink.send(pack) {
                Err(err) => {
                  error!("Packet sending failed! Err {}", err);
                }
                Ok(_)    => {
                  debug!("Event with id {} sent!", evid);
                }
              }
            }
          // this happens when we are NOT ready to send -> cache it!
          } else {
            event_id_cache.push_front(evid);
          }
        }
      }
    } // end loop over event_id_cache
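    // Periodically publish an event builder heartbeat so downstream
    // monitoring can see cache sizes, channel fill levels and the
    // various drop counters.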
    if hb_timer.elapsed() >= hb_interval {
      // make sure the heartbeat has the latest mission elapsed time
      heartbeat.met_seconds         += hb_timer.elapsed().as_secs_f64() as u64;// as usize;
      // get the length of the various caches at this moment in time
      heartbeat.event_cache_size     = event_cache.len()    as u64;
      heartbeat.event_id_cache_size  = event_id_cache.len() as u64;
      heartbeat.mte_receiver_cbc_len = m_trig_ev.len()      as u64;
      heartbeat.rbe_receiver_cbc_len = ev_from_rb.len()     as u64;
      heartbeat.tp_sender_cbc_len    = data_sink.len()      as u64;

      let pack         = heartbeat.pack();
      match data_sink.send(pack) {
        Err(err) => {
          error!("Packet sending failed! Err {}", err);
        }
        Ok(_)    => {
        }
      }
      hb_timer = Instant::now();
    }
  } // end loop
}