liftof_rb/threads/
runner.rs

1use std::time::{Instant};
2use std::{thread, time};
3use crossbeam_channel::{Sender,
4                        Receiver};
5use std::sync::{
6    Arc,
7    Mutex,
8};
9
10use indicatif::{
11    MultiProgress,
12    ProgressBar,
13    ProgressStyle
14};
15
16use crate::control::*;
17use crate::memory::*;
18use crate::api::*;
19
20use gondola_core::prelude::*;
21
22/// Shutdown a run within the runner thread
23fn termination_seqeunce(prog_ev       : &ProgressBar,
24                        prog_a        : &ProgressBar,
25                        prog_b        : &ProgressBar,
26                        show_progress : bool,
27                        bs_sender     : &Sender<Vec<u8>>) {
28  info!("Calling termination sequence, will end current run!");
29  // just to be sure we set the self trigger rate to 0 
30  // this is for the poisson trigger)
31  match set_self_trig_rate(0) {
32    Err(err) => error!("Resetting self trigger rate to 0Hz failed! Err {err}"),
33    Ok(_)    => ()
34  }
35  match disable_trigger() {
36    Err(err) => error!("Can not disable triggers, error {err}"),
37    Ok(_)    => info!("Disabling triggers! Stopping current run!")
38  }
39  if show_progress {
40    prog_ev.finish();
41    prog_a.finish();
42    prog_b.finish();
43  }
44  match ram_buffer_handler(1,
45                           &bs_sender) { 
46    Err(err)   => {
47      error!("Can not deal with RAM buffers {err}");
48    },
49    Ok(_) => ()
50  }
51  info!("Termination sequence complete!");
52}
53
54
55/// Thread which controls run start/stop, deals with 
56/// runconfigs and dis/enable triggers accordingly
57///
58///
59///  # Arguments
60///
61///  * run_config     : A channel over which we can pass a RunConfig.
62///                     This will either initialize data taking or 
63///                     stop it.
64/// 
65///  * prog_op_ev     : An option for a progress bar which
66///                     is helpful for debugging
67///  * force_trigger  : Run in forced trigger mode
68///
69///
70pub fn runner(rb_id                   : u8,
71              run_config              : &Receiver<RunConfig>,
72              bs_sender               : &Sender<Vec<u8>>,
73              dtf_to_evproc           : &Sender<DataType>,
74              opmode_to_cache         : &Sender<TofOperationMode>,
75              show_progress           : bool,
76              settings                : &RBSettings, 
77              thread_control          : Arc<Mutex<ThreadControl>>) { 
78  
79  let one_milli        = time::Duration::from_millis(1);
80  let one_sec          = time::Duration::from_secs(1);
81  let mut first_iter   = true; 
82  let mut last_evt_cnt : u32 = 0;
83  let mut evt_cnt      = 0u32;
84  let mut delta_events : u64;
85  let mut n_events     : u64 = 0;
86
87  let rb_id_str : String =  format!("{rb_id}");
88
89  // trigger settings. Per default, we latch to the 
90  let mut latch_to_mtb = true;
91
92  let mut timer               = Instant::now();
93  // do we have to manually trigger at the desired 
94  // time inberval? Then we set force_trigger.
95  // The Poisson trigger triggers automatically.
96  let mut force_trigger = false;
97  let mut time_between_events : Option<f32> = None;
98  let met = time::Instant::now();
99
100  // run start/stop conditions
101  let mut terminate             = false;
102  let mut is_running            = false;
103  let mut listen_for_new_config = false;
104  let mut rc = RunConfig::new();
105  
106  // this are all settings for the progress bar
107  let mut template_bar_n_ev : &str;
108  let mut sty_ev     : ProgressStyle;
109  let mut multi_prog : MultiProgress;
110  let mut prog_a  = ProgressBar::hidden();
111  let mut prog_b  = ProgressBar::hidden();
112  let mut prog_ev = ProgressBar::hidden();
113  let template_bar_a   : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {bytes:>7}/{total_bytes:7} ";
114  let template_bar_b   : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.green/grey} {bytes:>7}/{total_bytes:7} ";
115  let label_a   = String::from("Buff A");
116  let label_b   = String::from("Buff B");
117  let sty_a = ProgressStyle::with_template(template_bar_a)
118  .unwrap();
119  let sty_b = ProgressStyle::with_template(template_bar_b)
120  .unwrap();
121  prog_a.set_position(0);
122  prog_b.set_position(0);
123  prog_ev.set_position(0);
124
125  let mut which_buff  = RamBuffer::A;
126  let mut buff_size   = 0usize;
127  let mut has_tripped = false;
128  // set a default of 2000 events in the cache, 
129  // but this will be defined in the run params
130  let mut buffer_trip : usize = 2000*EVENT_SIZE;
131  let mut uio1_total_size = DATABUF_TOTAL_SIZE;
132  let mut uio2_total_size = DATABUF_TOTAL_SIZE;
133  let mut buffer_strategy_timeout = false;
134  let mut buffer_trips_every = 0.0f32;
135  let mut buffer_timer = Instant::now();
136  loop {
137    match thread_control.lock() {
138      Ok(tc) => {
139        if tc.stop_flag {
140          info!("Received stop signal. Will stop thread!");
141          termination_seqeunce(&prog_ev     ,
142                               &prog_a      ,
143                               &prog_b      ,
144                               show_progress,
145                               &bs_sender   );
146          break;
147        }
148      },
149      Err(err) => {
150        trace!("Can't acquire lock! {err}");
151      },
152    }
153    match run_config.try_recv() {
154      Err(err) => {
155        trace!("Did not receive a new RunConfig! Err {err}");
156        // in this case, we just wait until we get a new run config!
157        if listen_for_new_config {
158          thread::sleep(one_sec);
159          continue;
160        }
161      }
162      Ok(new_config) => {
163        // we got a new config. We will proceed with our loop,
164        // except the config says run_active = false.
165        // In that case, we will end and listen for the next
166        // config
167        listen_for_new_config = false;
168        println!("==> Received a new set of RunConfig\n {}!", new_config);
169
170        // reset some variables for the loop
171        first_iter   = true; 
172        last_evt_cnt = 0;
173        evt_cnt      = 0;
174        //delta_events = 0;
175        n_events     = 0;
176        rc           = new_config;
177        // first of all, check if the new run config is active. 
178        // if not, stop all triggers
179        if !rc.is_active {
180          listen_for_new_config = true;
181          termination_seqeunce(&prog_ev     ,
182                               &prog_a      ,
183                               &prog_b      ,
184                               show_progress,
185                               &bs_sender   );
186          continue;
187        }
188        // we have an active run, initializing
189        terminate = false;
190
191        // deal with the individual settings:
192        // first buffer size
193        info!("Setting buffer trip value to {}", buffer_trip);
194        let buff_trip_size = rc.rb_buff_size.unwrap_or(50); 
195        if buff_trip_size == 50 {
196          warn!("Using buffer trip size of 50. This ssems to be a fallback setting");
197        }
198        buffer_trip = (buff_trip_size as usize)*EVENT_SIZE; 
199        if (buffer_trip > DATABUF_TOTAL_SIZE) 
200        || (buffer_trip > DATABUF_TOTAL_SIZE) {
201          error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
202          warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
203          buffer_trip = DATABUF_TOTAL_SIZE;
204        } else {
205          uio1_total_size = buffer_trip;
206          uio2_total_size = buffer_trip;
207        }
208        
209        match opmode_to_cache.send(rc.tof_op_mode.clone()) {
210          Err(err) => {
211            error!("Unable to send TofOperationMode to the event cache! Err {err}");
212          }
213          Ok(_)    => {
214            info!("Send TofOperationMode {} to event processing thread!", rc.tof_op_mode);
215          }
216        }
217
218        let dt_c = rc.data_type.clone();
219        match dtf_to_evproc.send(dt_c) {
220          Err(err) => {
221            error!("Unable to send dataformat to event processing thread! {err}");
222          }
223          Ok(_) => {
224            info!("Sent dataformat {} to event processing thread!", rc.data_type);
225          }
226        }
227
228        // data type
229        match rc.data_type {
230          DataType::VoltageCalibration | 
231          DataType::TimingCalibration  | 
232          DataType::Noi                |
233          DataType::RBTriggerPoisson   | 
234          DataType::RBTriggerPeriodic =>  {
235            latch_to_mtb = false;
236          },
237          _ => ()
238        }
239        if rc.trigger_poisson_rate > 0 {
240          latch_to_mtb = false;
241          // we also activate the poisson trigger
242          //enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
243        }
244        if rc.trigger_fixed_rate>0 {
245          force_trigger = true;
246          time_between_events = Some(1.0/(rc.trigger_fixed_rate as f32));
247          warn!("Will run in forced trigger mode with a rate of {} Hz!", rc.trigger_fixed_rate);
248          debug!("Will call trigger() every {} seconds...", time_between_events.unwrap());
249          latch_to_mtb = false;
250        }
251        match disable_trigger() {
252          Err(err) => error!("Can not disable triggers! {err}"),
253          Ok(_)    => info!("Triggers disabled awaiting SOFT_RESET!"),
254        }
255        info!("Attempting soft reset...");
256        match soft_reset_board() {
257          Err(err) => error!("Unable to reset board! {err}"),
258          Ok(_)    => info!("SOFT_RESET succesful!"),
259        }
260        // preparations done, let's gooo
261        //reset_dma_and_buffers();
262
263        if latch_to_mtb {
264          match set_master_trigger_mode() {
265            Err(err) => error!("Can not initialize master trigger mode, Err {err}"),
266            Ok(_)    => info!("Latching to MasterTrigger")
267          }
268        } else {
269          match disable_master_trigger_mode() {
270            Err(err) => error!("Can not disable master trigger mode, Err {err}"),
271            Ok(_)    => info!("Master trigger mode didsabled!")
272          }
273        }
274        
275        // this basically signals "RUNSTART"
276        match enable_trigger() {
277          Err(err) => error!("Can not enable triggers! Err {err}"),
278          Ok(_)    => info!("Triggers enabled - Run start!")
279        }
280        if rc.trigger_poisson_rate > 0 {
281          enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
282        }
283        // FIXME - only if above call Ok()
284        is_running = true;
285
286        if !force_trigger {
287          // we relax and let the system go 
288          // for a short bit
289          thread::sleep(one_sec);
290          match get_trigger_rate() {
291            Err(err) => error!("Unable to obtain trigger rate! Err {err}"),
292            Ok(rate) => info!("Seing MTB trigger rate of {rate} Hz")
293          }
294        }
295        if show_progress {
296          if rc.nevents == 0 {
297            template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} ";
298          } else {
299            template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.red/grey} {pos:>7}/{len:7}";
300          }
301          sty_ev = ProgressStyle::with_template(template_bar_n_ev)
302          .unwrap();
303          multi_prog = MultiProgress::new();
304          prog_a  = multi_prog
305                    .add(ProgressBar::new(uio1_total_size as u64)); 
306          prog_b  = multi_prog
307                    .insert_after(&prog_a, ProgressBar::new(uio2_total_size as u64)); 
308          prog_ev = multi_prog
309                    .insert_after(&prog_b, ProgressBar::new(rc.nevents as u64)); 
310          prog_a.set_message (label_a.clone());
311          prog_a.set_prefix  ("\u{1F4BE}");
312          prog_a.set_style   (sty_a.clone());
313          prog_a.set_position(0);
314          prog_b.set_message (label_b.clone());
315          prog_b.set_prefix  ("\u{1F4BE}");
316          prog_b.set_style   (sty_b.clone());
317          prog_b.set_position(0);
318          prog_ev.set_style  (sty_ev.clone());
319          prog_ev.set_prefix ("\u{2728}");
320          prog_ev.set_message("EVENTS");
321          prog_ev.set_position(0);
322          info!("Preparations complete. Run start should be imminent.");
323        }
324        continue; // start loop again
325      } // end Ok(RunConfig) 
326    } // end run_params.try_recv()
327
328    if is_running {
329      if terminate {
330        is_running = false;
331        termination_seqeunce(&prog_ev     ,
332                             &prog_a      ,
333                             &prog_b      ,
334                             show_progress,
335                             &bs_sender   );
336        info!("Run stopped! The runner has processed {n_events} events!");
337        continue;
338      } // end if terminate
339      
340      // We did not terminate the run,
341      // that means we are still going!
342      if force_trigger {
343        //println!("Forcing trigger!");
344        //println!("Time between events {}", time_between_events.unwrap());
345        let elapsed = timer.elapsed().as_secs_f32();
346        //println!("Elapsed {}", elapsed);
347        trace!("Forced trigger mode, {} seconds since last trigger", elapsed);
348        // decide if we have to issue the trigger signal NOW!
349        if elapsed > time_between_events.unwrap() {
350          timer = Instant::now(); 
351          match trigger() {
352            Err(err) => error!("Error when triggering! {err}"),
353            Ok(_)    => trace!("Firing trigger!")
354          }
355        } else { // not enough time has yet passed for the next trigger signal
356          // FIXME - we could sleep here for a bit!
357          continue;
358        }
359      }    
360
361      // calculate current event count
362      if !force_trigger {
363        // this checks if we have seen a new event
364        //match get_event_count_mt() {
365        match get_event_count() {
366          Err (err) => {
367            error!("Can not obtain event count! Err {:?}", err);
368            continue;
369          }
370          Ok (cnt) => {
371            evt_cnt = cnt;
372            if first_iter {
373              last_evt_cnt = evt_cnt;
374              first_iter = false;
375              continue;
376            }
377            if evt_cnt == last_evt_cnt {
378              thread::sleep(one_milli);
379              trace!("We did not get an updated event count!");
380              continue; // only continue after we see a new event!
381            }
382          } // end ok
383        } // end match
384      } // end force trigger
385
386      // AT THIS POINT WE KNOW WE HAVE SEEN SOMETHING!!!
387      // THIS IS IMPORTANT
388      
389      if buffer_strategy_timeout {
390        if buffer_timer.elapsed().as_secs_f32() > buffer_trips_every {
391          buffer_timer = Instant::now();
392          match ram_buffer_handler(buffer_trip,
393                                   &bs_sender) { 
394            Err(err)   => {
395              error!("Can not deal with RAM buffers {err}");
396              continue;
397            }
398            Ok(result) => {
399              which_buff = result.0;
400              buff_size  = result.1;
401              has_tripped = result.2;
402            }
403          }
404        }
405      } else {
406        match ram_buffer_handler(buffer_trip,
407                                 &bs_sender) { 
408          Err(err)   => {
409            error!("Can not deal with RAM buffers {err}");
410            continue;
411          }
412          Ok(result) => {
413            which_buff = result.0;
414            buff_size  = result.1;
415            has_tripped = result.2;
416          }
417        }
418      }
419      if has_tripped { // buffer has tripped
420        // in case we chose a dynamic buffer strategy, 
421        // adapt the buffer size for the next time
422        // FIXME - no unwraps 
423        match settings.rb_buff_strategy.get(&rb_id_str).unwrap() {
424          // first case we have a fixed buffer size
425          RBBufferStrategy::NEvents(_) => (),
426          RBBufferStrategy::NSeconds(n_secs) => {
427            buffer_trip = 1;
428            buffer_strategy_timeout = true;
429            buffer_trips_every = *n_secs;
430          }
431          RBBufferStrategy::ActuallySmart => { 
432            match get_trigger_rate() {
433              Err(err) => {
434                error!("Unable to obtain trigger rate! {err}");
435                // FIXME - Reasonable default?
436                buffer_trip = 50;
437              },
438              Ok(rate) => {
439                buffer_trip = rate as usize*EVENT_SIZE ;
440                trace!("Dynamic setting of buffer trip size for rate {}! Setting buffer trip size to {}",rate, buffer_trip);
441              }
442            }
443          }
444        }
445        // check again if buffer trip exceeds total size
446        if (buffer_trip > DATABUF_TOTAL_SIZE) 
447        || (buffer_trip > DATABUF_TOTAL_SIZE) {
448          error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
449          warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
450          buffer_trip = DATABUF_TOTAL_SIZE;
451        } else {
452          uio1_total_size = buffer_trip;
453          uio2_total_size = buffer_trip;
454        }
455        if show_progress {
456          prog_a.set_length(uio1_total_size as u64);
457          prog_b.set_length(uio2_total_size as u64);
458        }
459      }
460      if force_trigger {
461          n_events += 1;
462      } else {
463        delta_events = (evt_cnt - last_evt_cnt) as u64;
464        n_events    += delta_events;
465        last_evt_cnt = evt_cnt;
466      }
467      if show_progress {
468        match which_buff {
469          RamBuffer::A => {
470            prog_a.set_position(buff_size as u64);
471            prog_b.set_position(0);
472          }
473          RamBuffer::B => {
474            prog_b.set_position(buff_size as u64);
475            prog_a.set_position(0);
476          }
477        }
478        prog_ev.set_position(n_events);
479      }
480    } // end is_running
481    
482    // from here on, check termination 
483    // conditions
484    if rc.nseconds > 0 {
485      if met.elapsed().as_secs() > rc.nseconds  as u64{
486        terminate = true;
487      }
488    }
489    
490    // FIXME
491    if !rc.nevents == 0 {
492      if rc.nevents != 0 {
493        if n_events > rc.nevents as u64{
494          terminate = true;
495        }
496      }
497      
498      if rc.nseconds > 0 {
499          if met.elapsed().as_secs() > rc.nseconds  as u64{
500            terminate = true;
501          }
502        }
503
504      // reduce cpu load
505      if !terminate {
506        if !force_trigger { 
507          thread::sleep(100*one_milli);
508        }
509      }
510    }
511  } // end loop
512}
513
514
515