// liftof_cc/threads/event_builder.rs

//! The Heart of liftof-cc. The event builder assembles all
//! events coming from the Readoutboards into a single event
3
4use std::thread;
5use std::time::{
6  Instant,
7  Duration
8};
9
10use std::sync::{
11  Arc,
12  Mutex
13};
14
15use std::collections::VecDeque;
16use std::collections::HashMap;
17
18use crossbeam_channel::{
19  Receiver,
20  Sender,
21};
22
23use gondola_core::prelude::*;
24
25use crate::constants::EVENT_BUILDER_EVID_CACHE_SIZE;
26
/// Events ... assemble! 
///
/// The event builder collects all available event information,
/// beginning with the MasterTriggerEvent defining the event 
/// id. It collects the requested number of RBEvents.
/// The final product then will be a TofEvent.
///
/// The event builder is the heart of this software and crucial
/// to all operations.
///
/// Configuration (`TofEventBuilderSettings`) is read from
/// `thread_control.liftof_settings` at startup; there is no separate
/// `settings` parameter. The function runs until the shared
/// `stop_flag` is raised, then returns.
///
/// # Arguments
///
/// * m_trig_ev      : Receive a `MasterTriggerEvent` over this 
///                    channel. The event will be either built 
///                    immediately, or cached. 
/// * ev_from_rb     : Receive a number of `RBEvents` over this channel.
///                    The events here shall be associated with the 
///                    MasterTriggerEvent.
/// * orphanage      : RBEvents which cannot (yet) be matched to a
///                    MasterTriggerEvent are handed to this channel
///                    instead of being discarded right away.
/// * data_sink      : Send heartbeats (and everything else in 
///                    the form of TofPackets) to the data sink.
/// * data_sink_ev   : Send fully assembled TofEvents to the data sink.
/// * mtb_link_map   : Map of MTB Link ID - RB ID. Maybe in the future
///                    RBs will know their link id themselves?
///                    This is currently only needed for the build strategy
///                    "AdaptiveThorough"
/// * dead_rb_ids    : RB ids considered dead. When the setting
///                    `no_expect_dead_rbs` is active, these boards are
///                    not expected in the completeness check.
/// * thread_control : Shared, mutex-protected control structure, used
///                    for configuration at startup and for run control
///                    (stop flag, calibration state, MTB DAQ reset).
pub fn event_builder (m_trig_ev      : &Receiver<TofEvent>,
                      ev_from_rb     : &Receiver<RBEvent>,
                      orphanage      : &Sender<RBEvent>,
                      data_sink      : &Sender<TofPacket>,
                      data_sink_ev   : &Sender<TofEvent>,
                      mtb_link_map   : HashMap<u8,u8>,
                      dead_rb_ids    : Vec<u8>,
                      thread_control : Arc<Mutex<ThreadControl>>) { 
  // set up the event builder. Since we are now doing settings only at run 
  // start, it is fine to do this outside of the loop
  //let mut send_tev_sum    : bool;
  //let mut send_rbwaveform : bool;
  //let mut send_rbwf_freq  : u32;
  //let mut rbwf_ctr        = 0u64;
  let mut settings        : TofEventBuilderSettings;
  let mut run_id          : u32;
  // this can block it is fine bc it is only 
  // happening once at init
  let mut cali_active : bool;
  // two trigger types are possible, make the time out depending on them 
  //let mut primary_trigger   : TriggerType;
  let mut combo_trigger : TriggerType;
  // these are for debugging as long as we don't have the correct link ids working 
  // these should be +1 for orphans when they are dropped or in case of the link_ids 
  // when the events time out
  let mut weird_orphan_rbids    = HashMap::<u8, usize>::new();
  //let mut weird_rbe_link_ids    = HashMap::<u8, usize>::new();
  // (dead_rb_ids, dsi/j/ch -> RB mapping) pair handed to the event
  // completeness check; stays None unless no_expect_dead_rbs is set below
  let mut dead_rbs    : Option<(&Vec<u8>, &DsiJChRbMapping)> = None;
  let mut no_expect_dead_rbs    : bool;
  // Startup phase: read the configuration from the shared ThreadControl.
  // We keep polling (4 s sleep) while a calibration is still active, so
  // event building only starts on a calibrated system.
  loop {
    match thread_control.lock() {
      Ok(tc) => {
        settings           = tc.liftof_settings.event_builder_settings.clone();
        run_id             = tc.run_id;
        cali_active        = tc.calibration_active;
        //primary_trigger   = tc.liftof_settings.mtb_settings.trigger_type;
        combo_trigger      = tc.liftof_settings.mtb_settings.global_trigger_type;
        no_expect_dead_rbs = tc.liftof_settings.event_builder_settings.no_expect_dead_rbs.unwrap_or(false);
      }
      Err(err) => {
        // without configuration we cannot run at all - bail out
        error!("Can't acquire lock for ThreadControl! {err}");
        error!("CRITICAL: Unable to configure event builder thread! Aborting!");
        return;
      }
    }
    if !cali_active {
      break;
    } else {
      thread::sleep(Duration::from_secs(4));
    }
  }
  info!("Will assign run id {} to events!", run_id);
  let paddles   = TofPaddle::all().unwrap_or(Vec::<TofPaddle>::new());
  let dsijrbmap = get_dsi_j_ch_rb_map(&paddles); 
  //let all_rbs   = ReadoutBoard::all().unwrap_or(Vec::<ReadoutBoard>::new()); 
  //let exprbmap  = get_linkid_rbid_map(&all_rbs);
  if no_expect_dead_rbs {  
    dead_rbs = Some((&dead_rb_ids, &dsijrbmap));
  }

  // event caches for assembled events
  // event_cache      : event id -> TofEvent under construction
  // event_id_cache   : same ids in arrival (and thus ascending) order,
  //                    used to iterate the cache oldest-first
  let mut heartbeat            = EventBuilderHB::new();
  let mut event_cache          = HashMap::<u32, TofEvent>::new();
  let mut event_id_cache       = VecDeque::<u32>::with_capacity(EVENT_BUILDER_EVID_CACHE_SIZE);
  let mut n_received           : usize;
  let mut last_evid            = 0;
  //let mut n_sent               = 0usize;
  // debug
  let mut last_rb_evid         : u32;
  // running total of RBEvents contained in the TofEvents sent so far;
  // used to adapt n_rbe_per_loop for the adaptive build strategies
  let mut n_rbe_per_te         = 0usize;
  //let mut debug_timer          = Instant::now();
  let mut check_tc_update      = Instant::now();
  // NOTE(review): reset_daq_flag is never set to true anywhere in this
  // function, so the "reset MTB DAQ" branch below is currently dead code
  // - TODO confirm this is intentional
  let daq_reset_cooldown       = Instant::now();
  let reset_daq_flag           = false;
  let mut retire               = false;
  let mut hb_timer             = Instant::now(); 
  let hb_interval              = Duration::from_secs(settings.hb_send_interval as u64);

  //let mut debug_orphans        = Vec::<RBEvent>::new();

  // holdoff, just empty the channels, until we are confident to start 
  // NOTE(review): hb_timer is (re)used as the holdoff clock here and is
  // not reset afterwards, so the first heartbeat interval is shortened
  // by the holdoff duration - TODO confirm this is acceptable
  if let Some(ho) = settings.holdoff {
    // orphans, stfu
    while hb_timer.elapsed().as_secs() < ho as u64 {
      while !m_trig_ev.is_empty() {
        let _foo = m_trig_ev.try_recv();
      }
      while !ev_from_rb.is_empty() {
        let _bar = ev_from_rb.try_recv();
      }
    }
    println!("=> EvtBldr starting m_trig_ev  len {}", m_trig_ev.len());
    println!("=> EvtBldr starting ev_from_rb len {}", ev_from_rb.len());
    println!("=> EvtBldr passed holdoff time of {}", ho);
  }
  //------- DEBUG -- Measure the timing of the different parts 
  //------- of the loop
  //let mut mt_loop_time     = Instant::now();
  //let mut avg_mt_loop_time = 0u128;
  //let mut n_iter_mt_loop   = 0usize;
  //let mut rb_loop_time     = Instant::now();
  //let mut avg_rb_loop_time = 0u128;
  //let mut n_iter_rbe_loop  = 0usize;
  // remember the configured value so it can be restored after the
  // adaptive strategies have changed n_rbe_per_loop
  let n_rbe_per_loop_default = settings.n_rbe_per_loop; 
  // ---------------------------------------------------------------
  // Main builder loop. Each iteration:
  //   1) check thread control (stop/calibration) every ~2 s
  //   2) pull n_mte_per_loop MasterTriggerEvents and cache them
  //   3) pull up to n_rbe_per_loop RBEvents and match them to cached events
  //   4) walk the cache and send events that are complete or timed out
  //   5) send heartbeats and mitigate full channels
  // ---------------------------------------------------------------
  loop {
    if check_tc_update.elapsed().as_secs() > 2 {
      //println!("= => [evt_builder] checkling tc..");

      let mut cali_still_active = false;
      match thread_control.try_lock() {
        Ok(mut tc) => {
          if !tc.thread_event_bldr_active {
            // not our turn to run; re-check on the next iteration
            // (note this skips the check_tc_update reset below)
            //println!("= => [evt_builder] (thread_event_bldr_active == false) shutting down...");
            continue; 
          }
          //println!("= => [evt_builder] {}", tc);
          if tc.stop_flag {
            // end myself
            println!("= => [evt_builder] (stop_flag == true) shutting down...");
            retire = true;
          }
          //println!("== ==> [evt_builder] tc lock acquired!");
          if tc.calibration_active {
            cali_still_active = true;
          } else {
            cali_still_active = false;  
          }
          // dead branch while reset_daq_flag stays false (see above)
          if daq_reset_cooldown.elapsed().as_secs_f32() > 120.0 && reset_daq_flag {
            warn!("Resetttign MTB DAQ queue!");
            tc.reset_mtb_daq = true;
          }
        },
        Err(err) => {
          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
        },
      }
      check_tc_update = Instant::now();
      if cali_still_active {
        thread::sleep(Duration::from_secs(1));
        continue;
      }
    }
    if retire {
      //thread::sleep(Duration::from_secs(2));
      break;
    }
    // Phase 2: collect MasterTriggerEvents. This busy-polls until
    // n_mte_per_loop events have arrived.
    n_received = 0;
    while n_received < settings.n_mte_per_loop as usize {
      //mt_loop_time     = Instant::now(); 
      // every iteration, we welcome a new master event
      //mt_loop_time = Instant::now(); 
      //if m_trig_ev.is_empty() {
      //  continue;
      //}
      // have that deliberately blocking
      match m_trig_ev.try_recv() {
      //match m_trig_ev.try_recv() {
        Err(_) => {
          trace!("No new event ready yet!");
          //n_receiving_errors += 1;
          continue;
        }   
        Ok(mut event) => {
          debug!("Received MasterTriggerEvent {}!", event);
          // NOTE(review): run_id is u32 but stored in a u16 field;
          // values > 65535 silently truncate (see FIXME)
          event.run_id = run_id as u16; // FIXME - might be too big
          // bookkeeping: count event ids the MTB skipped over
          if last_evid != 0 {
            if event.event_id != last_evid + 1 {
              if event.event_id > last_evid {
                heartbeat.n_mte_skipped += (event.event_id - last_evid - 1) as u32;
              }
            }
          }
          last_evid = event.event_id;
          event_cache.insert(last_evid, event);
          // use this to keep track of the order
          // of events
          event_id_cache.push_back(last_evid);
          n_received  += 1;
          heartbeat.n_mte_received_tot += 1;
        }
      } // end match Ok(mt)
      //avg_mt_loop_time += mt_loop_time.elapsed().as_nanos();
      //n_iter_mt_loop += 1;
    } // end getting MTEvents
    //trace!("Debug timer MTE received! {:?}", debug_timer.elapsed());
    // recycle that variable for the rb events as well
    n_received = 0;
    // The second receiver gets RBEvents from all ReadoutBoards. ReadoutBoard events are 
    // NOT cached by design. The assumption here is that due to caching on the RBs and the 
    // longer pathway (harting cable + ethernet cables) and DRS and user time, RBEvents are 
    // ALWAYS later than the MTEvents.
    'main: while !ev_from_rb.is_empty() && n_received < settings.n_rbe_per_loop as usize {
      
      //rb_loop_time = Instant::now();
      match ev_from_rb.try_recv() {
        Err(err) => {
          error!("Can't receive RBEvent! Err {err}");
        },
        Ok(rb_ev) => {
          heartbeat.n_rbe_received_tot += 1;
          n_received += 1;
          // wacky events are counted but never built
          if rb_ev.status == EventStatus::RBEventWacky {
            continue;
          }

          //match seen_rbevents.get_mut(&rb_ev.header.rb_id) {
          //  Some(value) => {
          //    *value += 1;
          //  }
          //  None => {
          //    warn!("Unable to do bookkeeping for RB {}", rb_ev.header.rb_id);
          //  }
          //}
          //iter_ev = 0;
          last_rb_evid = rb_ev.header.event_id;
          // try to associate the rb events with the mtb events
          // the event ids from the RBEvents have to be in the 
          // range of the MTB Event
          // The event_id_cache is sorted, that is why it works
          // NOTE(review): event_id_cache[0] panics if the deque is
          // empty; the MTE phase above should guarantee at least one
          // entry per iteration, but entries are popped in the send
          // phase - TODO confirm it can never be drained completely
          if last_rb_evid < event_id_cache[0] {
            // this is the first check. If this fails, then the event is for 
            // sure not in the event_cache and we can dismiss it right away,
            // knowing that it is from the past
            n_received -= 1;
            debug!("The received RBEvent {} is from the ancient past! Currently, we don't have a way to deal with that and this event will be DISCARDED! The RBEvent queue will be re-synchronized...", last_rb_evid);
            heartbeat.n_rbe_discarded_tot += 1;
            heartbeat.n_rbe_from_past     += 1;
            //*too_early_rbevents.get_mut(&rb_ev.header.rb_id).unwrap() += 1;
            continue;
          }
          // Now try to get the master trigger event for 
          // this RBEvent
          match event_cache.get_mut(&last_rb_evid) {
            None => {
              // no matching MTE (yet): either send to the orphanage for a
              // later retry, or discard after 5 minutes
              if let Some(backend_evid) = event_id_cache.back() { 
                if last_rb_evid < *backend_evid {
                  // we know that this is neither too late nor too early!
                  heartbeat.rbe_wo_mte          += 1;
                }
                //debug_orphans.push(rb_ev);
                //let orphan_pack = rb_ev.pack();
                //writer.add_tof_packet(&orphan_pack);
                if rb_ev.creation_time.is_some() {
                  if rb_ev.creation_time.unwrap().elapsed().as_secs() > 300 {
                    // NOTE(review): this subtraction underflows (u32) when
                    // last_rb_evid < backend_evid - TODO confirm
                    let delta_evid = last_rb_evid - backend_evid;
                    error!("We can't associate event id {} from RB {} with a MTEvent in range {} .. {}. It is {} event ids ahead !", last_rb_evid, rb_ev.header.rb_id, event_id_cache[0], backend_evid, delta_evid);
                    debug!("{}", rb_ev);
                    warn!("Orphan could not be adopted within 5 mins. Kicking them out!");
                    heartbeat.n_rbe_discarded_tot += 1;
                    heartbeat.n_rbe_orphan        += 1;
                    continue 'main
                  }
                }
                match orphanage.send(rb_ev) {
                  Ok(_) => (),
                  Err(err) => {
                    error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
                  }
                }
              }
              continue 'main;
            },
            Some(ev) => {
              // found the matching MTE - attach the RBEvent
              if settings.build_strategy == BuildStrategy::AdaptiveThorough {
                // thorough mode: additionally verify the RB shows up in
                // the MTB link id mask before attaching
                match mtb_link_map.get(&rb_ev.header.rb_id) {
                  None => {
                    error!("Don't know MTB Link ID for {}", rb_ev.header.rb_id);
                    error!("This RBEvent gets discarded!");
                  }
                  Some(link_id) => {
                    if ev.get_rb_link_ids().contains(link_id) {
                      ev.rb_events.push(rb_ev);
                    } else {
                      error!("RBEvent {} has the same event id, but does not show up in MTB Link ID mask!", rb_ev);
                    }
                  }
                }
              } else {
                // Just add it without questioning
                //println!("PUSHING NEW RG EVENT WITH {} HITS", rb_ev.hits.len());
                ev.rb_events.push(rb_ev);
                //println!("[EVTBUILDER] DEBUG n rb expected : {}, n rbs {}",ev.mt_event.get_n_rbs_expected(), ev.rb_events.len());
              }
              //break;
            }
          }
        }
      }
      //thread::sleep(Duration::from_nanos(200)); 
      //avg_rb_loop_time += rb_loop_time.elapsed().as_nanos();
      //n_iter_rbe_loop  += 1;
    }
    // FIXME - timing debugging
    //let debug_timer_elapsed = debug_timer.elapsed().as_secs_f64();
    ////println!("Debug timer elapsed {}", debug_timer_elapsed);
    //if debug_timer_elapsed > 90.0  {
    //  debug_timer = Instant::now(); 
    //  let mut file = File::create("event_id_cache.txt").unwrap();
    //  let mut file2 = File::create("orphans.txt").unwrap();
    //  let content = format!("{:?}", event_id_cache);
    //  //let content = "This is a line of text.\nAnother line.";
    //  file.write_all(content.as_bytes()); // write_all expects a byte slice
    //  let mut content_rbs = String::new();
    //  for k in &debug_orphans {
    //    content_rbs += &format!("{}", k);
    //  }
    //  file2.write_all(content_rbs.as_bytes()); // write_al
    //
    //}
    //trace!("Debug timer RBE received! {:?}", debug_timer.elapsed());

    // -----------------------------------------------------
    // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    // This concludes the actually "event building" part
    // -----------------------------------------------------

    // Adapt n_rbe_per_loop to the observed average number of RBEvents
    // per sent TofEvent. NOTE(review): n_sent is 0 until the first
    // event is sent, so this division yields NaN/inf initially; the
    // cast to u32 then gives 0 (caught by the failsafe further down).
    let av_rb_ev = n_rbe_per_te as f64 / heartbeat.n_sent as f64;
    if settings.build_strategy == BuildStrategy::Adaptive || 
      settings.build_strategy  == BuildStrategy::AdaptiveThorough {
      //settings.n_rbe_per_loop  = av_rb_ev.ceil() as u32;
      settings.n_rbe_per_loop  = av_rb_ev.floor() as u32;
    }
    if let BuildStrategy::AdaptiveGreedy = settings.build_strategy {
      settings.n_rbe_per_loop = av_rb_ev.ceil() as u32 + settings.greediness as u32;
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }


    //-----------------------------------------
    // From here on, we are checking all events
    // in the cache, deciding which ones are 
    // ready to be passed on
    // ----------------------------------------

    let mut prior_ev_sent = 0u32;
    let mut first_ev_sent = false;
    let timeout           = settings.te_timeout_sec as u64;  
    // For the combo trigger, we can have a different timeout
    // if it is not set, we use the same timeout as for the primary trigger
    let combo_timeout = settings.te_timeout_sec_combo.unwrap_or(settings.te_timeout_sec) as u64;
    // only inspect the older half of the cache per iteration; events
    // which are not ready get pushed back to the front below
    let n_iter_max = event_id_cache.len() / 2;
    for idx in 0..n_iter_max {
      // if there wasn't a first element, size would be 0
      let evid = event_id_cache.pop_front().unwrap();
      match event_cache.get_mut(&evid) {
        None => {
          error!("Event id and event caches are misaligned for event id {}, idx {} and sizes {} {}! This is BAD and most likely a BUG!", evid, idx, event_cache.len(), event_id_cache.len());
          continue;
        },
        Some(ev) => {
          let mut ev_timed_out = ev.age() >= timeout;
          let mut is_combo     = false;
          for trg in &ev.get_trigger_sources() {
            // the combo trigger should supersede the primary trigger and only if 
            // we have the secondary trigger, we will apply the other timeout 
            if trg == &combo_trigger {
              ev_timed_out = ev.age() >= combo_timeout;
              is_combo     = true;
              break;
            }
          }
          // timed out events should be sent in any case
          let is_complete = ev.is_complete(dead_rbs);
          //let mut ready_to_send = ev_timed_out || is_complete;
          let mut ready_to_send : bool;
          // mangled events will be treated separately
          if ev_timed_out && !is_complete && !ev.has_any_mangling() {
            //if !ev.is_complete(dead_rbs) {
              if is_combo {
                heartbeat.n_timed_out_combo += 1;
                //println!("Ev {} timed out! ({} total) Seen rb events {}, expected {} link ids", ev.event_id, heartbeat.n_timed_out_combo, ev.rb_events.len(), ev.get_rb_link_ids().len());
              } else {
                heartbeat.n_timed_out += 1;
                //println!("GAPS Ev {} timed out! ({} total) Seen rb events {}, expected {} link ids! Expected RBs {:?}", ev.event_id, heartbeat.n_timed_out, ev.rb_events.len(), ev.get_rb_link_ids().len(), ev.get_expected_rbs(&exprbmap));
              }
              // let's check the rb_link_ids here and let's check which one is missing
              //for l_id in ev.get_rb_link_ids() {
              //  if weird_rbe_link_ids.contains_key(&l_id) {
              //    *weird_rbe_link_ids.get_mut(&l_id).unwrap() += 1;
              //  } else {
              //    weird_rbe_link_ids.insert(l_id, 1);
              //  }
              //}
            //}
          } // else {
          // we are earlier than our time out, maybe the 
          // event is already complete
          ready_to_send = ev_timed_out;
          match settings.build_strategy {
            BuildStrategy::WaitForNBoards => {
              // we will always wait for the expected number of boards, 
              // except the event times out
              if ev.rb_events.len() as u8 == settings.wait_nrb {
                ready_to_send = true;
              } // else ready_to_send is still false 
            },
            BuildStrategy::Adaptive 
            | BuildStrategy::AdaptiveThorough
            | BuildStrategy::AdaptiveGreedy
            | BuildStrategy::Smart 
            | BuildStrategy::Unknown => {
              if ev.is_complete(dead_rbs) {
                //println!("EV HAS {} RBEVENTS", ev.rb_events.len());
                ready_to_send = true;
              }
            }
          }
          //} 
          // this feature tries to sort the events which are getting sent
          // by id. This might lead to timed out events and more resources needed
          if settings.sort_events {
            if ready_to_send && !ev_timed_out {
              if idx == 0 {
                first_ev_sent = true;
                prior_ev_sent = ev.event_id;
              } else {
                if idx == 1 {
                  if !first_ev_sent {
                    // we wait and check the others to see if something 
                    // else timed out
                    ready_to_send = false;
                  }
                }
                if ev.event_id != (prior_ev_sent + 1) {
                  // we wait and check the others to see if something 
                  // else timed out
                  ready_to_send = false;
                }
                prior_ev_sent = ev.event_id;
              }
            }
          }
          
          if ready_to_send {
            // if we don't cache it, we have to send it. 
            //let ev_to_send = ev.clone();
            // so the idea here is that this happens way less 
            // often (a few times per main loop iteration)
            // than the cache it case, so we rather do something
            // here even if it might require re-allocating memory
            // we should have an eye on performance though
            //idx_to_remove.push(idx);
            let mut ev_to_send = event_cache.remove(&evid).unwrap();
            let is_mangled     = ev_to_send.has_any_mangling(); 
            if is_mangled {
              heartbeat.data_mangled_ev += 1;
            }
            if ev_timed_out && !is_mangled {
              ev_to_send.status = EventStatus::EventTimeOut;
            }
            //if send_rbwaveform {
            //  if rbwf_ctr == send_rbwf_freq as u64 {
            //    for wf in ev_to_send.get_waveforms() {
            //      let mut pack = wf.pack();
            //      pack.no_write_to_disk = true;
            //      match data_sink.send(pack) {
            //        Err(err) => {
            //          error!("Packet sending failed! Err {}", err);
            //        }
            //        Ok(_)    => {
            //          debug!("Event with id {} sent!", evid);
            //        }
            //      }
            //    }
            //    rbwf_ctr = 0;
            //  }
            //  rbwf_ctr += 1; // increase for every event, not wf
            //}
            // update event status, so that we will also see in an 
            // (optionally) produced tof event summary if the 
            // event has issues
            n_rbe_per_te  += ev_to_send.rb_events.len();
            // sum up lost hits due to drs4 deadtime
            heartbeat.drs_bsy_lost_hg_hits += ev_to_send.get_lost_hits() as u32;

            //println!("GOT {} ", ev_to_send.hits.len());
            heartbeat.n_sent += 1;
            if is_combo {
              heartbeat.n_sent_combo_trigger += 1;
            } else {
              heartbeat.n_sent_trigger += 1;
            }
            //let mut no_write_to_disk = false;
            //let no_send_over_nw  = false; 
            //if send_tev_sum {
            //let tes  = ev_to_send.get_summary();
            // FIXME - these might be all zero!
            //if settings.only_save_interesting {
            //  no_write_to_disk = true;
            //  if ev_to_send.n_hits_umb   >= settings.thr_n_hits_umb 
            //  && ev_to_send.n_hits_cbe   >= settings.thr_n_hits_cbe
            //  && ev_to_send.n_hits_cor   >= settings.thr_n_hits_cor
            //  && ev_to_send.tot_edep_umb >= settings.thr_tot_edep_umb
            //  && ev_to_send.tot_edep_cbe >= settings.thr_tot_edep_cbe
            //  && ev_to_send.tot_edep_cor >= settings.thr_tot_edep_cor {
            //    no_write_to_disk = false;
            //  }
            //}
            //for wf in ev_to_send.get_waveforms() {
            //  println!( "FS {} ", wf);
            //}
            //for ev in &ev_to_send.rb_events {
            //   println!("{:?}", ev.hits);
            //}
            // Right now we want version V3 (which should already 
            // be set), so it saves waveforms 
            // FIXME - this all should go to the data_publsher
            ev_to_send.version = ProtocolVersion::V3;
            //let mut pack = ev_to_send.pack();
            //pack.no_write_to_disk = no_write_to_disk;
            match data_sink_ev.send(ev_to_send) {
              Err(err) => {
                error!("Packet sending failed! Err {}", err);
              }
              Ok(_)    => {
                
                debug!("Event with id {} sent!", evid);
              }
            //}
            }

            //if 
            
          // this happens when we are NOT ready to send -> cache it!
          } else { 
            event_id_cache.push_front(evid);
          }
        }
      }
    } // end loop over event_id_cache
    // Phase 5: heartbeat. Emit an EventBuilderHB packet every
    // hb_interval seconds with cache/channel fill levels.
    if hb_timer.elapsed() >= hb_interval {
      // print the statistics for the weird orphans 
      //println!("=> Weird orphan statistics. Here is a map of rb_id -> N_orphans (which ended up in the gutter)");
      //println!("=> {:?}", weird_orphan_rbids);
      //println!("-- -- -- -- -- -- -- -- -- -- -- --");

      // make sure the heartbeat has the latest mission elapsed time
      heartbeat.met_seconds         += hb_timer.elapsed().as_secs_f64() as u64;// as usize;
      // get the length of the various caches at this moment in time
      heartbeat.event_cache_size     = event_cache.len()    as u32;
      heartbeat.event_id_cache_size  = event_id_cache.len() as u32;
      heartbeat.mte_receiver_cbc_len = m_trig_ev.len()      as u32;
      heartbeat.rbe_receiver_cbc_len = ev_from_rb.len()     as u32;
      heartbeat.tp_sender_cbc_len    = data_sink.len()      as u32;

      let pack         = heartbeat.pack();
      //println!("Avg mt loop time {}", avg_mt_loop_time as f64/n_iter_mt_loop as f64);
      //println!("Avg rb loop time {}", avg_rb_loop_time as f64/n_iter_rbe_loop as f64);

      match data_sink.send(pack) {
        Err(err) => {
          error!("Packet sending failed! Err {}", err);
        }
        Ok(_)    => {
        }
      }
      hb_timer = Instant::now();
    }

    // mitigation for full channels 
    // if the RBEvent channel has filled up completely, drain it entirely
    // (events are dropped, not forwarded to the orphanage) and restore
    // the configured n_rbe_per_loop
    if ev_from_rb.is_full() {
    // flush rb events from the rb receiver channel 
    // limit level 1 
    //if let Some(pl)  = settings.rbe_purge_limit1 {
      //settings.n_rbe_per_loop = n_rbe_per_loop_default;
      //let purge       = settings.rbe_purge_limit1.unwrap_or(ev_from_rb.len() as u32);
      //let expired_sec = settings.rbe_purge_ev_time1.unwrap_or(30) as u64;
      //if ev_from_rb.len() > pl as usize {
      settings.n_rbe_per_loop = n_rbe_per_loop_default;
      for _k in 0..ev_from_rb.capacity().unwrap_or(20000) {
        match ev_from_rb.try_recv() {
          Err(_) => (), 
          Ok(_ev) => {
            ////FIXME - what to do with these?
            // We should probably tell the heartbeat how many we have purged?
            //if ev.creation_time.is_some() {
            //  if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
            //    match orphanage.send(ev) {
            //      Ok(_) => (),
            //      Err(err) => {
            //        error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
            //      }
            //    } 
            //  } else {
            //    //// just do statistics 
            //    //if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
            //    //  *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
            //    //} else {
            //    //  weird_orphan_rbids.insert(ev.header.rb_id,1);
            //    //}
            //  }
            //}
          }
        }
      }
      //}
    }

    // flush rb events from the rb receiver channel 
    // limit level 1 
    // softer mitigation: above the configured fill limit, drain the
    // channel but forward young events (< expired_sec old) to the
    // orphanage; older ones only feed the weird-orphan statistics
    if let Some(pl)  = settings.rbe_purge_limit1 {
      //settings.n_rbe_per_loop = n_rbe_per_loop_default;
      //let purge       = settings.rbe_purge_limit1.unwrap_or(ev_from_rb.len() as u32);
      let purge      = ev_from_rb.len();
      let expired_sec = settings.rbe_purge_ev_time1.unwrap_or(30) as u64;
      if ev_from_rb.len() > pl as usize {
        settings.n_rbe_per_loop = n_rbe_per_loop_default;
        for _k in 0..purge {
          // NOTE(review): blocking recv() here; try_recv() would avoid a
          // potential stall if the channel empties mid-purge - TODO confirm
          match ev_from_rb.recv() {
            Err(_) => (), 
            Ok(ev) => {
              //FIXME - what to do with these?
              if ev.creation_time.is_some() {
                if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
                  match orphanage.send(ev) {
                    Ok(_) => (),
                    Err(err) => {
                      error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
                    }
                  } 
                } else {
                  // just do statistics 
                  if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
                    *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
                  } else {
                    weird_orphan_rbids.insert(ev.header.rb_id,1);
                  }
                }
              }
            }
          }
        }
      }
    }
 
    //// flush rb events from the rb receiver channel 
    //// limit level 2 
    //if let Some(pl)  = settings.rbe_purge_limit2 {
    //  let purge       = settings.rbe_purge_limit2.unwrap_or(ev_from_rb.len() as u32);
    //  let expired_sec = settings.rbe_purge_ev_time2.unwrap_or(30) as u64;
    //  if ev_from_rb.len() > pl as usize {
    //    settings.n_rbe_per_loop = n_rbe_per_loop_default;
    //    for k in 0..purge {
    //      match ev_from_rb.recv() {
    //        Err(_) => (), 
    //        Ok(ev) => {
    //          //FIXME - what to do with these?
    //          if ev.creation_time.is_some() {
    //            if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
    //              match orphanage.send(ev) {
    //                Ok(_) => (),
    //                Err(err) => {
    //                  error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
    //                }
    //              } 
    //            } else {
    //              // just do statistics 
    //              if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
    //                *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
    //              } else {
    //                weird_orphan_rbids.insert(ev.header.rb_id,1);
    //              }
    //            }
    //          } 
    //        }
    //      }
    //    }
    //  }
    //}
    //// flush rb events from the rb receiver channel 
    //// limit level 3 
    //if let Some(pl)  = settings.rbe_purge_limit3 {
    //  let purge       = settings.rbe_purge_limit3.unwrap_or(ev_from_rb.len() as u32);
    //  let expired_sec = settings.rbe_purge_ev_time3.unwrap_or(30) as u64;
    //  if ev_from_rb.len() > pl as usize {
    //    settings.n_rbe_per_loop = n_rbe_per_loop_default;
    //    for k in 0..purge {
    //      match ev_from_rb.recv() {
    //        Err(_) => (), 
    //        Ok(ev) => {
    //          //FIXME - what to do with these?
    //          if ev.creation_time.is_some() {
    //            if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
    //              match orphanage.send(ev) {
    //                Ok(_) => (),
    //                Err(err) => {
    //                  error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
    //                }
    //              } 
    //            }
    //          } else {
    //            // just do statistics 
    //            if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
    //              *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
    //            } else {
    //              weird_orphan_rbids.insert(ev.header.rb_id,1);
    //            }
    //          }
    //        }
    //      }
    //    }
    //  }
    //}
    
    ////if ev_from_rb.len() > 1000 {
    ////  for k in 0..1000 {
    ////    //while !ev_from_rb.is_empty() {
    ////    match ev_from_rb.recv() {
    ////      Err(_) => (), 
    ////      Ok(ev) => {
    ////        //FIXME - what to do with these?
    ////        if ev.creation_time.is_some() {
    ////          if ev.creation_time.unwrap().elapsed().as_secs() < 90 {
    ////            match orphanage.send(ev) {
    ////              Ok(_) => (),
    ////              Err(err) => {
    ////                error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
    ////              }
    ////            } 
    ////          }
    ////        }
    ////      }
    ////    }        
    ////  }
    ////  settings.n_rbe_per_loop = n_rbe_per_loop_default;
    ////}
    if settings.n_rbe_per_loop == 0 {
      // failsafe
      settings.n_rbe_per_loop = 40;
    }
    heartbeat.n_rbe_per_loop = settings.n_rbe_per_loop as u32;
  } // end loop
}  
788