liftof_rb/threads/
runner.rs

1use std::time::{Instant};
2use std::{thread, time};
3use crossbeam_channel::{Sender,
4                        Receiver};
5use std::sync::{
6    Arc,
7    Mutex,
8};
9
10use indicatif::{
11    MultiProgress,
12    ProgressBar,
13    ProgressStyle
14};
15
16use crate::control::*;
17use crate::memory::*;
18use crate::api::*;
19
20use tof_dataclasses::events::{DataType};
21use tof_dataclasses::commands::{TofOperationMode};
22use tof_dataclasses::commands::config::{
23  RunConfig
24};
25//use tof_dataclasses::io::RBEventMemoryStreamer;
26//use tof_dataclasses::packets::TofPacket;
27//use tof_dataclasses::threading::ThreadControl;
28
29use liftof_lib::settings::{
30    RBSettings,
31    RBBufferStrategy
32};
33
34use liftof_lib::thread_control::ThreadControl;
35
36
37/// Shutdown a run within the runner thread
38fn termination_seqeunce(prog_ev       : &ProgressBar,
39                        prog_a        : &ProgressBar,
40                        prog_b        : &ProgressBar,
41                        show_progress : bool,
42                        bs_sender     : &Sender<Vec<u8>>) {
43  info!("Calling termination sequence, will end current run!");
44  // just to be sure we set the self trigger rate to 0 
45  // this is for the poisson trigger)
46  match set_self_trig_rate(0) {
47    Err(err) => error!("Resetting self trigger rate to 0Hz failed! Err {err}"),
48    Ok(_)    => ()
49  }
50  match disable_trigger() {
51    Err(err) => error!("Can not disable triggers, error {err}"),
52    Ok(_)    => info!("Disabling triggers! Stopping current run!")
53  }
54  if show_progress {
55    prog_ev.finish();
56    prog_a.finish();
57    prog_b.finish();
58  }
59  match ram_buffer_handler(1,
60                           &bs_sender) { 
61    Err(err)   => {
62      error!("Can not deal with RAM buffers {err}");
63    },
64    Ok(_) => ()
65  }
66  info!("Termination sequence complete!");
67}
68
69
70/// Thread which controls run start/stop, deals with 
71/// runconfigs and dis/enable triggers accordingly
72///
73///
74///  # Arguments
75///
76///  * run_config     : A channel over which we can pass a RunConfig.
77///                     This will either initialize data taking or 
78///                     stop it.
79/// 
80///  * prog_op_ev     : An option for a progress bar which
81///                     is helpful for debugging
82///  * force_trigger  : Run in forced trigger mode
83///
84///
85pub fn runner(run_config              : &Receiver<RunConfig>,
86              bs_sender               : &Sender<Vec<u8>>,
87              dtf_to_evproc           : &Sender<DataType>,
88              opmode_to_cache         : &Sender<TofOperationMode>,
89              show_progress           : bool,
90              settings                : &RBSettings, 
91              thread_control          : Arc<Mutex<ThreadControl>>) { 
92  
93  let one_milli        = time::Duration::from_millis(1);
94  let one_sec          = time::Duration::from_secs(1);
95  let mut first_iter   = true; 
96  let mut last_evt_cnt : u32 = 0;
97  let mut evt_cnt      = 0u32;
98  let mut delta_events : u64;
99  let mut n_events     : u64 = 0;
100 
101  // trigger settings. Per default, we latch to the 
102  let mut latch_to_mtb = true;
103
104  let mut timer               = Instant::now();
105  // do we have to manually trigger at the desired 
106  // time inberval? Then we set force_trigger.
107  // The Poisson trigger triggers automatically.
108  let mut force_trigger = false;
109  let mut time_between_events : Option<f32> = None;
110  let met = time::Instant::now();
111
112  // run start/stop conditions
113  let mut terminate             = false;
114  let mut is_running            = false;
115  let mut listen_for_new_config = false;
116  let mut rc = RunConfig::new();
117  
118  // this are all settings for the progress bar
119  let mut template_bar_n_ev : &str;
120  let mut sty_ev     : ProgressStyle;
121  let mut multi_prog : MultiProgress;
122  let mut prog_a  = ProgressBar::hidden();
123  let mut prog_b  = ProgressBar::hidden();
124  let mut prog_ev = ProgressBar::hidden();
125  let template_bar_a   : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {bytes:>7}/{total_bytes:7} ";
126  let template_bar_b   : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.green/grey} {bytes:>7}/{total_bytes:7} ";
127  let label_a   = String::from("Buff A");
128  let label_b   = String::from("Buff B");
129  let sty_a = ProgressStyle::with_template(template_bar_a)
130  .unwrap();
131  let sty_b = ProgressStyle::with_template(template_bar_b)
132  .unwrap();
133  prog_a.set_position(0);
134  prog_b.set_position(0);
135  prog_ev.set_position(0);
136
137  let mut which_buff  : RamBuffer;
138  let mut buff_size   : usize;
139  // set a default of 2000 events in the cache, 
140  // but this will be defined in the run params
141  let mut buffer_trip : usize = 2000*EVENT_SIZE;
142  let mut uio1_total_size = DATABUF_TOTAL_SIZE;
143  let mut uio2_total_size = DATABUF_TOTAL_SIZE;
144  loop {
145    match thread_control.lock() {
146      Ok(tc) => {
147        if tc.stop_flag {
148          info!("Received stop signal. Will stop thread!");
149          termination_seqeunce(&prog_ev     ,
150                               &prog_a      ,
151                               &prog_b      ,
152                               show_progress,
153                               &bs_sender   );
154          break;
155        }
156      },
157      Err(err) => {
158        trace!("Can't acquire lock! {err}");
159      },
160    }
161    match run_config.try_recv() {
162      Err(err) => {
163        trace!("Did not receive a new RunConfig! Err {err}");
164        // in this case, we just wait until we get a new run config!
165        if listen_for_new_config {
166          thread::sleep(one_sec);
167          continue;
168        }
169      }
170      Ok(new_config) => {
171        // we got a new config. We will proceed with our loop,
172        // except the config says run_active = false.
173        // In that case, we will end and listen for the next
174        // config
175        listen_for_new_config = false;
176        println!("==> Received a new set of RunConfig\n {}!", new_config);
177
178        // reset some variables for the loop
179        first_iter   = true; 
180        last_evt_cnt = 0;
181        evt_cnt      = 0;
182        //delta_events = 0;
183        n_events     = 0;
184        rc           = new_config;
185        // first of all, check if the new run config is active. 
186        // if not, stop all triggers
187        if !rc.is_active {
188          listen_for_new_config = true;
189          termination_seqeunce(&prog_ev     ,
190                               &prog_a      ,
191                               &prog_b      ,
192                               show_progress,
193                               &bs_sender   );
194          continue;
195        }
196        // we have an active run, initializing
197        terminate = false;
198
199        // deal with the individual settings:
200        // first buffer size
201        info!("Setting buffer trip value to {}", buffer_trip);
202        buffer_trip = (rc.rb_buff_size as usize)*EVENT_SIZE; 
203        if (buffer_trip > DATABUF_TOTAL_SIZE) 
204        || (buffer_trip > DATABUF_TOTAL_SIZE) {
205          error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
206          warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
207          buffer_trip = DATABUF_TOTAL_SIZE;
208        } else {
209          uio1_total_size = buffer_trip;
210          uio2_total_size = buffer_trip;
211        }
212        
213        match opmode_to_cache.send(rc.tof_op_mode.clone()) {
214          Err(err) => {
215            error!("Unable to send TofOperationMode to the event cache! Err {err}");
216          }
217          Ok(_)    => {
218            info!("Send TofOperationMode {} to event processing thread!", rc.tof_op_mode);
219          }
220        }
221
222        let dt_c = rc.data_type.clone();
223        match dtf_to_evproc.send(dt_c) {
224          Err(err) => {
225            error!("Unable to send dataformat to event processing thread! {err}");
226          }
227          Ok(_) => {
228            info!("Sent dataformat {} to event processing thread!", rc.data_type);
229          }
230        }
231
232        // data type
233        match rc.data_type {
234          DataType::VoltageCalibration | 
235          DataType::TimingCalibration  | 
236          DataType::Noi                |
237          DataType::RBTriggerPoisson   | 
238          DataType::RBTriggerPeriodic =>  {
239            latch_to_mtb = false;
240          },
241          _ => ()
242        }
243        if rc.trigger_poisson_rate > 0 {
244          latch_to_mtb = false;
245          // we also activate the poisson trigger
246          //enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
247        }
248        if rc.trigger_fixed_rate>0 {
249          force_trigger = true;
250          time_between_events = Some(1.0/(rc.trigger_fixed_rate as f32));
251          warn!("Will run in forced trigger mode with a rate of {} Hz!", rc.trigger_fixed_rate);
252          debug!("Will call trigger() every {} seconds...", time_between_events.unwrap());
253          latch_to_mtb = false;
254        }
255        match disable_trigger() {
256          Err(err) => error!("Can not disable triggers! {err}"),
257          Ok(_)    => info!("Triggers disabled awaiting SOFT_RESET!"),
258        }
259        info!("Attempting soft reset...");
260        match soft_reset_board() {
261          Err(err) => error!("Unable to reset board! {err}"),
262          Ok(_)    => info!("SOFT_RESET succesful!"),
263        }
264        // preparations done, let's gooo
265        //reset_dma_and_buffers();
266
267        if latch_to_mtb {
268          match set_master_trigger_mode() {
269            Err(err) => error!("Can not initialize master trigger mode, Err {err}"),
270            Ok(_)    => info!("Latching to MasterTrigger")
271          }
272        } else {
273          match disable_master_trigger_mode() {
274            Err(err) => error!("Can not disable master trigger mode, Err {err}"),
275            Ok(_)    => info!("Master trigger mode didsabled!")
276          }
277        }
278        
279        // this basically signals "RUNSTART"
280        match enable_trigger() {
281          Err(err) => error!("Can not enable triggers! Err {err}"),
282          Ok(_)    => info!("Triggers enabled - Run start!")
283        }
284        if rc.trigger_poisson_rate > 0 {
285          enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
286        }
287        // FIXME - only if above call Ok()
288        is_running = true;
289
290        if !force_trigger {
291          // we relax and let the system go 
292          // for a short bit
293          thread::sleep(one_sec);
294          match get_trigger_rate() {
295            Err(err) => error!("Unable to obtain trigger rate! Err {err}"),
296            Ok(rate) => info!("Seing MTB trigger rate of {rate} Hz")
297          }
298        }
299        if show_progress {
300          if rc.nevents == 0 {
301            template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} ";
302          } else {
303            template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.red/grey} {pos:>7}/{len:7}";
304          }
305          sty_ev = ProgressStyle::with_template(template_bar_n_ev)
306          .unwrap();
307          multi_prog = MultiProgress::new();
308          prog_a  = multi_prog
309                    .add(ProgressBar::new(uio1_total_size as u64)); 
310          prog_b  = multi_prog
311                    .insert_after(&prog_a, ProgressBar::new(uio2_total_size as u64)); 
312          prog_ev = multi_prog
313                    .insert_after(&prog_b, ProgressBar::new(rc.nevents as u64)); 
314          prog_a.set_message (label_a.clone());
315          prog_a.set_prefix  ("\u{1F4BE}");
316          prog_a.set_style   (sty_a.clone());
317          prog_a.set_position(0);
318          prog_b.set_message (label_b.clone());
319          prog_b.set_prefix  ("\u{1F4BE}");
320          prog_b.set_style   (sty_b.clone());
321          prog_b.set_position(0);
322          prog_ev.set_style  (sty_ev.clone());
323          prog_ev.set_prefix ("\u{2728}");
324          prog_ev.set_message("EVENTS");
325          prog_ev.set_position(0);
326          info!("Preparations complete. Run start should be imminent.");
327        }
328        continue; // start loop again
329      } // end Ok(RunConfig) 
330    } // end run_params.try_recv()
331
332    if is_running {
333      if terminate {
334        is_running = false;
335        termination_seqeunce(&prog_ev     ,
336                             &prog_a      ,
337                             &prog_b      ,
338                             show_progress,
339                             &bs_sender   );
340        info!("Run stopped! The runner has processed {n_events} events!");
341        continue;
342      } // end if terminate
343      
344      // We did not terminate the run,
345      // that means we are still going!
346      if force_trigger {
347        //println!("Forcing trigger!");
348        //println!("Time between events {}", time_between_events.unwrap());
349        let elapsed = timer.elapsed().as_secs_f32();
350        //println!("Elapsed {}", elapsed);
351        trace!("Forced trigger mode, {} seconds since last trigger", elapsed);
352        // decide if we have to issue the trigger signal NOW!
353        if elapsed > time_between_events.unwrap() {
354          timer = Instant::now(); 
355          match trigger() {
356            Err(err) => error!("Error when triggering! {err}"),
357            Ok(_)    => trace!("Firing trigger!")
358          }
359        } else { // not enough time has yet passed for the next trigger signal
360          // FIXME - we could sleep here for a bit!
361          continue;
362        }
363      }    
364
365      // calculate current event count
366      if !force_trigger {
367        // this checks if we have seen a new event
368        //match get_event_count_mt() {
369        match get_event_count() {
370          Err (err) => {
371            error!("Can not obtain event count! Err {:?}", err);
372            continue;
373          }
374          Ok (cnt) => {
375            evt_cnt = cnt;
376            if first_iter {
377              last_evt_cnt = evt_cnt;
378              first_iter = false;
379              continue;
380            }
381            if evt_cnt == last_evt_cnt {
382              thread::sleep(one_milli);
383              trace!("We didn't get an updated event count!");
384              continue; // only continue after we see a new event!
385            }
386          } // end ok
387        } // end match
388      } // end force trigger
389
390      // AT THIS POINT WE KNOW WE HAVE SEEN SOMETHING!!!
391      // THIS IS IMPORTANT
392      match ram_buffer_handler(buffer_trip,
393                               &bs_sender) { 
394        Err(err)   => {
395          error!("Can not deal with RAM buffers {err}");
396          continue;
397        }
398        Ok(result) => {
399          which_buff = result.0;
400          buff_size  = result.1;
401          if result.2 { // buffer has tripped
402            // in case we chose a dynamic buffer strategy, 
403            // adapt the buffer size for the next time
404            match settings.rb_buff_strategy {
405              // first case we have a fixed buffer size
406              RBBufferStrategy::NEvents(_) => (),
407              RBBufferStrategy::AdaptToRate(n_secs) => {
408                match get_trigger_rate() {
409                  Err(err) => {
410                    error!("Unable to obtain trigger rate! {err}");
411                    // FIXME - Reasonable default?
412                    buffer_trip = 50;
413                  },
414                  Ok(rate) => {
415                    buffer_trip = rate as usize*n_secs as usize*EVENT_SIZE ;
416                    trace!("Dynamic setting of buffer trip size for rate {} and n_secs {}! Setting buffer trip size to {}",rate, n_secs, buffer_trip);
417                  }
418                }
419              }
420            }
421            // check again if buffer trip exceeds total size
422            if (buffer_trip > DATABUF_TOTAL_SIZE) 
423            || (buffer_trip > DATABUF_TOTAL_SIZE) {
424              error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
425              warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
426              buffer_trip = DATABUF_TOTAL_SIZE;
427            } else {
428              uio1_total_size = buffer_trip;
429              uio2_total_size = buffer_trip;
430            }
431            if show_progress {
432              prog_a.set_length(uio1_total_size as u64);
433              prog_b.set_length(uio2_total_size as u64);
434            }
435          }
436        }
437      }
438      if force_trigger {
439          n_events += 1;
440      } else {
441        delta_events = (evt_cnt - last_evt_cnt) as u64;
442        n_events    += delta_events;
443        last_evt_cnt = evt_cnt;
444      }
445      if show_progress {
446        match which_buff {
447          RamBuffer::A => {
448            prog_a.set_position(buff_size as u64);
449            prog_b.set_position(0);
450          }
451          RamBuffer::B => {
452            prog_b.set_position(buff_size as u64);
453            prog_a.set_position(0);
454          }
455        }
456        prog_ev.set_position(n_events);
457      }
458    } // end is_running
459    
460    // from here on, check termination 
461    // conditions
462    if rc.nseconds > 0 {
463      if met.elapsed().as_secs() > rc.nseconds  as u64{
464        terminate = true;
465      }
466    }
467    
468    // FIXME
469    if !rc.nevents == 0 {
470      if rc.nevents != 0 {
471        if n_events > rc.nevents as u64{
472          terminate = true;
473        }
474      }
475      
476      if rc.nseconds > 0 {
477          if met.elapsed().as_secs() > rc.nseconds  as u64{
478            terminate = true;
479          }
480        }
481
482      // reduce cpu load
483      if !terminate {
484        if !force_trigger { 
485          thread::sleep(100*one_milli);
486        }
487      }
488    }
489  } // end loop
490}
491
492
493