liftof_rb/threads/
runner.rs

1use std::time::{Instant};
2use std::{thread, time};
3use crossbeam_channel::{Sender,
4                        Receiver};
5use std::sync::{
6    Arc,
7    Mutex,
8};
9
10use indicatif::{
11    MultiProgress,
12    ProgressBar,
13    ProgressStyle
14};
15
16use crate::control::*;
17use crate::memory::*;
18use crate::api::*;
19
20use gondola_core::prelude::*;
21
22//use tof_dataclasses::events::{DataType};
23//use tof_dataclasses::commands::{TofOperationMode};
24//use tof_dataclasses::commands::config::{
25//  RunConfig
26//};
27////use tof_dataclasses::io::RBEventMemoryStreamer;
28////use tof_dataclasses::packets::TofPacket;
29////use tof_dataclasses::threading::ThreadControl;
30//
31//use liftof_lib::settings::{
32//    RBSettings,
33//    RBBufferStrategy
34//};
35//
36//use liftof_lib::thread_control::ThreadControl;
37
38
39/// Shutdown a run within the runner thread
40fn termination_seqeunce(prog_ev       : &ProgressBar,
41                        prog_a        : &ProgressBar,
42                        prog_b        : &ProgressBar,
43                        show_progress : bool,
44                        bs_sender     : &Sender<Vec<u8>>) {
45  info!("Calling termination sequence, will end current run!");
46  // just to be sure we set the self trigger rate to 0 
47  // this is for the poisson trigger)
48  match set_self_trig_rate(0) {
49    Err(err) => error!("Resetting self trigger rate to 0Hz failed! Err {err}"),
50    Ok(_)    => ()
51  }
52  match disable_trigger() {
53    Err(err) => error!("Can not disable triggers, error {err}"),
54    Ok(_)    => info!("Disabling triggers! Stopping current run!")
55  }
56  if show_progress {
57    prog_ev.finish();
58    prog_a.finish();
59    prog_b.finish();
60  }
61  match ram_buffer_handler(1,
62                           &bs_sender) { 
63    Err(err)   => {
64      error!("Can not deal with RAM buffers {err}");
65    },
66    Ok(_) => ()
67  }
68  info!("Termination sequence complete!");
69}
70
71
72/// Thread which controls run start/stop, deals with 
73/// runconfigs and dis/enable triggers accordingly
74///
75///
76///  # Arguments
77///
78///  * run_config     : A channel over which we can pass a RunConfig.
79///                     This will either initialize data taking or 
80///                     stop it.
81/// 
82///  * prog_op_ev     : An option for a progress bar which
83///                     is helpful for debugging
84///  * force_trigger  : Run in forced trigger mode
85///
86///
87pub fn runner(run_config              : &Receiver<RunConfig>,
88              bs_sender               : &Sender<Vec<u8>>,
89              dtf_to_evproc           : &Sender<DataType>,
90              opmode_to_cache         : &Sender<TofOperationMode>,
91              show_progress           : bool,
92              settings                : &RBSettings, 
93              thread_control          : Arc<Mutex<ThreadControl>>) { 
94  
95  let one_milli        = time::Duration::from_millis(1);
96  let one_sec          = time::Duration::from_secs(1);
97  let mut first_iter   = true; 
98  let mut last_evt_cnt : u32 = 0;
99  let mut evt_cnt      = 0u32;
100  let mut delta_events : u64;
101  let mut n_events     : u64 = 0;
102 
103  // trigger settings. Per default, we latch to the 
104  let mut latch_to_mtb = true;
105
106  let mut timer               = Instant::now();
107  // do we have to manually trigger at the desired 
108  // time inberval? Then we set force_trigger.
109  // The Poisson trigger triggers automatically.
110  let mut force_trigger = false;
111  let mut time_between_events : Option<f32> = None;
112  let met = time::Instant::now();
113
114  // run start/stop conditions
115  let mut terminate             = false;
116  let mut is_running            = false;
117  let mut listen_for_new_config = false;
118  let mut rc = RunConfig::new();
119  
120  // this are all settings for the progress bar
121  let mut template_bar_n_ev : &str;
122  let mut sty_ev     : ProgressStyle;
123  let mut multi_prog : MultiProgress;
124  let mut prog_a  = ProgressBar::hidden();
125  let mut prog_b  = ProgressBar::hidden();
126  let mut prog_ev = ProgressBar::hidden();
127  let template_bar_a   : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {bytes:>7}/{total_bytes:7} ";
128  let template_bar_b   : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.green/grey} {bytes:>7}/{total_bytes:7} ";
129  let label_a   = String::from("Buff A");
130  let label_b   = String::from("Buff B");
131  let sty_a = ProgressStyle::with_template(template_bar_a)
132  .unwrap();
133  let sty_b = ProgressStyle::with_template(template_bar_b)
134  .unwrap();
135  prog_a.set_position(0);
136  prog_b.set_position(0);
137  prog_ev.set_position(0);
138
139  let mut which_buff  : RamBuffer;
140  let mut buff_size   : usize;
141  // set a default of 2000 events in the cache, 
142  // but this will be defined in the run params
143  let mut buffer_trip : usize = 2000*EVENT_SIZE;
144  let mut uio1_total_size = DATABUF_TOTAL_SIZE;
145  let mut uio2_total_size = DATABUF_TOTAL_SIZE;
146  loop {
147    match thread_control.lock() {
148      Ok(tc) => {
149        if tc.stop_flag {
150          info!("Received stop signal. Will stop thread!");
151          termination_seqeunce(&prog_ev     ,
152                               &prog_a      ,
153                               &prog_b      ,
154                               show_progress,
155                               &bs_sender   );
156          break;
157        }
158      },
159      Err(err) => {
160        trace!("Can't acquire lock! {err}");
161      },
162    }
163    match run_config.try_recv() {
164      Err(err) => {
165        trace!("Did not receive a new RunConfig! Err {err}");
166        // in this case, we just wait until we get a new run config!
167        if listen_for_new_config {
168          thread::sleep(one_sec);
169          continue;
170        }
171      }
172      Ok(new_config) => {
173        // we got a new config. We will proceed with our loop,
174        // except the config says run_active = false.
175        // In that case, we will end and listen for the next
176        // config
177        listen_for_new_config = false;
178        println!("==> Received a new set of RunConfig\n {}!", new_config);
179
180        // reset some variables for the loop
181        first_iter   = true; 
182        last_evt_cnt = 0;
183        evt_cnt      = 0;
184        //delta_events = 0;
185        n_events     = 0;
186        rc           = new_config;
187        // first of all, check if the new run config is active. 
188        // if not, stop all triggers
189        if !rc.is_active {
190          listen_for_new_config = true;
191          termination_seqeunce(&prog_ev     ,
192                               &prog_a      ,
193                               &prog_b      ,
194                               show_progress,
195                               &bs_sender   );
196          continue;
197        }
198        // we have an active run, initializing
199        terminate = false;
200
201        // deal with the individual settings:
202        // first buffer size
203        info!("Setting buffer trip value to {}", buffer_trip);
204        buffer_trip = (rc.rb_buff_size as usize)*EVENT_SIZE; 
205        if (buffer_trip > DATABUF_TOTAL_SIZE) 
206        || (buffer_trip > DATABUF_TOTAL_SIZE) {
207          error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
208          warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
209          buffer_trip = DATABUF_TOTAL_SIZE;
210        } else {
211          uio1_total_size = buffer_trip;
212          uio2_total_size = buffer_trip;
213        }
214        
215        match opmode_to_cache.send(rc.tof_op_mode.clone()) {
216          Err(err) => {
217            error!("Unable to send TofOperationMode to the event cache! Err {err}");
218          }
219          Ok(_)    => {
220            info!("Send TofOperationMode {} to event processing thread!", rc.tof_op_mode);
221          }
222        }
223
224        let dt_c = rc.data_type.clone();
225        match dtf_to_evproc.send(dt_c) {
226          Err(err) => {
227            error!("Unable to send dataformat to event processing thread! {err}");
228          }
229          Ok(_) => {
230            info!("Sent dataformat {} to event processing thread!", rc.data_type);
231          }
232        }
233
234        // data type
235        match rc.data_type {
236          DataType::VoltageCalibration | 
237          DataType::TimingCalibration  | 
238          DataType::Noi                |
239          DataType::RBTriggerPoisson   | 
240          DataType::RBTriggerPeriodic =>  {
241            latch_to_mtb = false;
242          },
243          _ => ()
244        }
245        if rc.trigger_poisson_rate > 0 {
246          latch_to_mtb = false;
247          // we also activate the poisson trigger
248          //enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
249        }
250        if rc.trigger_fixed_rate>0 {
251          force_trigger = true;
252          time_between_events = Some(1.0/(rc.trigger_fixed_rate as f32));
253          warn!("Will run in forced trigger mode with a rate of {} Hz!", rc.trigger_fixed_rate);
254          debug!("Will call trigger() every {} seconds...", time_between_events.unwrap());
255          latch_to_mtb = false;
256        }
257        match disable_trigger() {
258          Err(err) => error!("Can not disable triggers! {err}"),
259          Ok(_)    => info!("Triggers disabled awaiting SOFT_RESET!"),
260        }
261        info!("Attempting soft reset...");
262        match soft_reset_board() {
263          Err(err) => error!("Unable to reset board! {err}"),
264          Ok(_)    => info!("SOFT_RESET succesful!"),
265        }
266        // preparations done, let's gooo
267        //reset_dma_and_buffers();
268
269        if latch_to_mtb {
270          match set_master_trigger_mode() {
271            Err(err) => error!("Can not initialize master trigger mode, Err {err}"),
272            Ok(_)    => info!("Latching to MasterTrigger")
273          }
274        } else {
275          match disable_master_trigger_mode() {
276            Err(err) => error!("Can not disable master trigger mode, Err {err}"),
277            Ok(_)    => info!("Master trigger mode didsabled!")
278          }
279        }
280        
281        // this basically signals "RUNSTART"
282        match enable_trigger() {
283          Err(err) => error!("Can not enable triggers! Err {err}"),
284          Ok(_)    => info!("Triggers enabled - Run start!")
285        }
286        if rc.trigger_poisson_rate > 0 {
287          enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
288        }
289        // FIXME - only if above call Ok()
290        is_running = true;
291
292        if !force_trigger {
293          // we relax and let the system go 
294          // for a short bit
295          thread::sleep(one_sec);
296          match get_trigger_rate() {
297            Err(err) => error!("Unable to obtain trigger rate! Err {err}"),
298            Ok(rate) => info!("Seing MTB trigger rate of {rate} Hz")
299          }
300        }
301        if show_progress {
302          if rc.nevents == 0 {
303            template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} ";
304          } else {
305            template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.red/grey} {pos:>7}/{len:7}";
306          }
307          sty_ev = ProgressStyle::with_template(template_bar_n_ev)
308          .unwrap();
309          multi_prog = MultiProgress::new();
310          prog_a  = multi_prog
311                    .add(ProgressBar::new(uio1_total_size as u64)); 
312          prog_b  = multi_prog
313                    .insert_after(&prog_a, ProgressBar::new(uio2_total_size as u64)); 
314          prog_ev = multi_prog
315                    .insert_after(&prog_b, ProgressBar::new(rc.nevents as u64)); 
316          prog_a.set_message (label_a.clone());
317          prog_a.set_prefix  ("\u{1F4BE}");
318          prog_a.set_style   (sty_a.clone());
319          prog_a.set_position(0);
320          prog_b.set_message (label_b.clone());
321          prog_b.set_prefix  ("\u{1F4BE}");
322          prog_b.set_style   (sty_b.clone());
323          prog_b.set_position(0);
324          prog_ev.set_style  (sty_ev.clone());
325          prog_ev.set_prefix ("\u{2728}");
326          prog_ev.set_message("EVENTS");
327          prog_ev.set_position(0);
328          info!("Preparations complete. Run start should be imminent.");
329        }
330        continue; // start loop again
331      } // end Ok(RunConfig) 
332    } // end run_params.try_recv()
333
334    if is_running {
335      if terminate {
336        is_running = false;
337        termination_seqeunce(&prog_ev     ,
338                             &prog_a      ,
339                             &prog_b      ,
340                             show_progress,
341                             &bs_sender   );
342        info!("Run stopped! The runner has processed {n_events} events!");
343        continue;
344      } // end if terminate
345      
346      // We did not terminate the run,
347      // that means we are still going!
348      if force_trigger {
349        //println!("Forcing trigger!");
350        //println!("Time between events {}", time_between_events.unwrap());
351        let elapsed = timer.elapsed().as_secs_f32();
352        //println!("Elapsed {}", elapsed);
353        trace!("Forced trigger mode, {} seconds since last trigger", elapsed);
354        // decide if we have to issue the trigger signal NOW!
355        if elapsed > time_between_events.unwrap() {
356          timer = Instant::now(); 
357          match trigger() {
358            Err(err) => error!("Error when triggering! {err}"),
359            Ok(_)    => trace!("Firing trigger!")
360          }
361        } else { // not enough time has yet passed for the next trigger signal
362          // FIXME - we could sleep here for a bit!
363          continue;
364        }
365      }    
366
367      // calculate current event count
368      if !force_trigger {
369        // this checks if we have seen a new event
370        //match get_event_count_mt() {
371        match get_event_count() {
372          Err (err) => {
373            error!("Can not obtain event count! Err {:?}", err);
374            continue;
375          }
376          Ok (cnt) => {
377            evt_cnt = cnt;
378            if first_iter {
379              last_evt_cnt = evt_cnt;
380              first_iter = false;
381              continue;
382            }
383            if evt_cnt == last_evt_cnt {
384              thread::sleep(one_milli);
385              trace!("We didn't get an updated event count!");
386              continue; // only continue after we see a new event!
387            }
388          } // end ok
389        } // end match
390      } // end force trigger
391
392      // AT THIS POINT WE KNOW WE HAVE SEEN SOMETHING!!!
393      // THIS IS IMPORTANT
394      match ram_buffer_handler(buffer_trip,
395                               &bs_sender) { 
396        Err(err)   => {
397          error!("Can not deal with RAM buffers {err}");
398          continue;
399        }
400        Ok(result) => {
401          which_buff = result.0;
402          buff_size  = result.1;
403          if result.2 { // buffer has tripped
404            // in case we chose a dynamic buffer strategy, 
405            // adapt the buffer size for the next time
406            match settings.rb_buff_strategy {
407              // first case we have a fixed buffer size
408              RBBufferStrategy::NEvents(_) => (),
409              RBBufferStrategy::AdaptToRate(n_secs) => {
410                match get_trigger_rate() {
411                  Err(err) => {
412                    error!("Unable to obtain trigger rate! {err}");
413                    // FIXME - Reasonable default?
414                    buffer_trip = 50;
415                  },
416                  Ok(rate) => {
417                    buffer_trip = rate as usize*n_secs as usize*EVENT_SIZE ;
418                    trace!("Dynamic setting of buffer trip size for rate {} and n_secs {}! Setting buffer trip size to {}",rate, n_secs, buffer_trip);
419                  }
420                }
421              }
422            }
423            // check again if buffer trip exceeds total size
424            if (buffer_trip > DATABUF_TOTAL_SIZE) 
425            || (buffer_trip > DATABUF_TOTAL_SIZE) {
426              error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
427              warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
428              buffer_trip = DATABUF_TOTAL_SIZE;
429            } else {
430              uio1_total_size = buffer_trip;
431              uio2_total_size = buffer_trip;
432            }
433            if show_progress {
434              prog_a.set_length(uio1_total_size as u64);
435              prog_b.set_length(uio2_total_size as u64);
436            }
437          }
438        }
439      }
440      if force_trigger {
441          n_events += 1;
442      } else {
443        delta_events = (evt_cnt - last_evt_cnt) as u64;
444        n_events    += delta_events;
445        last_evt_cnt = evt_cnt;
446      }
447      if show_progress {
448        match which_buff {
449          RamBuffer::A => {
450            prog_a.set_position(buff_size as u64);
451            prog_b.set_position(0);
452          }
453          RamBuffer::B => {
454            prog_b.set_position(buff_size as u64);
455            prog_a.set_position(0);
456          }
457        }
458        prog_ev.set_position(n_events);
459      }
460    } // end is_running
461    
462    // from here on, check termination 
463    // conditions
464    if rc.nseconds > 0 {
465      if met.elapsed().as_secs() > rc.nseconds  as u64{
466        terminate = true;
467      }
468    }
469    
470    // FIXME
471    if !rc.nevents == 0 {
472      if rc.nevents != 0 {
473        if n_events > rc.nevents as u64{
474          terminate = true;
475        }
476      }
477      
478      if rc.nseconds > 0 {
479          if met.elapsed().as_secs() > rc.nseconds  as u64{
480            terminate = true;
481          }
482        }
483
484      // reduce cpu load
485      if !terminate {
486        if !force_trigger { 
487          thread::sleep(100*one_milli);
488        }
489      }
490    }
491  } // end loop
492}
493
494
495