liftof_rb/threads/
runner.rs1use std::time::{Instant};
2use std::{thread, time};
3use crossbeam_channel::{Sender,
4 Receiver};
5use std::sync::{
6 Arc,
7 Mutex,
8};
9
10use indicatif::{
11 MultiProgress,
12 ProgressBar,
13 ProgressStyle
14};
15
16use crate::control::*;
17use crate::memory::*;
18use crate::api::*;
19
20use tof_dataclasses::events::{DataType};
21use tof_dataclasses::commands::{TofOperationMode};
22use tof_dataclasses::commands::config::{
23 RunConfig
24};
25use liftof_lib::settings::{
30 RBSettings,
31 RBBufferStrategy
32};
33
34use liftof_lib::thread_control::ThreadControl;
35
36
37fn termination_seqeunce(prog_ev : &ProgressBar,
39 prog_a : &ProgressBar,
40 prog_b : &ProgressBar,
41 show_progress : bool,
42 bs_sender : &Sender<Vec<u8>>) {
43 info!("Calling termination sequence, will end current run!");
44 match set_self_trig_rate(0) {
47 Err(err) => error!("Resetting self trigger rate to 0Hz failed! Err {err}"),
48 Ok(_) => ()
49 }
50 match disable_trigger() {
51 Err(err) => error!("Can not disable triggers, error {err}"),
52 Ok(_) => info!("Disabling triggers! Stopping current run!")
53 }
54 if show_progress {
55 prog_ev.finish();
56 prog_a.finish();
57 prog_b.finish();
58 }
59 match ram_buffer_handler(1,
60 &bs_sender) {
61 Err(err) => {
62 error!("Can not deal with RAM buffers {err}");
63 },
64 Ok(_) => ()
65 }
66 info!("Termination sequence complete!");
67}
68
69
70pub fn runner(run_config : &Receiver<RunConfig>,
86 bs_sender : &Sender<Vec<u8>>,
87 dtf_to_evproc : &Sender<DataType>,
88 opmode_to_cache : &Sender<TofOperationMode>,
89 show_progress : bool,
90 settings : &RBSettings,
91 thread_control : Arc<Mutex<ThreadControl>>) {
92
93 let one_milli = time::Duration::from_millis(1);
94 let one_sec = time::Duration::from_secs(1);
95 let mut first_iter = true;
96 let mut last_evt_cnt : u32 = 0;
97 let mut evt_cnt = 0u32;
98 let mut delta_events : u64;
99 let mut n_events : u64 = 0;
100
101 let mut latch_to_mtb = true;
103
104 let mut timer = Instant::now();
105 let mut force_trigger = false;
109 let mut time_between_events : Option<f32> = None;
110 let met = time::Instant::now();
111
112 let mut terminate = false;
114 let mut is_running = false;
115 let mut listen_for_new_config = false;
116 let mut rc = RunConfig::new();
117
118 let mut template_bar_n_ev : &str;
120 let mut sty_ev : ProgressStyle;
121 let mut multi_prog : MultiProgress;
122 let mut prog_a = ProgressBar::hidden();
123 let mut prog_b = ProgressBar::hidden();
124 let mut prog_ev = ProgressBar::hidden();
125 let template_bar_a : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {bytes:>7}/{total_bytes:7} ";
126 let template_bar_b : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.green/grey} {bytes:>7}/{total_bytes:7} ";
127 let label_a = String::from("Buff A");
128 let label_b = String::from("Buff B");
129 let sty_a = ProgressStyle::with_template(template_bar_a)
130 .unwrap();
131 let sty_b = ProgressStyle::with_template(template_bar_b)
132 .unwrap();
133 prog_a.set_position(0);
134 prog_b.set_position(0);
135 prog_ev.set_position(0);
136
137 let mut which_buff : RamBuffer;
138 let mut buff_size : usize;
139 let mut buffer_trip : usize = 2000*EVENT_SIZE;
142 let mut uio1_total_size = DATABUF_TOTAL_SIZE;
143 let mut uio2_total_size = DATABUF_TOTAL_SIZE;
144 loop {
145 match thread_control.lock() {
146 Ok(tc) => {
147 if tc.stop_flag {
148 info!("Received stop signal. Will stop thread!");
149 termination_seqeunce(&prog_ev ,
150 &prog_a ,
151 &prog_b ,
152 show_progress,
153 &bs_sender );
154 break;
155 }
156 },
157 Err(err) => {
158 trace!("Can't acquire lock! {err}");
159 },
160 }
161 match run_config.try_recv() {
162 Err(err) => {
163 trace!("Did not receive a new RunConfig! Err {err}");
164 if listen_for_new_config {
166 thread::sleep(one_sec);
167 continue;
168 }
169 }
170 Ok(new_config) => {
171 listen_for_new_config = false;
176 println!("==> Received a new set of RunConfig\n {}!", new_config);
177
178 first_iter = true;
180 last_evt_cnt = 0;
181 evt_cnt = 0;
182 n_events = 0;
184 rc = new_config;
185 if !rc.is_active {
188 listen_for_new_config = true;
189 termination_seqeunce(&prog_ev ,
190 &prog_a ,
191 &prog_b ,
192 show_progress,
193 &bs_sender );
194 continue;
195 }
196 terminate = false;
198
199 info!("Setting buffer trip value to {}", buffer_trip);
202 buffer_trip = (rc.rb_buff_size as usize)*EVENT_SIZE;
203 if (buffer_trip > DATABUF_TOTAL_SIZE)
204 || (buffer_trip > DATABUF_TOTAL_SIZE) {
205 error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
206 warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
207 buffer_trip = DATABUF_TOTAL_SIZE;
208 } else {
209 uio1_total_size = buffer_trip;
210 uio2_total_size = buffer_trip;
211 }
212
213 match opmode_to_cache.send(rc.tof_op_mode.clone()) {
214 Err(err) => {
215 error!("Unable to send TofOperationMode to the event cache! Err {err}");
216 }
217 Ok(_) => {
218 info!("Send TofOperationMode {} to event processing thread!", rc.tof_op_mode);
219 }
220 }
221
222 let dt_c = rc.data_type.clone();
223 match dtf_to_evproc.send(dt_c) {
224 Err(err) => {
225 error!("Unable to send dataformat to event processing thread! {err}");
226 }
227 Ok(_) => {
228 info!("Sent dataformat {} to event processing thread!", rc.data_type);
229 }
230 }
231
232 match rc.data_type {
234 DataType::VoltageCalibration |
235 DataType::TimingCalibration |
236 DataType::Noi |
237 DataType::RBTriggerPoisson |
238 DataType::RBTriggerPeriodic => {
239 latch_to_mtb = false;
240 },
241 _ => ()
242 }
243 if rc.trigger_poisson_rate > 0 {
244 latch_to_mtb = false;
245 }
248 if rc.trigger_fixed_rate>0 {
249 force_trigger = true;
250 time_between_events = Some(1.0/(rc.trigger_fixed_rate as f32));
251 warn!("Will run in forced trigger mode with a rate of {} Hz!", rc.trigger_fixed_rate);
252 debug!("Will call trigger() every {} seconds...", time_between_events.unwrap());
253 latch_to_mtb = false;
254 }
255 match disable_trigger() {
256 Err(err) => error!("Can not disable triggers! {err}"),
257 Ok(_) => info!("Triggers disabled awaiting SOFT_RESET!"),
258 }
259 info!("Attempting soft reset...");
260 match soft_reset_board() {
261 Err(err) => error!("Unable to reset board! {err}"),
262 Ok(_) => info!("SOFT_RESET succesful!"),
263 }
264 if latch_to_mtb {
268 match set_master_trigger_mode() {
269 Err(err) => error!("Can not initialize master trigger mode, Err {err}"),
270 Ok(_) => info!("Latching to MasterTrigger")
271 }
272 } else {
273 match disable_master_trigger_mode() {
274 Err(err) => error!("Can not disable master trigger mode, Err {err}"),
275 Ok(_) => info!("Master trigger mode didsabled!")
276 }
277 }
278
279 match enable_trigger() {
281 Err(err) => error!("Can not enable triggers! Err {err}"),
282 Ok(_) => info!("Triggers enabled - Run start!")
283 }
284 if rc.trigger_poisson_rate > 0 {
285 enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
286 }
287 is_running = true;
289
290 if !force_trigger {
291 thread::sleep(one_sec);
294 match get_trigger_rate() {
295 Err(err) => error!("Unable to obtain trigger rate! Err {err}"),
296 Ok(rate) => info!("Seing MTB trigger rate of {rate} Hz")
297 }
298 }
299 if show_progress {
300 if rc.nevents == 0 {
301 template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} ";
302 } else {
303 template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.red/grey} {pos:>7}/{len:7}";
304 }
305 sty_ev = ProgressStyle::with_template(template_bar_n_ev)
306 .unwrap();
307 multi_prog = MultiProgress::new();
308 prog_a = multi_prog
309 .add(ProgressBar::new(uio1_total_size as u64));
310 prog_b = multi_prog
311 .insert_after(&prog_a, ProgressBar::new(uio2_total_size as u64));
312 prog_ev = multi_prog
313 .insert_after(&prog_b, ProgressBar::new(rc.nevents as u64));
314 prog_a.set_message (label_a.clone());
315 prog_a.set_prefix ("\u{1F4BE}");
316 prog_a.set_style (sty_a.clone());
317 prog_a.set_position(0);
318 prog_b.set_message (label_b.clone());
319 prog_b.set_prefix ("\u{1F4BE}");
320 prog_b.set_style (sty_b.clone());
321 prog_b.set_position(0);
322 prog_ev.set_style (sty_ev.clone());
323 prog_ev.set_prefix ("\u{2728}");
324 prog_ev.set_message("EVENTS");
325 prog_ev.set_position(0);
326 info!("Preparations complete. Run start should be imminent.");
327 }
328 continue; } } if is_running {
333 if terminate {
334 is_running = false;
335 termination_seqeunce(&prog_ev ,
336 &prog_a ,
337 &prog_b ,
338 show_progress,
339 &bs_sender );
340 info!("Run stopped! The runner has processed {n_events} events!");
341 continue;
342 } if force_trigger {
347 let elapsed = timer.elapsed().as_secs_f32();
350 trace!("Forced trigger mode, {} seconds since last trigger", elapsed);
352 if elapsed > time_between_events.unwrap() {
354 timer = Instant::now();
355 match trigger() {
356 Err(err) => error!("Error when triggering! {err}"),
357 Ok(_) => trace!("Firing trigger!")
358 }
359 } else { continue;
362 }
363 }
364
365 if !force_trigger {
367 match get_event_count() {
370 Err (err) => {
371 error!("Can not obtain event count! Err {:?}", err);
372 continue;
373 }
374 Ok (cnt) => {
375 evt_cnt = cnt;
376 if first_iter {
377 last_evt_cnt = evt_cnt;
378 first_iter = false;
379 continue;
380 }
381 if evt_cnt == last_evt_cnt {
382 thread::sleep(one_milli);
383 trace!("We didn't get an updated event count!");
384 continue; }
386 } } } match ram_buffer_handler(buffer_trip,
393 &bs_sender) {
394 Err(err) => {
395 error!("Can not deal with RAM buffers {err}");
396 continue;
397 }
398 Ok(result) => {
399 which_buff = result.0;
400 buff_size = result.1;
401 if result.2 { match settings.rb_buff_strategy {
405 RBBufferStrategy::NEvents(_) => (),
407 RBBufferStrategy::AdaptToRate(n_secs) => {
408 match get_trigger_rate() {
409 Err(err) => {
410 error!("Unable to obtain trigger rate! {err}");
411 buffer_trip = 50;
413 },
414 Ok(rate) => {
415 buffer_trip = rate as usize*n_secs as usize*EVENT_SIZE ;
416 trace!("Dynamic setting of buffer trip size for rate {} and n_secs {}! Setting buffer trip size to {}",rate, n_secs, buffer_trip);
417 }
418 }
419 }
420 }
421 if (buffer_trip > DATABUF_TOTAL_SIZE)
423 || (buffer_trip > DATABUF_TOTAL_SIZE) {
424 error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
425 warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
426 buffer_trip = DATABUF_TOTAL_SIZE;
427 } else {
428 uio1_total_size = buffer_trip;
429 uio2_total_size = buffer_trip;
430 }
431 if show_progress {
432 prog_a.set_length(uio1_total_size as u64);
433 prog_b.set_length(uio2_total_size as u64);
434 }
435 }
436 }
437 }
438 if force_trigger {
439 n_events += 1;
440 } else {
441 delta_events = (evt_cnt - last_evt_cnt) as u64;
442 n_events += delta_events;
443 last_evt_cnt = evt_cnt;
444 }
445 if show_progress {
446 match which_buff {
447 RamBuffer::A => {
448 prog_a.set_position(buff_size as u64);
449 prog_b.set_position(0);
450 }
451 RamBuffer::B => {
452 prog_b.set_position(buff_size as u64);
453 prog_a.set_position(0);
454 }
455 }
456 prog_ev.set_position(n_events);
457 }
458 } if rc.nseconds > 0 {
463 if met.elapsed().as_secs() > rc.nseconds as u64{
464 terminate = true;
465 }
466 }
467
468 if !rc.nevents == 0 {
470 if rc.nevents != 0 {
471 if n_events > rc.nevents as u64{
472 terminate = true;
473 }
474 }
475
476 if rc.nseconds > 0 {
477 if met.elapsed().as_secs() > rc.nseconds as u64{
478 terminate = true;
479 }
480 }
481
482 if !terminate {
484 if !force_trigger {
485 thread::sleep(100*one_milli);
486 }
487 }
488 }
489 } }
491
492
493