liftof_rb/threads/
runner.rs1use std::time::{Instant};
2use std::{thread, time};
3use crossbeam_channel::{Sender,
4 Receiver};
5use std::sync::{
6 Arc,
7 Mutex,
8};
9
10use indicatif::{
11 MultiProgress,
12 ProgressBar,
13 ProgressStyle
14};
15
16use crate::control::*;
17use crate::memory::*;
18use crate::api::*;
19
20use gondola_core::prelude::*;
21
22fn termination_seqeunce(prog_ev : &ProgressBar,
24 prog_a : &ProgressBar,
25 prog_b : &ProgressBar,
26 show_progress : bool,
27 bs_sender : &Sender<Vec<u8>>) {
28 info!("Calling termination sequence, will end current run!");
29 match set_self_trig_rate(0) {
32 Err(err) => error!("Resetting self trigger rate to 0Hz failed! Err {err}"),
33 Ok(_) => ()
34 }
35 match disable_trigger() {
36 Err(err) => error!("Can not disable triggers, error {err}"),
37 Ok(_) => info!("Disabling triggers! Stopping current run!")
38 }
39 if show_progress {
40 prog_ev.finish();
41 prog_a.finish();
42 prog_b.finish();
43 }
44 match ram_buffer_handler(1,
45 &bs_sender) {
46 Err(err) => {
47 error!("Can not deal with RAM buffers {err}");
48 },
49 Ok(_) => ()
50 }
51 info!("Termination sequence complete!");
52}
53
54
55pub fn runner(rb_id : u8,
71 run_config : &Receiver<RunConfig>,
72 bs_sender : &Sender<Vec<u8>>,
73 dtf_to_evproc : &Sender<DataType>,
74 opmode_to_cache : &Sender<TofOperationMode>,
75 show_progress : bool,
76 settings : &RBSettings,
77 thread_control : Arc<Mutex<ThreadControl>>) {
78
79 let one_milli = time::Duration::from_millis(1);
80 let one_sec = time::Duration::from_secs(1);
81 let mut first_iter = true;
82 let mut last_evt_cnt : u32 = 0;
83 let mut evt_cnt = 0u32;
84 let mut delta_events : u64;
85 let mut n_events : u64 = 0;
86
87 let rb_id_str : String = format!("{rb_id}");
88
89 let mut latch_to_mtb = true;
91
92 let mut timer = Instant::now();
93 let mut force_trigger = false;
97 let mut time_between_events : Option<f32> = None;
98 let met = time::Instant::now();
99
100 let mut terminate = false;
102 let mut is_running = false;
103 let mut listen_for_new_config = false;
104 let mut rc = RunConfig::new();
105
106 let mut template_bar_n_ev : &str;
108 let mut sty_ev : ProgressStyle;
109 let mut multi_prog : MultiProgress;
110 let mut prog_a = ProgressBar::hidden();
111 let mut prog_b = ProgressBar::hidden();
112 let mut prog_ev = ProgressBar::hidden();
113 let template_bar_a : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {bytes:>7}/{total_bytes:7} ";
114 let template_bar_b : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.green/grey} {bytes:>7}/{total_bytes:7} ";
115 let label_a = String::from("Buff A");
116 let label_b = String::from("Buff B");
117 let sty_a = ProgressStyle::with_template(template_bar_a)
118 .unwrap();
119 let sty_b = ProgressStyle::with_template(template_bar_b)
120 .unwrap();
121 prog_a.set_position(0);
122 prog_b.set_position(0);
123 prog_ev.set_position(0);
124
125 let mut which_buff = RamBuffer::A;
126 let mut buff_size = 0usize;
127 let mut has_tripped = false;
128 let mut buffer_trip : usize = 2000*EVENT_SIZE;
131 let mut uio1_total_size = DATABUF_TOTAL_SIZE;
132 let mut uio2_total_size = DATABUF_TOTAL_SIZE;
133 let mut buffer_strategy_timeout = false;
134 let mut buffer_trips_every = 0.0f32;
135 let mut buffer_timer = Instant::now();
136 loop {
137 match thread_control.lock() {
138 Ok(tc) => {
139 if tc.stop_flag {
140 info!("Received stop signal. Will stop thread!");
141 termination_seqeunce(&prog_ev ,
142 &prog_a ,
143 &prog_b ,
144 show_progress,
145 &bs_sender );
146 break;
147 }
148 },
149 Err(err) => {
150 trace!("Can't acquire lock! {err}");
151 },
152 }
153 match run_config.try_recv() {
154 Err(err) => {
155 trace!("Did not receive a new RunConfig! Err {err}");
156 if listen_for_new_config {
158 thread::sleep(one_sec);
159 continue;
160 }
161 }
162 Ok(new_config) => {
163 listen_for_new_config = false;
168 println!("==> Received a new set of RunConfig\n {}!", new_config);
169
170 first_iter = true;
172 last_evt_cnt = 0;
173 evt_cnt = 0;
174 n_events = 0;
176 rc = new_config;
177 if !rc.is_active {
180 listen_for_new_config = true;
181 termination_seqeunce(&prog_ev ,
182 &prog_a ,
183 &prog_b ,
184 show_progress,
185 &bs_sender );
186 continue;
187 }
188 terminate = false;
190
191 info!("Setting buffer trip value to {}", buffer_trip);
194 let buff_trip_size = rc.rb_buff_size.unwrap_or(50);
195 if buff_trip_size == 50 {
196 warn!("Using buffer trip size of 50. This ssems to be a fallback setting");
197 }
198 buffer_trip = (buff_trip_size as usize)*EVENT_SIZE;
199 if (buffer_trip > DATABUF_TOTAL_SIZE)
200 || (buffer_trip > DATABUF_TOTAL_SIZE) {
201 error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
202 warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
203 buffer_trip = DATABUF_TOTAL_SIZE;
204 } else {
205 uio1_total_size = buffer_trip;
206 uio2_total_size = buffer_trip;
207 }
208
209 match opmode_to_cache.send(rc.tof_op_mode.clone()) {
210 Err(err) => {
211 error!("Unable to send TofOperationMode to the event cache! Err {err}");
212 }
213 Ok(_) => {
214 info!("Send TofOperationMode {} to event processing thread!", rc.tof_op_mode);
215 }
216 }
217
218 let dt_c = rc.data_type.clone();
219 match dtf_to_evproc.send(dt_c) {
220 Err(err) => {
221 error!("Unable to send dataformat to event processing thread! {err}");
222 }
223 Ok(_) => {
224 info!("Sent dataformat {} to event processing thread!", rc.data_type);
225 }
226 }
227
228 match rc.data_type {
230 DataType::VoltageCalibration |
231 DataType::TimingCalibration |
232 DataType::Noi |
233 DataType::RBTriggerPoisson |
234 DataType::RBTriggerPeriodic => {
235 latch_to_mtb = false;
236 },
237 _ => ()
238 }
239 if rc.trigger_poisson_rate > 0 {
240 latch_to_mtb = false;
241 }
244 if rc.trigger_fixed_rate>0 {
245 force_trigger = true;
246 time_between_events = Some(1.0/(rc.trigger_fixed_rate as f32));
247 warn!("Will run in forced trigger mode with a rate of {} Hz!", rc.trigger_fixed_rate);
248 debug!("Will call trigger() every {} seconds...", time_between_events.unwrap());
249 latch_to_mtb = false;
250 }
251 match disable_trigger() {
252 Err(err) => error!("Can not disable triggers! {err}"),
253 Ok(_) => info!("Triggers disabled awaiting SOFT_RESET!"),
254 }
255 info!("Attempting soft reset...");
256 match soft_reset_board() {
257 Err(err) => error!("Unable to reset board! {err}"),
258 Ok(_) => info!("SOFT_RESET succesful!"),
259 }
260 if latch_to_mtb {
264 match set_master_trigger_mode() {
265 Err(err) => error!("Can not initialize master trigger mode, Err {err}"),
266 Ok(_) => info!("Latching to MasterTrigger")
267 }
268 } else {
269 match disable_master_trigger_mode() {
270 Err(err) => error!("Can not disable master trigger mode, Err {err}"),
271 Ok(_) => info!("Master trigger mode didsabled!")
272 }
273 }
274
275 match enable_trigger() {
277 Err(err) => error!("Can not enable triggers! Err {err}"),
278 Ok(_) => info!("Triggers enabled - Run start!")
279 }
280 if rc.trigger_poisson_rate > 0 {
281 enable_poisson_self_trigger(rc.trigger_poisson_rate as f32);
282 }
283 is_running = true;
285
286 if !force_trigger {
287 thread::sleep(one_sec);
290 match get_trigger_rate() {
291 Err(err) => error!("Unable to obtain trigger rate! Err {err}"),
292 Ok(rate) => info!("Seing MTB trigger rate of {rate} Hz")
293 }
294 }
295 if show_progress {
296 if rc.nevents == 0 {
297 template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} ";
298 } else {
299 template_bar_n_ev = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.red/grey} {pos:>7}/{len:7}";
300 }
301 sty_ev = ProgressStyle::with_template(template_bar_n_ev)
302 .unwrap();
303 multi_prog = MultiProgress::new();
304 prog_a = multi_prog
305 .add(ProgressBar::new(uio1_total_size as u64));
306 prog_b = multi_prog
307 .insert_after(&prog_a, ProgressBar::new(uio2_total_size as u64));
308 prog_ev = multi_prog
309 .insert_after(&prog_b, ProgressBar::new(rc.nevents as u64));
310 prog_a.set_message (label_a.clone());
311 prog_a.set_prefix ("\u{1F4BE}");
312 prog_a.set_style (sty_a.clone());
313 prog_a.set_position(0);
314 prog_b.set_message (label_b.clone());
315 prog_b.set_prefix ("\u{1F4BE}");
316 prog_b.set_style (sty_b.clone());
317 prog_b.set_position(0);
318 prog_ev.set_style (sty_ev.clone());
319 prog_ev.set_prefix ("\u{2728}");
320 prog_ev.set_message("EVENTS");
321 prog_ev.set_position(0);
322 info!("Preparations complete. Run start should be imminent.");
323 }
324 continue; } } if is_running {
329 if terminate {
330 is_running = false;
331 termination_seqeunce(&prog_ev ,
332 &prog_a ,
333 &prog_b ,
334 show_progress,
335 &bs_sender );
336 info!("Run stopped! The runner has processed {n_events} events!");
337 continue;
338 } if force_trigger {
343 let elapsed = timer.elapsed().as_secs_f32();
346 trace!("Forced trigger mode, {} seconds since last trigger", elapsed);
348 if elapsed > time_between_events.unwrap() {
350 timer = Instant::now();
351 match trigger() {
352 Err(err) => error!("Error when triggering! {err}"),
353 Ok(_) => trace!("Firing trigger!")
354 }
355 } else { continue;
358 }
359 }
360
361 if !force_trigger {
363 match get_event_count() {
366 Err (err) => {
367 error!("Can not obtain event count! Err {:?}", err);
368 continue;
369 }
370 Ok (cnt) => {
371 evt_cnt = cnt;
372 if first_iter {
373 last_evt_cnt = evt_cnt;
374 first_iter = false;
375 continue;
376 }
377 if evt_cnt == last_evt_cnt {
378 thread::sleep(one_milli);
379 trace!("We did not get an updated event count!");
380 continue; }
382 } } } if buffer_strategy_timeout {
390 if buffer_timer.elapsed().as_secs_f32() > buffer_trips_every {
391 buffer_timer = Instant::now();
392 match ram_buffer_handler(buffer_trip,
393 &bs_sender) {
394 Err(err) => {
395 error!("Can not deal with RAM buffers {err}");
396 continue;
397 }
398 Ok(result) => {
399 which_buff = result.0;
400 buff_size = result.1;
401 has_tripped = result.2;
402 }
403 }
404 }
405 } else {
406 match ram_buffer_handler(buffer_trip,
407 &bs_sender) {
408 Err(err) => {
409 error!("Can not deal with RAM buffers {err}");
410 continue;
411 }
412 Ok(result) => {
413 which_buff = result.0;
414 buff_size = result.1;
415 has_tripped = result.2;
416 }
417 }
418 }
419 if has_tripped { match settings.rb_buff_strategy.get(&rb_id_str).unwrap() {
424 RBBufferStrategy::NEvents(_) => (),
426 RBBufferStrategy::NSeconds(n_secs) => {
427 buffer_trip = 1;
428 buffer_strategy_timeout = true;
429 buffer_trips_every = *n_secs;
430 }
431 RBBufferStrategy::ActuallySmart => {
432 match get_trigger_rate() {
433 Err(err) => {
434 error!("Unable to obtain trigger rate! {err}");
435 buffer_trip = 50;
437 },
438 Ok(rate) => {
439 buffer_trip = rate as usize*EVENT_SIZE ;
440 trace!("Dynamic setting of buffer trip size for rate {}! Setting buffer trip size to {}",rate, buffer_trip);
441 }
442 }
443 }
444 }
445 if (buffer_trip > DATABUF_TOTAL_SIZE)
447 || (buffer_trip > DATABUF_TOTAL_SIZE) {
448 error!("Tripsize of {buffer_trip} exceeds buffer sizes of A : {uio1_total_size} or B : {uio2_total_size}. The EVENT_SIZE is {EVENT_SIZE}");
449 warn!("Will set buffer_trip to {DATABUF_TOTAL_SIZE}");
450 buffer_trip = DATABUF_TOTAL_SIZE;
451 } else {
452 uio1_total_size = buffer_trip;
453 uio2_total_size = buffer_trip;
454 }
455 if show_progress {
456 prog_a.set_length(uio1_total_size as u64);
457 prog_b.set_length(uio2_total_size as u64);
458 }
459 }
460 if force_trigger {
461 n_events += 1;
462 } else {
463 delta_events = (evt_cnt - last_evt_cnt) as u64;
464 n_events += delta_events;
465 last_evt_cnt = evt_cnt;
466 }
467 if show_progress {
468 match which_buff {
469 RamBuffer::A => {
470 prog_a.set_position(buff_size as u64);
471 prog_b.set_position(0);
472 }
473 RamBuffer::B => {
474 prog_b.set_position(buff_size as u64);
475 prog_a.set_position(0);
476 }
477 }
478 prog_ev.set_position(n_events);
479 }
480 } if rc.nseconds > 0 {
485 if met.elapsed().as_secs() > rc.nseconds as u64{
486 terminate = true;
487 }
488 }
489
490 if !rc.nevents == 0 {
492 if rc.nevents != 0 {
493 if n_events > rc.nevents as u64{
494 terminate = true;
495 }
496 }
497
498 if rc.nseconds > 0 {
499 if met.elapsed().as_secs() > rc.nseconds as u64{
500 terminate = true;
501 }
502 }
503
504 if !terminate {
506 if !force_trigger {
507 thread::sleep(100*one_milli);
508 }
509 }
510 }
511 } }
513
514
515