// liftof_cc/threads/event_builder.rs
//! The Heart of liftof-cc. The event builder assembles all
2//! events coming from the Readoutboards in a single event
3
4use std::thread;
5use std::time::{
6 Instant,
7 Duration
8};
9
10use std::sync::{
11 Arc,
12 Mutex
13};
14
15use std::collections::VecDeque;
16use std::collections::HashMap;
17
18use crossbeam_channel::{
19 Receiver,
20 Sender,
21};
22
23use gondola_core::prelude::*;
24
25use crate::constants::EVENT_BUILDER_EVID_CACHE_SIZE;
26
27/// Events ... assemble!
28///
29/// The event_builder collects all available event information,
30/// beginning with the MasterTriggerEvent defining the event
31/// id. It collects the requested number of RBEvents.
32/// The final product then will be a TofEvent
33///
34/// The event_builder is the heart of this software and crucial
35/// to all operations.
36///
37/// # Arguments
38///
39/// * m_trig_ev : Receive a `MasterTriggerEvent` over this
40/// channel. The event will be either build
/// immediately, or cached.
42/// * ev_from_rb : Receive a number of `RBEvents` over this channel.
43/// The events here shall be associated with the
44/// MasterTriggerEvent
/// * data_sink : Send assembled events (and everything else in
/// the form of TofPackets) to the data sink
47/// * mtb_link_map : Map of MTB Link ID - RB ID. Maybe in the future
48/// RBs will know their link id themselves?
49/// This is currently only needed for the build strategy
50/// "AdaptiveThorough"
51/// * settings : Configure the event builder
pub fn event_builder (m_trig_ev     : &Receiver<TofEvent>,
                      ev_from_rb    : &Receiver<RBEvent>,
                      orphanage     : &Sender<RBEvent>,
                      data_sink     : &Sender<TofPacket>,
                      data_sink_ev  : &Sender<TofEvent>,
                      mtb_link_map  : HashMap<u8,u8>,
                      dead_rb_ids   : Vec<u8>,
                      thread_control : Arc<Mutex<ThreadControl>>) {
  // ------------------------------------------------------------------
  // Phase 1 - configuration.
  // Block (with a 4 s retry sleep) until ThreadControl can be locked
  // AND no calibration is active, then snapshot all relevant settings.
  // ------------------------------------------------------------------
  // set up the event builder. Since we are now doing settings only at run
  // start, it is fine to do this outside of the loop
  //let mut send_tev_sum : bool;
  //let mut send_rbwaveform : bool;
  //let mut send_rbwf_freq : u32;
  //let mut rbwf_ctr = 0u64;
  let mut settings : TofEventBuilderSettings;
  let mut run_id : u32;
  // this can block it is fine bc it is only
  // happening once at init
  let mut cali_active : bool;
  // two trigger types are possible, make the time out depending on them
  //let mut primary_trigger : TriggerType;
  let mut combo_trigger : TriggerType;
  // these are for debugging as long as we don't have the correct link ids working
  // these should be +1 for orphans when they are dropped or in case of the link_ids
  // when the events time out
  // rb_id -> count of purged "expired" orphan RBEvents (statistics only)
  let mut weird_orphan_rbids = HashMap::<u8, usize>::new();
  //let mut weird_rbe_link_ids = HashMap::<u8, usize>::new();
  // When populated, completeness checks (TofEvent::is_complete) will not
  // wait for boards listed as dead.
  let mut dead_rbs : Option<(&Vec<u8>, &DsiJChRbMapping)> = None;
  let mut no_expect_dead_rbs : bool;
  loop {
    match thread_control.lock() {
      Ok(tc) => {
        settings = tc.liftof_settings.event_builder_settings.clone();
        run_id = tc.run_id;
        cali_active = tc.calibration_active;
        //primary_trigger = tc.liftof_settings.mtb_settings.trigger_type;
        combo_trigger = tc.liftof_settings.mtb_settings.global_trigger_type;
        no_expect_dead_rbs = tc.liftof_settings.event_builder_settings.no_expect_dead_rbs.unwrap_or(false);
      }
      Err(err) => {
        // Without settings the builder cannot run at all - bail out of the thread.
        error!("Can't acquire lock for ThreadControl! {err}");
        error!("CRITICAL: Unable to configure event builder thread! Aborting!");
        return;
      }
    }
    if !cali_active {
      break;
    } else {
      // calibration still running - retry after a pause
      thread::sleep(Duration::from_secs(4));
    }
  }
  info!("Will assign run id {} to events!", run_id);
  // Paddle/DSI-J-channel -> RB mapping, needed for the dead-RB completeness check.
  let paddles = TofPaddle::all().unwrap_or(Vec::<TofPaddle>::new());
  let dsijrbmap = get_dsi_j_ch_rb_map(&paddles);
  //let all_rbs = ReadoutBoard::all().unwrap_or(Vec::<ReadoutBoard>::new());
  //let exprbmap = get_linkid_rbid_map(&all_rbs);
  if no_expect_dead_rbs {
    dead_rbs = Some((&dead_rb_ids, &dsijrbmap));
  }

  // ------------------------------------------------------------------
  // Phase 2 - working state.
  // event_cache maps event_id -> TofEvent under construction;
  // event_id_cache remembers insertion order (MTE event ids arrive
  // monotonically, so the deque front is always the oldest id).
  // ------------------------------------------------------------------
  // event caches for assembled events
  let mut heartbeat = EventBuilderHB::new();
  let mut event_cache = HashMap::<u32, TofEvent>::new();
  let mut event_id_cache = VecDeque::<u32>::with_capacity(EVENT_BUILDER_EVID_CACHE_SIZE);
  let mut n_received : usize;
  // last MTE event id seen; 0 means "none yet" (used for skip detection)
  let mut last_evid = 0;
  //let mut n_sent = 0usize;
  // debug
  let mut last_rb_evid : u32;
  // running total of RBEvents shipped inside sent TofEvents; used for the
  // adaptive build strategies (average RBEvents per TofEvent)
  let mut n_rbe_per_te = 0usize;
  //let mut debug_timer = Instant::now();
  let mut check_tc_update = Instant::now();
  // NOTE(review): daq_reset_cooldown/reset_daq_flag are never re-assigned,
  // so the MTB DAQ reset branch below fires based on thread uptime and
  // reset_daq_flag == false means it is currently disabled - confirm intent.
  let daq_reset_cooldown = Instant::now();
  let reset_daq_flag = false;
  let mut retire = false;
  let mut hb_timer = Instant::now();
  let hb_interval = Duration::from_secs(settings.hb_send_interval as u64);

  //let mut debug_orphans = Vec::<RBEvent>::new();

  // Phase 3 - optional holdoff: drain both input channels for `ho` seconds
  // so we start building from a clean slate.
  // holdoff, just empty the channels, until we are confident to start
  if let Some(ho) = settings.holdoff {
    // orphans, stfu
    while hb_timer.elapsed().as_secs() < ho as u64 {
      while !m_trig_ev.is_empty() {
        let _foo = m_trig_ev.try_recv();
      }
      while !ev_from_rb.is_empty() {
        let _bar = ev_from_rb.try_recv();
      }
    }
    println!("=> EvtBldr starting m_trig_ev len {}", m_trig_ev.len());
    println!("=> EvtBldr starting ev_from_rb len {}", ev_from_rb.len());
    println!("=> EvtBldr passed holdoff time of {}", ho);
  }
  //------- DEBUG -- Measure the timing of the different parts
  //------- of the loop
  //let mut mt_loop_time = Instant::now();
  //let mut avg_mt_loop_time = 0u128;
  //let mut n_iter_mt_loop = 0usize;
  //let mut rb_loop_time = Instant::now();
  //let mut avg_rb_loop_time = 0u128;
  //let mut n_iter_rbe_loop = 0usize;
  // remember the configured value so the purge paths can restore it after
  // the adaptive strategies have changed n_rbe_per_loop
  let n_rbe_per_loop_default = settings.n_rbe_per_loop;
  // ------------------------------------------------------------------
  // Phase 4 - main build loop.
  // Each iteration: (a) poll ThreadControl every ~2 s, (b) ingest
  // n_mte_per_loop MasterTrigger events, (c) ingest up to n_rbe_per_loop
  // RBEvents and match them to cached events, (d) flush complete or
  // timed-out events to the sink, (e) heartbeat + channel-pressure
  // mitigations.
  // ------------------------------------------------------------------
  loop {
    if check_tc_update.elapsed().as_secs() > 2 {
      //println!("= => [evt_builder] checkling tc..");

      let mut cali_still_active = false;
      match thread_control.try_lock() {
        Ok(mut tc) => {
          if !tc.thread_event_bldr_active {
            // NOTE(review): this `continue` restarts the outer loop WITHOUT
            // resetting check_tc_update, so while the flag is false the
            // thread spins on try_lock every iteration instead of breaking
            // or sleeping - confirm this is the intended "paused" behavior.
            //println!("= => [evt_builder] (thread_event_bldr_active == false) shutting down...");
            continue;
          }
          //println!("= => [evt_builder] {}", tc);
          if tc.stop_flag {
            // end myself
            println!("= => [evt_builder] (stop_flag == true) shutting down...");
            retire = true;
          }
          //println!("== ==> [evt_builder] tc lock acquired!");
          if tc.calibration_active {
            cali_still_active = true;
          } else {
            cali_still_active = false;
          }
          // currently dead code: reset_daq_flag is hard-wired to false above
          if daq_reset_cooldown.elapsed().as_secs_f32() > 120.0 && reset_daq_flag {
            warn!("Resetttign MTB DAQ queue!");
            tc.reset_mtb_daq = true;
          }
        },
        Err(err) => {
          // non-fatal: we just keep running with the old settings and retry in 2 s
          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
        },
      }
      check_tc_update = Instant::now();
      if cali_still_active {
        // don't build events while a calibration owns the boards
        thread::sleep(Duration::from_secs(1));
        continue;
      }
    }
    if retire {
      //thread::sleep(Duration::from_secs(2));
      break;
    }
    // ---- (b) ingest MasterTrigger events -----------------------------
    n_received = 0;
    while n_received < settings.n_mte_per_loop as usize {
      //mt_loop_time = Instant::now();
      // every iteration, we welcome a new master event
      //mt_loop_time = Instant::now();
      //if m_trig_ev.is_empty() {
      //  continue;
      //}
      // have that deliberatly blocking
      // NOTE(review): try_recv + continue busy-waits (no sleep) until
      // n_mte_per_loop events have arrived - effectively a spin-wait.
      match m_trig_ev.try_recv() {
        //match m_trig_ev.try_recv() {
        Err(_) => {
          trace!("No new event ready yet!");
          //n_receiving_errors += 1;
          continue;
        }
        Ok(mut event) => {
          debug!("Received MasterTriggerEvent {}!", event);
          event.run_id = run_id as u16; // FIXME - might be too big
          // count gaps in the (assumed monotonically increasing) event id
          // sequence as skipped MTEvents
          if last_evid != 0 {
            if event.event_id != last_evid + 1 {
              if event.event_id > last_evid {
                heartbeat.n_mte_skipped += (event.event_id - last_evid - 1) as u32;
              }
            }
          }
          last_evid = event.event_id;
          event_cache.insert(last_evid, event);
          // use this to keep track of the order
          // of events
          event_id_cache.push_back(last_evid);
          n_received += 1;
          heartbeat.n_mte_received_tot += 1;
        }
      } // end match Ok(mt)
      //avg_mt_loop_time += mt_loop_time.elapsed().as_nanos();
      //n_iter_mt_loop += 1;
    } // end getting MTEvents
    //trace!("Debug timer MTE received! {:?}", debug_timer.elapsed());
    // recycle that variable for the rb events as well
    n_received = 0;
    // ---- (c) ingest RBEvents and attach them to cached events --------
    // The second receiver gets RBEvents from all ReadoutBoards. ReadoutBoard events are
    // NOT cached by design. The assumption here is that due to caching on the RBs and the
    // longer pathway (harting cable + ethernet cables) and DRS and user time, RBEvents are
    // ALWAYS later than the MTEvents.
    'main: while !ev_from_rb.is_empty() && n_received < settings.n_rbe_per_loop as usize {

      //rb_loop_time = Instant::now();
      match ev_from_rb.try_recv() {
        Err(err) => {
          error!("Can't receive RBEvent! Err {err}");
        },
        Ok(rb_ev) => {
          heartbeat.n_rbe_received_tot += 1;
          n_received += 1;
          // wacky events are silently dropped (counted above as received)
          if rb_ev.status == EventStatus::RBEventWacky {
            continue;
          }

          //match seen_rbevents.get_mut(&rb_ev.header.rb_id) {
          //  Some(value) => {
          //    *value += 1;
          //  }
          //  None => {
          //    warn!("Unable to do bookkeeping for RB {}", rb_ev.header.rb_id);
          //  }
          //}
          //iter_ev = 0;
          last_rb_evid = rb_ev.header.event_id;
          // try to asscociate the rb events with the mtb events
          // the event ids from the RBEvents have to be in the
          // range of the MTB Event
          // The event_id_cache is sorted, that is why it works
          // NOTE(review): event_id_cache[0] panics if the deque is empty;
          // this relies on the MTE loop above always having cached at
          // least one event (n_mte_per_loop >= 1) - confirm.
          if last_rb_evid < event_id_cache[0] {
            // this is the first check. If this fails, then the event is for
            // sure not in the event_cache and we can dismiss it right away,
            // knowing that it is from the past
            n_received -= 1;
            debug!("The received RBEvent {} is from the ancient past! Currently, we don't have a way to deal with that and this event will be DISCARDED! The RBEvent queue will be re-synchronized...", last_rb_evid);
            heartbeat.n_rbe_discarded_tot += 1;
            heartbeat.n_rbe_from_past += 1;
            //*too_early_rbevents.get_mut(&rb_ev.header.rb_id).unwrap() += 1;
            continue;
          }
          // Now try to get the master trigger event for
          // this RBEvent
          match event_cache.get_mut(&last_rb_evid) {
            None => {
              // No matching MTEvent (yet): either ahead of the newest cached
              // id (future event) or a true orphan. Orphans younger than
              // 5 minutes are recycled through the orphanage channel,
              // older ones are dropped.
              if let Some(backend_evid) = event_id_cache.back() {
                if last_rb_evid < *backend_evid {
                  // we know that this is neither too late nor too early!
                  heartbeat.rbe_wo_mte += 1;
                }
                //debug_orphans.push(rb_ev);
                //let orphan_pack = rb_ev.pack();
                //writer.add_tof_packet(&orphan_pack);
                if rb_ev.creation_time.is_some() {
                  if rb_ev.creation_time.unwrap().elapsed().as_secs() > 300 {
                    let delta_evid = last_rb_evid - backend_evid;
                    error!("We can't associate event id {} from RB {} with a MTEvent in range {} .. {}. It is {} event ids ahead !", last_rb_evid, rb_ev.header.rb_id, event_id_cache[0], backend_evid, delta_evid);
                    debug!("{}", rb_ev);
                    warn!("Orphan could not be adopted within 5 mins. Kicking them out!");
                    heartbeat.n_rbe_discarded_tot += 1;
                    heartbeat.n_rbe_orphan += 1;
                    continue 'main
                  }
                }
                match orphanage.send(rb_ev) {
                  Ok(_) => (),
                  Err(err) => {
                    error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
                  }
                }
              }
              continue 'main;
            },
            Some(ev) => {
              // Matching MTEvent found - attach the RBEvent. For
              // AdaptiveThorough we additionally require the board's MTB
              // link id to appear in the event's link-id mask.
              if settings.build_strategy == BuildStrategy::AdaptiveThorough {
                match mtb_link_map.get(&rb_ev.header.rb_id) {
                  None => {
                    error!("Don't know MTB Link ID for {}", rb_ev.header.rb_id);
                    error!("This RBEvent gets discarded!");
                  }
                  Some(link_id) => {
                    if ev.get_rb_link_ids().contains(link_id) {
                      ev.rb_events.push(rb_ev);
                    } else {
                      error!("RBEvent {} has the same event id, but does not show up in MTB Link ID mask!", rb_ev);
                    }
                  }
                }
              } else {
                // Just ad it without questioning
                //println!("PUSHING NEW RG EVENT WITH {} HITS", rb_ev.hits.len());
                ev.rb_events.push(rb_ev);
                //println!("[EVTBUILDER] DEBUG n rb expected : {}, n rbs {}",ev.mt_event.get_n_rbs_expected(), ev.rb_events.len());
              }
              //break;
            }
          }
        }
      }
      //thread::sleep(Duration::from_nanos(200));
      //avg_rb_loop_time += rb_loop_time.elapsed().as_nanos();
      //n_iter_rbe_loop += 1;
    }
    // FIXME - timing debugging
    //let debug_timer_elapsed = debug_timer.elapsed().as_secs_f64();
    ////println!("Debug timer elapsed {}", debug_timer_elapsed);
    //if debug_timer_elapsed > 90.0 {
    //  debug_timer = Instant::now();
    //  let mut file = File::create("event_id_cache.txt").unwrap();
    //  let mut file2 = File::create("orphans.txt").unwrap();
    //  let content = format!("{:?}", event_id_cache);
    //  //let content = "This is a line of text.\nAnother line.";
    //  file.write_all(content.as_bytes()); // write_all expects a byte slice
    //  let mut content_rbs = String::new();
    //  for k in &debug_orphans {
    //    content_rbs += &format!("{}", k);
    //  }
    //  file2.write_all(content_rbs.as_bytes()); // write_al
    //
    //}
    //trace!("Debug timer RBE received! {:?}", debug_timer.elapsed());

    // -----------------------------------------------------
    // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    // This concludes the actually "event building" part
    // -----------------------------------------------------

    // Adapt n_rbe_per_loop to the observed average number of RBEvents per
    // sent TofEvent.
    // NOTE(review): while heartbeat.n_sent == 0 this division yields
    // NaN/inf; the subsequent `as u32` cast saturates (NaN -> 0) and the
    // `== 0` failsafes below restore a sane value - confirm intent.
    let av_rb_ev = n_rbe_per_te as f64 / heartbeat.n_sent as f64;
    if settings.build_strategy == BuildStrategy::Adaptive ||
       settings.build_strategy == BuildStrategy::AdaptiveThorough {
      //settings.n_rbe_per_loop = av_rb_ev.ceil() as u32;
      settings.n_rbe_per_loop = av_rb_ev.floor() as u32;
    }
    if let BuildStrategy::AdaptiveGreedy = settings.build_strategy {
      settings.n_rbe_per_loop = av_rb_ev.ceil() as u32 + settings.greediness as u32;
      if settings.n_rbe_per_loop == 0 {
        // failsafe
        settings.n_rbe_per_loop = 40;
      }
    }


    //-----------------------------------------
    // From here on, we are checking all events
    // in the cache, deciding which ones are
    // ready to be passed on
    // ----------------------------------------

    let mut prior_ev_sent = 0u32;
    let mut first_ev_sent = false;
    let timeout = settings.te_timeout_sec as u64;
    // For the combo trigger, we can have a different timeout
    // if it is not set, we use the same timeout as for the primary trigger
    let combo_timeout = settings.te_timeout_sec_combo.unwrap_or(settings.te_timeout_sec) as u64;
    // only walk (at most) the older half of the cache per main-loop pass;
    // events deemed not ready are pushed back onto the deque front below
    let n_iter_max = event_id_cache.len() / 2;
    for idx in 0..n_iter_max {
      // if there wasn't a first element, size would be 0
      let evid = event_id_cache.pop_front().unwrap();
      match event_cache.get_mut(&evid) {
        None => {
          error!("Event id and event caches are misaligned for event id {}, idx {} and sizes {} {}! This is BAD and most likely a BUG!", evid, idx, event_cache.len(), event_id_cache.len());
          continue;
        },
        Some(ev) => {
          // an event whose trigger sources include the combo trigger gets
          // the (possibly different) combo timeout
          let mut ev_timed_out = ev.age() >= timeout;
          let mut is_combo = false;
          for trg in &ev.get_trigger_sources() {
            // the combo trigger should superseed the primary trigger and only if
            // we have the secondary trigger, we will apply the other timeout
            if trg == &combo_trigger {
              ev_timed_out = ev.age() >= combo_timeout;
              is_combo = true;
              break;
            }
          }
          // timed out events should be sent in any case
          let is_complete = ev.is_complete(dead_rbs);
          //let mut ready_to_send = ev_timed_out || is_complete;
          let mut ready_to_send : bool;
          // mangled events will be treated seperatly
          // (this branch only does timeout bookkeeping; the send decision
          // happens below)
          if ev_timed_out && !is_complete && !ev.has_any_mangling() {
            //if !ev.is_complete(dead_rbs) {
            if is_combo {
              heartbeat.n_timed_out_combo += 1;
              //println!("Ev {} timed out! ({} total) Seen rb events {}, expected {} link ids", ev.event_id, heartbeat.n_timed_out_combo, ev.rb_events.len(), ev.get_rb_link_ids().len());
            } else {
              heartbeat.n_timed_out += 1;
              //println!("GAPS Ev {} timed out! ({} total) Seen rb events {}, expected {} link ids! Expected RBs {:?}", ev.event_id, heartbeat.n_timed_out, ev.rb_events.len(), ev.get_rb_link_ids().len(), ev.get_expected_rbs(&exprbmap));
            }
            // let's check the rb_link_ids here and let's check which one is missing
            //for l_id in ev.get_rb_link_ids() {
            //  if weird_rbe_link_ids.contains_key(&l_id) {
            //    *weird_rbe_link_ids.get_mut(&l_id).unwrap() += 1;
            //  } else {
            //    weird_rbe_link_ids.insert(l_id, 1);
            //  }
            //}
            //}
          } // else {
          // we are earlier than our time out, maybe the
          // event is already complete
          ready_to_send = ev_timed_out;
          match settings.build_strategy {
            BuildStrategy::WaitForNBoards => {
              // we will always wait for the expected number of boards,
              // except the event times out
              if ev.rb_events.len() as u8 == settings.wait_nrb {
                ready_to_send = true;
              } // else ready_to_send is still false
            },
            BuildStrategy::Adaptive
            | BuildStrategy::AdaptiveThorough
            | BuildStrategy::AdaptiveGreedy
            | BuildStrategy::Smart
            | BuildStrategy::Unknown => {
              if ev.is_complete(dead_rbs) {
                //println!("EV HAS {} RBEVENTS", ev.rb_events.len());
                ready_to_send = true;
              }
            }
          }
          //}
          // this feature tries to sort the events which are getting sent
          // by id. This might lead to timed out events and more resources needed
          if settings.sort_events {
            // hold back out-of-order events (unless they timed out) so that
            // event ids leave the builder in ascending order
            if ready_to_send && !ev_timed_out {
              if idx == 0 {
                first_ev_sent = true;
                prior_ev_sent = ev.event_id;
              } else {
                if idx == 1 {
                  if !first_ev_sent {
                    // we wait and check the others too see if something
                    // else timed out
                    ready_to_send = false;
                  }
                }
                if ev.event_id != (prior_ev_sent + 1) {
                  // we wait and check the others too see if something
                  // else timed out
                  ready_to_send = false;
                }
                prior_ev_sent = ev.event_id;
              }
            }
          }

          if ready_to_send {
            // if we don't cache it, we have to send it.
            //let ev_to_send = ev.clone();
            // so the idea here is that this happens way less
            // often (a few times per main loop iteration)
            // than the cache it case, so we rather do something
            // here even if it might require re-allocating memory
            // we should have an eye on performance though
            //idx_to_remove.push(idx);
            let mut ev_to_send = event_cache.remove(&evid).unwrap();
            let is_mangled = ev_to_send.has_any_mangling();
            if is_mangled {
              heartbeat.data_mangled_ev += 1;
            }
            if ev_timed_out && !is_mangled {
              ev_to_send.status = EventStatus::EventTimeOut;
            }
            //if send_rbwaveform {
            //  if rbwf_ctr == send_rbwf_freq as u64 {
            //    for wf in ev_to_send.get_waveforms() {
            //      let mut pack = wf.pack();
            //      pack.no_write_to_disk = true;
            //      match data_sink.send(pack) {
            //        Err(err) => {
            //          error!("Packet sending failed! Err {}", err);
            //        }
            //        Ok(_) => {
            //          debug!("Event with id {} sent!", evid);
            //        }
            //      }
            //    }
            //    rbwf_ctr = 0;
            //  }
            //  rbwf_ctr += 1; // increase for every event, not wf
            //}
            // update event status, so that we will also see in an
            // (optionally) produced tof event summary if the
            // event has isuses
            n_rbe_per_te += ev_to_send.rb_events.len();
            // sum up lost hits due to drs4 deadtime
            heartbeat.drs_bsy_lost_hg_hits += ev_to_send.get_lost_hits() as u32;

            //println!("GOT {} ", ev_to_send.hits.len());
            heartbeat.n_sent += 1;
            if is_combo {
              heartbeat.n_sent_combo_trigger += 1;
            } else {
              heartbeat.n_sent_trigger += 1;
            }
            //let mut no_write_to_disk = false;
            //let no_send_over_nw = false;
            //if send_tev_sum {
            //let tes = ev_to_send.get_summary();
            // FIXME - these might be all zero!
            //if settings.only_save_interesting {
            //  no_write_to_disk = true;
            //  if ev_to_send.n_hits_umb >= settings.thr_n_hits_umb
            //  && ev_to_send.n_hits_cbe >= settings.thr_n_hits_cbe
            //  && ev_to_send.n_hits_cor >= settings.thr_n_hits_cor
            //  && ev_to_send.tot_edep_umb >= settings.thr_tot_edep_umb
            //  && ev_to_send.tot_edep_cbe >= settings.thr_tot_edep_cbe
            //  && ev_to_send.tot_edep_cor >= settings.thr_tot_edep_cor {
            //    no_write_to_disk = false;
            //  }
            //}
            //for wf in ev_to_send.get_waveforms() {
            //  println!( "FS {} ", wf);
            //}
            //for ev in &ev_to_send.rb_events {
            //  println!("{:?}", ev.hits);
            //}
            // Right now we want version V3 (which should already
            // be set), so it saves waveforms
            // FIXME - this all should go to the data_publsher
            ev_to_send.version = ProtocolVersion::V3;
            //let mut pack = ev_to_send.pack();
            //pack.no_write_to_disk = no_write_to_disk;
            match data_sink_ev.send(ev_to_send) {
              Err(err) => {
                error!("Packet sending failed! Err {}", err);
              }
              Ok(_) => {

                debug!("Event with id {} sent!", evid);
              }
              //}
            }

            //if

          // this happens when we are NOT ready to send -> cache it!
          } else {
            // not ready: put the id back at the FRONT so ordering by age
            // is preserved for the next pass
            event_id_cache.push_front(evid);
          }
        }
      }
    } // end loop over event_id_cache
    // ---- (e) heartbeat -----------------------------------------------
    if hb_timer.elapsed() >= hb_interval {
      // print the statistics for the weird orphans
      //println!("=> Weird orphan statistics. Here is a map of rb_id -> N_orphans (which ended up in the gutter)");
      //println!("=> {:?}", weird_orphan_rbids);
      //println!("-- -- -- -- -- -- -- -- -- -- -- --");

      // make sure the heartbeat has the latest mission elapsed time
      heartbeat.met_seconds += hb_timer.elapsed().as_secs_f64() as u64;// as usize;
      // get the length of the various caches at this moment in time
      heartbeat.event_cache_size = event_cache.len() as u32;
      heartbeat.event_id_cache_size = event_id_cache.len() as u32;
      heartbeat.mte_receiver_cbc_len = m_trig_ev.len() as u32;
      heartbeat.rbe_receiver_cbc_len = ev_from_rb.len() as u32;
      heartbeat.tp_sender_cbc_len = data_sink.len() as u32;

      let pack = heartbeat.pack();
      //println!("Avg mt loop time {}", avg_mt_loop_time as f64/n_iter_mt_loop as f64);
      //println!("Avg rb loop time {}", avg_rb_loop_time as f64/n_iter_rbe_loop as f64);

      match data_sink.send(pack) {
        Err(err) => {
          error!("Packet sending failed! Err {}", err);
        }
        Ok(_) => {
        }
      }
      hb_timer = Instant::now();
    }

    // mitigation for full channels
    // If the RBEvent channel is completely full, drain it (events are
    // dropped, not re-homed) and restore the configured n_rbe_per_loop.
    if ev_from_rb.is_full() {
      // flush rb events from the rb receiver channel
      // limit level 1
      //if let Some(pl) = settings.rbe_purge_limit1 {
      //settings.n_rbe_per_loop = n_rbe_per_loop_default;
      //let purge = settings.rbe_purge_limit1.unwrap_or(ev_from_rb.len() as u32);
      //let expired_sec = settings.rbe_purge_ev_time1.unwrap_or(30) as u64;
      //if ev_from_rb.len() > pl as usize {
      settings.n_rbe_per_loop = n_rbe_per_loop_default;
      // unwrap_or(20000): fallback bound for an unbounded channel
      for _k in 0..ev_from_rb.capacity().unwrap_or(20000) {
        match ev_from_rb.try_recv() {
          Err(_) => (),
          Ok(_ev) => {
            ////FIXME - what to do with these?
            // We should probalby tell the heartbeat how many we have purged?
            //if ev.creation_time.is_some() {
            //  if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
            //    match orphanage.send(ev) {
            //      Ok(_) => (),
            //      Err(err) => {
            //        error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
            //      }
            //    }
            //  } else {
            //    //// just do statistics
            //    //if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
            //    //  *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
            //    //} else {
            //    //  weird_orphan_rbids.insert(ev.header.rb_id,1);
            //    //}
            //  }
            //}
          }
        }
      }
      //}
    }

    // flush rb events from the rb receiver channel
    // limit level 1
    // Softer mitigation: if the channel backlog exceeds the configured
    // purge limit, drain it but recycle "fresh" events (younger than
    // rbe_purge_ev_time1, default 30 s) through the orphanage.
    if let Some(pl) = settings.rbe_purge_limit1 {
      //settings.n_rbe_per_loop = n_rbe_per_loop_default;
      //let purge = settings.rbe_purge_limit1.unwrap_or(ev_from_rb.len() as u32);
      let purge = ev_from_rb.len();
      let expired_sec = settings.rbe_purge_ev_time1.unwrap_or(30) as u64;
      if ev_from_rb.len() > pl as usize {
        settings.n_rbe_per_loop = n_rbe_per_loop_default;
        for _k in 0..purge {
          // NOTE(review): blocking recv() here - ok while the backlog
          // holds, but would stall if producers stop mid-purge; confirm.
          match ev_from_rb.recv() {
            Err(_) => (),
            Ok(ev) => {
              //FIXME - what to do with these?
              if ev.creation_time.is_some() {
                if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
                  match orphanage.send(ev) {
                    Ok(_) => (),
                    Err(err) => {
                      error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
                    }
                  }
                } else {
                  // just do statistics
                  if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
                    *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
                  } else {
                    weird_orphan_rbids.insert(ev.header.rb_id,1);
                  }
                }
              }
            }
          }
        }
      }
    }

    //// flush rb events from the rb receiver channel
    //// limit level 2
    //if let Some(pl) = settings.rbe_purge_limit2 {
    //  let purge = settings.rbe_purge_limit2.unwrap_or(ev_from_rb.len() as u32);
    //  let expired_sec = settings.rbe_purge_ev_time2.unwrap_or(30) as u64;
    //  if ev_from_rb.len() > pl as usize {
    //    settings.n_rbe_per_loop = n_rbe_per_loop_default;
    //    for k in 0..purge {
    //      match ev_from_rb.recv() {
    //        Err(_) => (),
    //        Ok(ev) => {
    //          //FIXME - what to do with these?
    //          if ev.creation_time.is_some() {
    //            if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
    //              match orphanage.send(ev) {
    //                Ok(_) => (),
    //                Err(err) => {
    //                  error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
    //                }
    //              }
    //            } else {
    //              // just do statistics
    //              if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
    //                *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
    //              } else {
    //                weird_orphan_rbids.insert(ev.header.rb_id,1);
    //              }
    //            }
    //          }
    //        }
    //      }
    //    }
    //  }
    //}
    //// flush rb events from the rb receiver channel
    //// limit level 3
    //if let Some(pl) = settings.rbe_purge_limit3 {
    //  let purge = settings.rbe_purge_limit3.unwrap_or(ev_from_rb.len() as u32);
    //  let expired_sec = settings.rbe_purge_ev_time3.unwrap_or(30) as u64;
    //  if ev_from_rb.len() > pl as usize {
    //    settings.n_rbe_per_loop = n_rbe_per_loop_default;
    //    for k in 0..purge {
    //      match ev_from_rb.recv() {
    //        Err(_) => (),
    //        Ok(ev) => {
    //          //FIXME - what to do with these?
    //          if ev.creation_time.is_some() {
    //            if ev.creation_time.unwrap().elapsed().as_secs() < expired_sec {
    //              match orphanage.send(ev) {
    //                Ok(_) => (),
    //                Err(err) => {
    //                  error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
    //                }
    //              }
    //            }
    //          } else {
    //            // just do statistics
    //            if weird_orphan_rbids.contains_key(&ev.header.rb_id) {
    //              *weird_orphan_rbids.get_mut(&ev.header.rb_id).unwrap() += 1;
    //            } else {
    //              weird_orphan_rbids.insert(ev.header.rb_id,1);
    //            }
    //          }
    //        }
    //      }
    //    }
    //  }
    //}

    ////if ev_from_rb.len() > 1000 {
    ////  for k in 0..1000 {
    ////  //while !ev_from_rb.is_empty() {
    ////    match ev_from_rb.recv() {
    ////      Err(_) => (),
    ////      Ok(ev) => {
    ////        //FIXME - what to do with these?
    ////        if ev.creation_time.is_some() {
    ////          if ev.creation_time.unwrap().elapsed().as_secs() < 90 {
    ////            match orphanage.send(ev) {
    ////              Ok(_) => (),
    ////              Err(err) => {
    ////                error! ("Orphanage does not accept this orphan. They are dying in the gutter all by themselves. What a said world! {err}");
    ////              }
    ////            }
    ////          }
    ////        }
    ////      }
    ////    }
    ////  }
    ////  settings.n_rbe_per_loop = n_rbe_per_loop_default;
    ////}
    // general failsafe: never let the adaptive logic stall the RBE loop
    if settings.n_rbe_per_loop == 0 {
      // failsafe
      settings.n_rbe_per_loop = 40;
    }
    heartbeat.n_rbe_per_loop = settings.n_rbe_per_loop as u32;
  } // end loop
}
788