1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8 PathBuf,
9 Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15 Arc,
16 Mutex,
17};
18
19use std::process::{
20 Command,
21 Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28 Duration,
29 Instant,
30};
31
32use crossbeam_channel::Sender;
33
34use indicatif::{
35 ProgressBar,
36 ProgressStyle
37};
38
39use comfy_table::modifiers::{
40 UTF8_ROUND_CORNERS,
41 UTF8_SOLID_INNER_BORDERS,
42};
43
44use comfy_table::presets::UTF8_FULL;
45use comfy_table::*;
46
47use gondola_core::prelude::*;
48
/// TCP endpoint on the local loopback interface (port 54321).
///
/// NOTE(review): presumably the address of a local 0MQ "hotwire" socket,
/// since the other addresses in this file are handed to `zmq` sockets. The
/// constant is not used in this part of the file, so confirm against callers.
pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
52
53pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
56 let mut unit = "";
57 if label_is_hz {
58 unit = "Hz"
59 }
60 let mut table = Table::new();
61 table
62 .load_preset(UTF8_FULL)
63 .apply_modifier(UTF8_ROUND_CORNERS)
64 .apply_modifier(UTF8_SOLID_INNER_BORDERS)
65 .set_content_arrangement(ContentArrangement::Dynamic)
66 .set_width(80)
67 .add_row(vec![
69 Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
70 Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
71 Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
72 Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
73 Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
74 ])
76 .add_row(vec![
77 Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
78 Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
79 Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
80 Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
81 Cell::new(&(format!("RB10 {}", "N.A."))),
82 ])
83 .add_row(vec![
84 Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
85 Cell::new(&(format!("RB12 {}", "N.A."))),
86 Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
87 Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
88 Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
89 ])
90 .add_row(vec![
91 Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
92 Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
93 Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
94 Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
95 Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
96 ])
97 .add_row(vec![
98 Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
99 Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
100 Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
101 Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
102 Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
103 ])
104 .add_row(vec![
105 Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
106 Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
107 Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
108 Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
109 Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
110 ])
111 .add_row(vec![
112 Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
113 Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
114 Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
115 Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
116 Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
117 ])
118 .add_row(vec![
119 Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
120 Cell::new(&(format!("RB37 {}", "N.A."))),
121 Cell::new(&(format!("RB38 {}", "N.A."))),
122 Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
123 Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
124 ])
125 .add_row(vec![
126 Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
127 Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
128 Cell::new(&(format!("RB42 {}", "N.A."))),
129 Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
130 Cell::new(&(format!("RB45 {}", "N.A."))),
131 ])
132 .add_row(vec![
133 Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
134 Cell::new(&(format!("{}", "N.A."))),
135 Cell::new(&(format!("{}", "N.A."))),
136 Cell::new(&(format!("{}", "N.A."))),
137 Cell::new(&(format!("{}", "N.A."))),
138 ]);
139 table
140}
141
142pub fn init_run_start(cc_pub_addr : &str) {
144 let one_second = Duration::from_secs(1);
145 let mut cmd = TofCommand::new();
154 cmd.command_code = TofCommandCode::DataRunStart;
155 let packet = cmd.pack();
156 let mut payload = String::from("BRCT").into_bytes();
157 payload.append(&mut packet.to_bytestream());
158
159 let ctx = zmq::Context::new();
161 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
162 cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
163 println!("=> Sending run start command to RBs ..");
165 for _ in 0..10 {
166 thread::sleep(one_second);
167 print!("..");
168 }
169 match cmd_sender.send(&payload, 0) {
179 Err(err) => {
180 error!("Unable to send command! {err}");
181 },
182 Ok(_) => {
183 debug!("We sent {:?}", payload);
184 }
185 }
186 print!("done!\n");
187}
188
189pub fn end_run(cc_pub_addr : &str) {
191 let mut cmd = TofCommand::new();
197 cmd.command_code = TofCommandCode::DataRunStop;
198 let packet = cmd.pack();
199 let mut payload = String::from("BRCT").into_bytes();
200 payload.append(&mut packet.to_bytestream());
201 let ctx = zmq::Context::new();
202 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
203 cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
204 println!("=> Sending run stop command to all RBs...");
206 println!("=> Waiting for RBs to stoop data acquisition..");
207 for _ in 0..10 {
208 print!("..");
209 }
210 match cmd_sender.send(&payload, 0) {
219 Err(err) => {
220 error!("Unable to send command! {err}");
221 },
222 Ok(_) => {
223 debug!("We sent {:?}", payload);
224 }
225 }
226 print!("..done!\n");
227}
228
/// List the entries of a directory, oldest modification time first.
///
/// The modification time of each entry is read exactly once, before
/// sorting. Querying the file system from within the sort comparator costs
/// a `stat` per comparison and can yield an inconsistent ordering if a file
/// is touched while the sort is running.
///
/// # Arguments
///
/// * `dir_path` - directory to list
///
/// # Returns
///
/// The paths of all entries in `dir_path` (directory prefix included).
/// Entries with identical modification time keep the order in which the
/// directory was read.
///
/// # Panics
///
/// Panics if the directory or one of its entries can not be read, if the
/// modification time is not available or if a path is not valid UTF-8.
pub fn get_queue(dir_path : &String) -> Vec<String> {
  let mut stamped : Vec<(std::time::SystemTime, PathBuf)> = fs::read_dir(dir_path)
    .expect("Directory might not exist!")
    .map(|entry| {
      let path  = entry.expect("Unable to read directory entry!").path();
      let mtime = fs::metadata(&path)
        .expect("Unable to read file metadata!")
        .modified()
        .expect("Modification time not available!");
      (mtime, path)
    })
    .collect();
  // stable sort, so ties keep the directory order
  stamped.sort_by_key(|(mtime, _)| *mtime);
  stamped
    .into_iter()
    .map(|(_, path)| path.to_str().expect("Path is not valid UTF-8!").to_string())
    .collect()
}
244
/// Move a file into another directory, keeping its file name.
///
/// # Arguments
///
/// * `old_path` - the file to move
/// * `new_dir`  - the directory the file is moved into
///
/// # Errors
///
/// Returns an error of kind `InvalidInput` if `old_path` does not have a
/// file name component (e.g. ".." or "/"), otherwise whatever
/// `std::fs::rename` reports.
pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
  let old_path = Path::new(old_path);
  // No UTF-8 conversion needed, `join` works on the OsStr directly
  let file_name = old_path.file_name().ok_or_else(|| {
    std::io::Error::new(
      std::io::ErrorKind::InvalidInput,
      format!("{} does not have a file name", old_path.display()),
    )
  })?;
  fs::rename(old_path, Path::new(new_dir).join(file_name))
}
251
/// Move a file into `new_dir` and name it "liftof-config.toml" there.
///
/// # Arguments
///
/// * `old_path` - the file to move
/// * `new_dir`  - the directory which receives the file
///
/// # Errors
///
/// Whatever `std::fs::rename` reports.
pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
  let mut target = PathBuf::from(new_dir);
  target.push("liftof-config.toml");
  fs::rename(old_path, target)
}
257
/// Copy a file into another directory, keeping its file name.
///
/// # Arguments
///
/// * `old_path` - the file to copy
/// * `new_dir`  - the directory which receives the copy
///
/// # Returns
///
/// The number of bytes copied.
///
/// # Errors
///
/// Returns an error of kind `InvalidInput` if `old_path` does not have a
/// file name component (e.g. ".." or "/"), otherwise whatever
/// `std::fs::copy` reports.
pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
  let old_path = Path::new(old_path);
  // No UTF-8 conversion needed, `join` works on the OsStr directly
  let file_name = old_path.file_name().ok_or_else(|| {
    std::io::Error::new(
      std::io::ErrorKind::InvalidInput,
      format!("{} does not have a file name", old_path.display()),
    )
  })?;
  fs::copy(old_path, Path::new(new_dir).join(file_name))
}
264
/// Copy a file into `new_dir` and name the copy "liftof-config.toml".
///
/// # Arguments
///
/// * `old_path` - the file to copy
/// * `new_dir`  - the directory which receives the copy
///
/// # Returns
///
/// The number of bytes copied.
///
/// # Errors
///
/// Whatever `std::fs::copy` reports.
pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
  let mut target = PathBuf::from(new_dir);
  target.push("liftof-config.toml");
  fs::copy(old_path, target)
}
270
/// Remove a single file from the file system.
///
/// # Arguments
///
/// * `file_path` - the file to remove
///
/// # Errors
///
/// Whatever `std::fs::remove_file` reports, e.g. if the file does not exist.
pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
  fs::remove_file(file_path)
}
275
276pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
281 let queue_dir = format!("{}/queue", staging_dir);
282 let next_dir = format!("{}/next", staging_dir);
283 let current_dir = format!("{}/current", staging_dir);
284
285 let queue = get_queue(&queue_dir);
286 let current = get_queue(¤t_dir);
287 let next = get_queue(&next_dir);
288
289 if current.len() == 0 {
290 error!("We don't have a current configuration. This is BAD!");
292 return Err(StagingError::NoCurrentConfig);
293 }
294
295 println!("= => Found {} files in run queue!", queue.len());
296 if next.len() == 0 && queue.len() == 0 {
297 println!("= => Nothing staged, will jusr repeat current run setting!");
298 if !dry_run {
299 manage_liftof_cc_service("restart");
300 }
301 thread::sleep(Duration::from_secs(20));
302 return Ok(());
303 }
304 if next.len() == 0 && queue.len() != 0 {
305 error!("Empty next directory, but we have files in the queue!");
306 match copy_file_rename_liftof(&queue[0], &next_dir) {
307 Ok(_) => (),
308 Err(err) => {
309 error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
310 }
311 }
312 match move_file_rename_liftof(&queue[0], ¤t_dir) {
313 Ok(_) => (),
314 Err(err) => {
315 error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
316 }
317 }
318 }
319 if next.len() != 0 {
320 match delete_file(¤t[0]) {
321 Ok(_) => (),
322 Err(err) => {
323 error!("Unable to delete {}! {}", current[0], err);
324 }
325 }
326 match move_file_rename_liftof(&next[0], ¤t_dir) {
327 Ok(_) => (),
328 Err(err) => {
329 error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
330 }
331 }
332 if queue.len() != 0 {
333 match move_file_with_name(&queue[0], &next_dir) {
334 Ok(_) => (),
335 Err(err) => {
336 error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
337 }
338 }
339 }
340 println!("=> Restarting liftof-cc!");
341 if !dry_run {
342 manage_liftof_cc_service("restart");
343 }
344 thread::sleep(Duration::from_secs(20));
345 }
346 Ok(())
347}
348
349pub fn prepare_run(data_path : String,
368 config : &LiftofSettings,
369 run_id : Option<u32>,
370 create_dir : bool) -> Option<u32> {
371 let mut stream_files_path = PathBuf::from(data_path);
372 let paths = fs::read_dir(stream_files_path.clone()).unwrap();
376
377 let mut used_runids = Vec::<u32>::new();
378 for path in paths {
379 match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
381 Ok(this_run_id) => {
382 debug!("Extracted run id {}", this_run_id);
383 used_runids.push(this_run_id);
384 },
385 Err(err) => {
386 warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
387 }
388 }
389 }
390 let mut max_run_id = 0u32;
391 match used_runids.iter().max() {
392 None => (),
393 Some(_r) => {
394 max_run_id = *_r;
395 }
396 }
397 println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
398 let new_run_id : Option<u32>;
399 if max_run_id == 0 {
400 new_run_id = run_id;
403 } else if run_id.is_some() {
404 if used_runids.contains(&run_id.unwrap()) {
405 error!("Duplicate run id ({})!", run_id.unwrap());
407 new_run_id = None;
409 } else {
410 new_run_id = run_id;
412 }
413 } else {
414 new_run_id = Some(max_run_id + 1);
415 }
416 if new_run_id.is_none() {
418 return new_run_id;
419 }
420 stream_files_path.push(new_run_id.unwrap().to_string().as_str());
421 if create_dir {
422 if let Ok(metadata) = fs::metadata(&stream_files_path) {
425 if metadata.is_dir() {
426 println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
427 }
430 } else {
431 match fs::create_dir(&stream_files_path) {
432 Ok(()) => println!("=> Created {} to save stream data", stream_files_path.display()),
433 Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
434 }
435 }
436 }
437 let settings_fname = format!("{}/run{}.toml",
438 stream_files_path.display(),
439 new_run_id.unwrap());
440 println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
441 println!("=> Writing settings to {}!", settings_fname);
442 config.to_toml(settings_fname);
443 return new_run_id;
444}
445
446
447pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
468 match Command::new("sudo")
469 .args(["systemctl", mode, "liftof"])
470 .spawn() {
471 Err(err) => {
472 error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
473 TofReturnCode::GeneralFail
474 }
475 Ok(_) => {
476 println!("=> Executed sudo systemctl {} liftof", mode);
477 TofReturnCode::Success
478 }
479 }
480}
481
482
483pub fn ssh_command_rbs(rb_list : &Vec<u8>,
499 cmd : Vec<String>) -> Result<Vec<u8>, TofError> {
500 let mut rb_handles = Vec::<thread::JoinHandle<_>>::new();
501 info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
502 let mut children = Vec::<(u8,Child)>::new();
503 for rb in rb_list {
504 rb_handles.push(thread::spawn(||{}));
506 let rb_address = format!("tof-rb{:02}", rb);
507 let mut ssh_args = vec![rb_address];
508 let mut thisrb_cmd = cmd.clone();
509 ssh_args.append(&mut thisrb_cmd);
510 match Command::new("ssh")
511 .args(ssh_args)
513 .spawn() {
514 Err(err) => {
515 error!("Unable to spawn ssh process on RB {}! {}", rb, err);
516 }
517 Ok(child) => {
518 children.push((*rb,child));
519 }
520 }
521 }
522 let mut issues = Vec::<u8>::new();
523 for rb_child in &mut children {
524 let timeout = Duration::from_secs(10);
528 let kill_t = Instant::now();
529 loop {
530 if kill_t.elapsed() > timeout {
531 error!("SSH process for board {} timed out!", rb_child.0);
532 match rb_child.1.kill() {
534 Err(err) => {
535 error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
536 }
537 Ok(_) => {
538 error!("Killed SSH process for for RB {}", rb_child.0);
539 }
540 }
541 issues.push(rb_child.0);
542 break
544 }
545 match rb_child.1.try_wait() {
547 Ok(None) => {
548 thread::sleep(Duration::from_secs(1));
550 continue;
551 }
552 Ok(Some(status)) => {
553 if status.success() {
554 info!("Execution of command on {} successful!", rb_child.0);
555 break;
556 } else {
557 error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
558 issues.push(rb_child.0);
559 break;
560 }
561 }
562 Err(err) => {
563 error!("Unable to wait for the SSH process! {err}");
564 break;
565 }
566 }
567 }
568 }
569 if issues.len() == 0 {
570 println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
571 }
572 Ok(issues)
573}
574
575pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
577 let command = vec![String::from("sudo"),
578 String::from("systemctl"),
579 String::from("restart"),
580 String::from("liftof")];
581 println!("=> Restarting liftof-rb on RBs!");
582 match ssh_command_rbs(rb_list, command) {
583 Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
584 Ok(_) => ()
585 }
586}
587
588pub fn verification_run(timeout : u32,
601 tp_to_sink : Sender<TofPacket>,
602 thread_control : Arc<Mutex<ThreadControl>>,
603 show_progress : bool) {
604 let mut write_state : bool = true; let mut config = LiftofSettings::new();
606 match thread_control.lock() {
607 Ok(mut tc) => {
608 write_state = tc.write_data_to_disk;
609 tc.write_data_to_disk = false;
610 tc.verification_active = true;
611 tc.thread_master_trg_active = true;
612 tc.calibration_active = false;
613 tc.thread_event_bldr_active = true;
614 config = tc.liftof_settings.clone();
615 }
616 Err(err) => {
617 error!("Can't acquire lock for ThreadControl! {err}");
618 },
619 }
620 let one_second = Duration::from_millis(1000);
621 let runtime = Instant::now();
622 let mut cmd = TofCommand::new();
631 cmd.command_code = TofCommandCode::DataRunStart;
632 let packet = cmd.pack();
633 let mut payload = String::from("BRCT").into_bytes();
634 payload.append(&mut packet.to_bytestream());
635
636 let ctx = zmq::Context::new();
638 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
639 let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
640 cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
641 println!("=> Give the RBs a chance to connect and wait a bit..");
643 thread::sleep(10*one_second);
644 match cmd_sender.send(&payload, 0) {
645 Err(err) => {
646 error!("Unable to send command, error{err}");
647 },
648 Ok(_) => {
649 debug!("We sent {:?}", payload);
650 }
651 }
652
653 println!("=> Verification run initialized!");
654 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
655 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
656 let mut bar = ProgressBar::hidden();
657 if show_progress {
659 bar = ProgressBar::new(timeout as u64);
660 bar.set_position(0);
661 let bar_label = String::from("Performing verification run");
662 bar.set_message (bar_label);
663 bar.set_prefix ("\u{2699}\u{1F4D0}");
664 bar.set_style (bar_style);
665 }
666 loop {
667 if runtime.elapsed().as_secs() >= timeout as u64 {
668 break;
669 }
670 thread::sleep(1*one_second);
671 bar.set_position(runtime.elapsed().as_secs());
672 }
673 if show_progress {
674 bar.finish_with_message("Done");
675 }
676
677 println!("=> Ending verification run!");
678 println!("=> Sending run termination command to the RBs");
679 let mut cmd = TofCommand::new();
680 cmd.command_code = TofCommandCode::DataRunStop;
681 let packet = cmd.pack();
682 let mut payload = String::from("BRCT").into_bytes();
683 payload.append(&mut packet.to_bytestream());
684 warn!("=> No command socket available! Can not shut down RBs..!");
690 println!("=> Give the RBs a chance to connect and wait a bit..");
692 thread::sleep(10*one_second);
693 match cmd_sender.send(&payload, 0) {
694 Err(err) => {
695 error!("Unable to send command! {err}");
696 },
697 Ok(_) => {
698 debug!("We sent {:?}", payload);
699 }
700 }
701
702 let mut detector_status = TofDetectorStatus::new();
704 match thread_control.lock() {
705 Ok(mut tc) => {
706 tc.write_data_to_disk = write_state;
707 tc.verification_active = false;
708 detector_status = tc.detector_status.clone();
709 },
710 Err(err) => {
711 error!("Can't acquire lock for ThreadControl! {err}");
712 },
713 }
714 println!("=> Acquired TofDetectorStatus!");
715 println!("{}", detector_status);
716 let pack = detector_status.pack();
717 match tp_to_sink.send(pack) {
718 Err(err) => error!("Unable to send TofDetectorStatus to data sink! {err}"),
719 Ok(_) => ()
720 }
721}
722
723
724pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
743 rb_list : &Vec<ReadoutBoard>,
744 show_progress : bool) {
745
746 let one_second = Duration::from_millis(1000);
747 let mut cc_pub_addr = String::from("");
748 let calibration_timeout_fail = Duration::from_secs(300); let mut cali_dir_created = false;
751 let mut cali_output_dir = String::from("");
752 let mut cali_base_dir = String::from("");
753
754 match thread_control.lock() {
755 Ok(mut tc) => {
756 for rb in rb_list {
757 tc.finished_calibrations.insert(rb.rb_id,false);
758 }
759 cali_base_dir = tc.liftof_settings.calibration_dir.clone();
760 cc_pub_addr = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
761 tc.write_data_to_disk = true;
762 },
763 Err(err) => {
764 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
765 },
766 }
767
768 let mut default_calib = TofCommand::new();
781 default_calib.command_code = TofCommandCode::RBCalibration;
782 let tp = default_calib.pack();
783 let mut payload = String::from("BRCT").into_bytes();
784 payload.append(&mut tp.to_bytestream());
785 let ctx = zmq::Context::new();
787 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
788
789 cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
790 println!("=> Give the RBs a chance to connect and wait a bit..");
791 thread::sleep(10*one_second);
792 match cmd_sender.send(&payload, 0) { Err(err) => {
800 error!("Unable to send command! {err}");
801 },
802 Ok(_) => {
803 println!("=> Calibration initialized!");
804 }
805 }
806 match thread_control.lock() {
807 Ok(mut tc) => {
808 tc.thread_master_trg_active =false;
810 tc.calibration_active = true;
811 },
812 Err(err) => {
813 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
814 },
815 }
816
817 println!("=> .. now we need to wait until the calibration is finished!");
818 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
819 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
820 let mut bar = ProgressBar::hidden();
821 if show_progress {
822 bar = ProgressBar::new(rb_list.len() as u64);
823 bar.set_position(0);
824 let bar_label = String::from("Acquiring RB calibration data");
825 bar.set_message (bar_label);
826 bar.set_prefix ("\u{2699}\u{1F4D0}");
827 bar.set_style (bar_style);
828 }
829
830 let timeout = Instant::now();
833 let mut cali_received = 0;
834 'main: loop {
835 thread::sleep(10*one_second);
836 if timeout.elapsed() > calibration_timeout_fail {
837 error!("Calibration timeout! Calibrations might not be complete!");
838 match thread_control.lock() {
839 Ok(mut tc) => {
840 tc.calibration_active = false;
841 }
842 Err(err) => {
843 error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
844 }
845 }
846 if show_progress {
847 bar.finish_with_message("Done");
848 }
849 break;
850 }
851 match thread_control.lock() {
853 Ok(mut tc) => {
854 for rbid in rb_list {
855 let mut finished_keys = Vec::<u8>::new();
857 if tc.stop_flag {
858 println!("Stop signal received, exiting calibration routine!");
859 break 'main;
860 }
861 if tc.finished_calibrations[&rbid.rb_id] {
862 cali_received += 1;
863 let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
864 let pack = rbcali.pack();
865 let file_type = FileType::CalibrationFile(rbid.rb_id);
867 if !cali_dir_created {
870 let today = get_utc_timestamp();
871 cali_output_dir = format!("{}/{}", cali_base_dir.clone(), today);
872 match create_dir_all(cali_output_dir.clone()) {
873 Ok(_) => info!("Created {} for calibration data!", cali_output_dir),
874 Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
875 }
876 cali_dir_created = true;
877 }
878 let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
879 cali_writer.add_tof_packet(&pack);
880 drop(cali_writer);
881
882 bar.set_position(cali_received);
883 finished_keys.push(rbid.rb_id);
884 }
885 for rbid in &finished_keys {
886 *tc.finished_calibrations.get_mut(&rbid).unwrap() = false;
887 }
888 }
889 if cali_received as usize == rb_list.len() {
891 tc.calibration_active = false;
896 for rbid in rb_list {
898 *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false;
899 }
900 if show_progress {
901 bar.finish_with_message("Done");
902 }
903 break;
904 }
905 }
906 Err(err) => {
907 error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
908 }
909 }
910 } let cali_link_dir = cali_base_dir.clone() + "latest";
913 match fs::remove_file(cali_link_dir.clone()) {
914 Ok(_) => {
915 println!("=> Symlink {} removed!", cali_link_dir);
916 },
917 Err(err) => {
918 error!("Unable to remove symlink to latest calibrations! {err}");
919 }
920 }
921 println!("=> Will create symlink {}", cali_link_dir);
922 match symlink(cali_output_dir, cali_link_dir) {
923 Err(err) => error!("Unable to create symlink for calibration data! {err}"),
924 Ok(_) => ()
925 }
926}