liftof_rb/threads/
runner.rs1use std::time::{Instant};
2use std::{thread, time};
3use crossbeam_channel::{Sender,
4 Receiver};
5use std::sync::{
6 Arc,
7 Mutex,
8};
9
10use indicatif::{
11 MultiProgress,
12 ProgressBar,
13 ProgressStyle
14};
15
16use crate::control::*;
17use crate::memory::*;
18use crate::api::*;
19
20use gondola_core::prelude::*;
21
22fn termination_seqeunce(prog_ev : &ProgressBar,
41 prog_a : &ProgressBar,
42 prog_b : &ProgressBar,
43 show_progress : bool,
44 bs_sender : &Sender<Vec<u8>>) {
45 info!("Calling termination sequence, will end current run!");
46 match set_self_trig_rate(0) {
49 Err(err) => error!("Resetting self trigger rate to 0Hz failed! Err {err}"),
50 Ok(_) => ()
51 }
52 match disable_trigger() {
53 Err(err) => error!("Can not disable triggers, error {err}"),
54 Ok(_) => info!("Disabling triggers! Stopping current run!")
55 }
56 if show_progress {
57 prog_ev.finish();
58 prog_a.finish();
59 prog_b.finish();
60 }
61 match ram_buffer_handler(1,
62 &bs_sender) {
63 Err(err) => {
64 error!("Can not deal with RAM buffers {err}");
65 },
66 Ok(_) => ()
67 }
68 info!("Termination sequence complete!");
69}
70
71
72pub fn runner(run_config : &Receiver<RunConfig>,
88 bs_sender : &Sender<Vec<u8>>,
89 dtf_to_evproc : &Sender<DataType>,
90 opmode_to_cache : &Sender<TofOperationMode>,
91 show_progress : bool,
92 settings : &RBSettings,
93 thread_control : Arc<Mutex<ThreadControl>>) {
94
95 let one_milli = time::Duration::from_millis(1);
96 let one_sec = time::Duration::from_secs(1);
97 let mut first_iter = true;
98 let mut last_evt_cnt : u32 = 0;
99 let mut evt_cnt = 0u32;
100 let mut delta_events : u64;
101 let mut n_events : u64 = 0;
102
103 let mut latch_to_mtb = true;
105
106 let mut timer = Instant::now();
107 let mut force_trigger = false;
111 let mut time_between_events : Option<f32> = None;
112 let met = time::Instant::now();
113
114 let mut terminate = false;
116 let mut is_running = false;
117 let mut listen_for_new_config = false;
118 let mut rc = RunConfig::new();
119
120 let mut template_bar_n_ev : &str;
122 let mut sty_ev : ProgressStyle;
123 let mut multi_prog : MultiProgress;
124 let mut prog_a = ProgressBar::hidden();
125 let mut prog_b = ProgressBar::hidden();
126 let mut prog_ev = ProgressBar::hidden();
127 let template_bar_a : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {bytes:>7}/{total_bytes:7} ";
128 let template_bar_b : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.green/grey} {bytes:>7}/{total_bytes:7} ";
129 let label_a = String::from("Buff A");
130 let label_b = String::from("Buff B");
131 let sty_a = ProgressStyle::with_template(template_bar_a)
132 .unwrap();
133 let sty_b = ProgressStyle::with_template(template_bar_b)
134 .unwrap();
135 prog_a.set_position(0);
136 prog_b.set_position(0);
137 prog_ev.set_position(0);
138
139 let mut which_buff : RamBuffer;
140 let mut buff_size : usize;
141 let mut buffer_trip : usize = 2000*EVENT_SIZE;
144 let mut uio1_total_size = DATABUF_TOTAL_SIZE;
145 let mut uio2_total_size = DATABUF_TOTAL_SIZE;
146 loop {
147 match thread_control.lock() {
148 Ok(tc) => {
149 if tc.stop_flag {
150 info!("Received stop signal. Will stop thread!");
151 termination_seqeunce(&prog_ev ,
152 &prog_a ,
153 &prog_b ,
154 show_progress,
155 &bs_sender );
156 break;
157 }
158 },
159 Err(err) => {
160 trace!("Can't acquire lock! {err}");
161 },
162 }
163 match run_config.try_recv() {
164 Err(err) => {
165 trace!("Did not receive a new RunConfig! Err {err}");
166 if listen_for_new_config {
168 thread::sleep(one_sec);
169 continue;
170 }
171 }
172 Ok(new_config) => {
173 listen_for_new_config = false;
178 println!("==> Received a new set of RunConfig\n {}!", new_config);
179
180 first_iter = true;
182 last_evt_cnt = 0;
183 evt_cnt = 0;
184 n_events = 0;
186 rc = new_config;
187 if !rc.is_active {
190 listen_for_new_config = true;
191 termination_seqeunce(&prog_ev ,
192 &prog_a ,
193 &prog_b ,
194 show_progress,
195 &bs_sender );
196 continue;
197 }
198 terminate = false;
200
201 info!("Setting buffer trip value to {}", buffer_trip);
204 buffer_trip = (rc.rb_buff_size as usize)*EVENT_SIZE;
205 if (buffer_trip > DATABUF_TOTAL_SIZE)
206 || (buffer_trip > DATABUF_TOTAL_SIZE) {
207 error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
208 warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
209 buffer_trip = DATABUF_TOTAL_SIZE;
210 } else {
211 uio1_total_size = buffer_trip;
212 uio2_total_size = buffer_trip;
213 }
214
215 match opmode_to_cache.send(rc.tof_op_mode.clone()) {
216 Err(err) => {
217 error!("Unable to send TofOperationMode to the event cache! Err {err}");
218 }
219 Ok(_) => {
220 info!("Send TofOperationMode {} to event processing thread!", rc.tof_op_mode);
221 }
222 }
223
224 let dt_c = rc.data_type.clone();
225 match dtf_to_evproc.send(dt_c) {
226 Err(err) => {
227 error!("Unable to send dataformat to event processing thread! {err}");
228 }
229 Ok(_) => {
230 info!("Sent dataformat {} to event processing thread!", rc.data_type);
231 }
232 }
233
234 match rc.data_type {
236 DataType::VoltageCalibration |
237 DataType::TimingCalibration |
238 DataType::Noi |
239 DataType::RBTriggerPoisson |
240 DataType::RBTriggerPeriodic => {
241 latch_to_mtb = false;
242 },
243 _ => ()
244 }
245 if rc.trigger_poisson_rate > 0 {
246 latch_to_mtb = false;
247 }
250 if rc.trigger_fixed_rate>0 {
251 force_trigger = true;
252 time_between_events = Some(1.0/(rc.trigger_fixed_rate as f32));
253 warn!("Will run in forced trigger mode with a rate of {} Hz!", rc.trigger_fixed_rate);
254 debug!("Will call trigger() every {} seconds...", time_between_events.unwrap());
255 latch_to_mtb = false;
256 }
257 match disable_trigger() {
258 Err(err) => error!("Can not disable triggers! {err}"),
259 Ok(_) => info!("Triggers disabled awaiting SOFT_RESET!"),
260 }
261 info!("Attempting soft reset...");
262 match soft_reset_board() {
263 Err(err) => error!("Unable to reset board! {err}"),
264 Ok(_) => info!("SOFT_RESET succesful!"),
265 }
266 if latch_to_mtb {
270 match set_master_trigger_mode() {
271 Err(err) => error!("Can not initialize master trigger mode, Err {err}"),
272 Ok(_) => info!("Latching to MasterTrigger")
273 }
274 } else {
275 match disable_master_trigger_mode() {
276 Err(err) => error!("Can not disable master trigger mode, Err {err}"),
277 Ok(_) => info!("Master trigger mode didsabled!")
278 }
279 }
280
281 match enable_trigger() {
283 Err(err) => error!("Can not enable triggers! Err {err}"),
284 Ok(_) => info!("Triggers enabled - Run start!")
285 }
286 if rc.trigger_poisson_rate > 0 {
287 enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
288 }
289 is_running = true;
291
292 if !force_trigger {
293 thread::sleep(one_sec);
296 match get_trigger_rate() {
297 Err(err) => error!("Unable to obtain trigger rate! Err {err}"),
298 Ok(rate) => info!("Seing MTB trigger rate of {rate} Hz")
299 }
300 }
301 if show_progress {
302 if rc.nevents == 0 {
303 template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} ";
304 } else {
305 template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.red/grey} {pos:>7}/{len:7}";
306 }
307 sty_ev = ProgressStyle::with_template(template_bar_n_ev)
308 .unwrap();
309 multi_prog = MultiProgress::new();
310 prog_a = multi_prog
311 .add(ProgressBar::new(uio1_total_size as u64));
312 prog_b = multi_prog
313 .insert_after(&prog_a, ProgressBar::new(uio2_total_size as u64));
314 prog_ev = multi_prog
315 .insert_after(&prog_b, ProgressBar::new(rc.nevents as u64));
316 prog_a.set_message (label_a.clone());
317 prog_a.set_prefix ("\u{1F4BE}");
318 prog_a.set_style (sty_a.clone());
319 prog_a.set_position(0);
320 prog_b.set_message (label_b.clone());
321 prog_b.set_prefix ("\u{1F4BE}");
322 prog_b.set_style (sty_b.clone());
323 prog_b.set_position(0);
324 prog_ev.set_style (sty_ev.clone());
325 prog_ev.set_prefix ("\u{2728}");
326 prog_ev.set_message("EVENTS");
327 prog_ev.set_position(0);
328 info!("Preparations complete. Run start should be imminent.");
329 }
330 continue; } } if is_running {
335 if terminate {
336 is_running = false;
337 termination_seqeunce(&prog_ev ,
338 &prog_a ,
339 &prog_b ,
340 show_progress,
341 &bs_sender );
342 info!("Run stopped! The runner has processed {n_events} events!");
343 continue;
344 } if force_trigger {
349 let elapsed = timer.elapsed().as_secs_f32();
352 trace!("Forced trigger mode, {} seconds since last trigger", elapsed);
354 if elapsed > time_between_events.unwrap() {
356 timer = Instant::now();
357 match trigger() {
358 Err(err) => error!("Error when triggering! {err}"),
359 Ok(_) => trace!("Firing trigger!")
360 }
361 } else { continue;
364 }
365 }
366
367 if !force_trigger {
369 match get_event_count() {
372 Err (err) => {
373 error!("Can not obtain event count! Err {:?}", err);
374 continue;
375 }
376 Ok (cnt) => {
377 evt_cnt = cnt;
378 if first_iter {
379 last_evt_cnt = evt_cnt;
380 first_iter = false;
381 continue;
382 }
383 if evt_cnt == last_evt_cnt {
384 thread::sleep(one_milli);
385 trace!("We didn't get an updated event count!");
386 continue; }
388 } } } match ram_buffer_handler(buffer_trip,
395 &bs_sender) {
396 Err(err) => {
397 error!("Can not deal with RAM buffers {err}");
398 continue;
399 }
400 Ok(result) => {
401 which_buff = result.0;
402 buff_size = result.1;
403 if result.2 { match settings.rb_buff_strategy {
407 RBBufferStrategy::NEvents(_) => (),
409 RBBufferStrategy::AdaptToRate(n_secs) => {
410 match get_trigger_rate() {
411 Err(err) => {
412 error!("Unable to obtain trigger rate! {err}");
413 buffer_trip = 50;
415 },
416 Ok(rate) => {
417 buffer_trip = rate as usize*n_secs as usize*EVENT_SIZE ;
418 trace!("Dynamic setting of buffer trip size for rate {} and n_secs {}! Setting buffer trip size to {}",rate, n_secs, buffer_trip);
419 }
420 }
421 }
422 }
423 if (buffer_trip > DATABUF_TOTAL_SIZE)
425 || (buffer_trip > DATABUF_TOTAL_SIZE) {
426 error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
427 warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
428 buffer_trip = DATABUF_TOTAL_SIZE;
429 } else {
430 uio1_total_size = buffer_trip;
431 uio2_total_size = buffer_trip;
432 }
433 if show_progress {
434 prog_a.set_length(uio1_total_size as u64);
435 prog_b.set_length(uio2_total_size as u64);
436 }
437 }
438 }
439 }
440 if force_trigger {
441 n_events += 1;
442 } else {
443 delta_events = (evt_cnt - last_evt_cnt) as u64;
444 n_events += delta_events;
445 last_evt_cnt = evt_cnt;
446 }
447 if show_progress {
448 match which_buff {
449 RamBuffer::A => {
450 prog_a.set_position(buff_size as u64);
451 prog_b.set_position(0);
452 }
453 RamBuffer::B => {
454 prog_b.set_position(buff_size as u64);
455 prog_a.set_position(0);
456 }
457 }
458 prog_ev.set_position(n_events);
459 }
460 } if rc.nseconds > 0 {
465 if met.elapsed().as_secs() > rc.nseconds as u64{
466 terminate = true;
467 }
468 }
469
470 if !rc.nevents == 0 {
472 if rc.nevents != 0 {
473 if n_events > rc.nevents as u64{
474 terminate = true;
475 }
476 }
477
478 if rc.nseconds > 0 {
479 if met.elapsed().as_secs() > rc.nseconds as u64{
480 terminate = true;
481 }
482 }
483
484 if !terminate {
486 if !force_trigger {
487 thread::sleep(100*one_milli);
488 }
489 }
490 }
491 } }
493
494
495