1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8 PathBuf,
9 Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15 Arc,
16 Mutex,
17};
18
19use std::process::{
20 Command,
21 Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28 Duration,
29 Instant,
30};
31
32use indicatif::{
33 ProgressBar,
34 ProgressStyle
35};
36
37use comfy_table::modifiers::{
38 UTF8_ROUND_CORNERS,
39 UTF8_SOLID_INNER_BORDERS,
40};
41
42use comfy_table::presets::UTF8_FULL;
43use comfy_table::*;
44
45use gondola_core::prelude::*;
46
47pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
50
51pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
54 let mut unit = "";
55 if label_is_hz {
56 unit = "Hz"
57 }
58 let mut table = Table::new();
59 table
60 .load_preset(UTF8_FULL)
61 .apply_modifier(UTF8_ROUND_CORNERS)
62 .apply_modifier(UTF8_SOLID_INNER_BORDERS)
63 .set_content_arrangement(ContentArrangement::Dynamic)
64 .set_width(80)
65 .add_row(vec![
67 Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
68 Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
69 Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
70 Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
71 Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
72 ])
74 .add_row(vec![
75 Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
76 Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
77 Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
78 Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
79 Cell::new(&(format!("RB10 {}", "N.A."))),
80 ])
81 .add_row(vec![
82 Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
83 Cell::new(&(format!("RB12 {}", "N.A."))),
84 Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
85 Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
86 Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
87 ])
88 .add_row(vec![
89 Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
90 Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
91 Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
92 Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
93 Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
94 ])
95 .add_row(vec![
96 Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
97 Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
98 Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
99 Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
100 Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
101 ])
102 .add_row(vec![
103 Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
104 Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
105 Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
106 Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
107 Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
108 ])
109 .add_row(vec![
110 Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
111 Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
112 Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
113 Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
114 Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
115 ])
116 .add_row(vec![
117 Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
118 Cell::new(&(format!("RB37 {}", "N.A."))),
119 Cell::new(&(format!("RB38 {}", "N.A."))),
120 Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
121 Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
122 ])
123 .add_row(vec![
124 Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
125 Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
126 Cell::new(&(format!("RB42 {}", "N.A."))),
127 Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
128 Cell::new(&(format!("RB45 {}", "N.A."))),
129 ])
130 .add_row(vec![
131 Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
132 Cell::new(&(format!("{}", "N.A."))),
133 Cell::new(&(format!("{}", "N.A."))),
134 Cell::new(&(format!("{}", "N.A."))),
135 Cell::new(&(format!("{}", "N.A."))),
136 ]);
137 table
138}
139
140pub fn init_run_start(cc_pub_addr : &str) {
142 let one_second = Duration::from_secs(1);
143 let mut cmd = TofCommand::new();
152 cmd.command_code = TofCommandCode::DataRunStart;
153 let packet = cmd.pack();
154 let mut payload = String::from("BRCT").into_bytes();
155 payload.append(&mut packet.to_bytestream());
156
157 let ctx = zmq::Context::new();
159 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
160 cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
161 println!("=> Sending run start command to RBs ..");
163 for _ in 0..10 {
164 thread::sleep(one_second);
165 print!("..");
166 }
167 match cmd_sender.send(&payload, 0) {
177 Err(err) => {
178 error!("Unable to send command! {err}");
179 },
180 Ok(_) => {
181 debug!("We sent {:?}", payload);
182 }
183 }
184 print!("done!\n");
185}
186
187pub fn end_run(cc_pub_addr : &str) {
189 let mut cmd = TofCommand::new();
195 cmd.command_code = TofCommandCode::DataRunStop;
196 let packet = cmd.pack();
197 let mut payload = String::from("BRCT").into_bytes();
198 payload.append(&mut packet.to_bytestream());
199 let ctx = zmq::Context::new();
200 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
201 cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
202 println!("=> Sending run stop command to all RBs...");
204 println!("=> Waiting for RBs to stop data acquisition..");
205 for _ in 0..10 {
206 print!("..");
207 }
208 match cmd_sender.send(&payload, 0) {
217 Err(err) => {
218 error!("Unable to send command! {err}");
219 },
220 Ok(_) => {
221 debug!("We sent {:?}", payload);
222 }
223 }
224 print!("..done!\n");
225}
226
227pub fn get_queue(dir_path : &String) -> Vec<String> {
229 let mut entries = fs::read_dir(dir_path)
230 .expect("Directory might not exist!")
231 .map(|entry| entry.unwrap().path())
232 .collect::<Vec<PathBuf>>();
233 entries.sort_by(|a, b| {
234 let meta_a = fs::metadata(a).unwrap();
235 let meta_b = fs::metadata(b).unwrap();
236 meta_a.modified().unwrap().cmp(&meta_b.modified().unwrap())
237 });
238 entries.iter()
239 .map(|path| path.to_str().unwrap().to_string())
240 .collect()
241}
242
243pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
244 let old_path = Path::new(old_path);
245 let file_name = old_path.file_name().unwrap().to_str().unwrap(); let new_path = Path::new(new_dir).join(file_name); fs::rename(old_path, new_path) }
249
250pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
251 let old_path = Path::new(old_path);
252 let new_path = Path::new(new_dir).join("liftof-config.toml"); fs::rename(old_path, new_path) }
255
256pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
257 let old_path = Path::new(old_path);
258 let file_name = old_path.file_name().unwrap().to_str().unwrap(); let new_path = Path::new(new_dir).join(file_name); fs::copy(old_path, new_path)
261}
262
263pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
264 let old_path = Path::new(old_path);
265 let new_path = Path::new(new_dir).join("liftof-config.toml"); fs::copy(old_path, new_path)
267}
268
269pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
270 let path = Path::new(file_path);
271 fs::remove_file(path) }
273
274pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
279 let queue_dir = format!("{}/queue", staging_dir);
280 let next_dir = format!("{}/next", staging_dir);
281 let current_dir = format!("{}/current", staging_dir);
282
283 let queue = get_queue(&queue_dir);
284 let current = get_queue(¤t_dir);
285 let next = get_queue(&next_dir);
286
287 if current.len() == 0 {
288 error!("We don't have a current configuration. This is BAD!");
290 return Err(StagingError::NoCurrentConfig);
291 }
292
293 println!("= => Found {} files in run queue!", queue.len());
294 if next.len() == 0 && queue.len() == 0 {
295 println!("= => Nothing staged, will jusr repeat current run setting!");
296 if !dry_run {
297 manage_liftof_cc_service("restart");
298 }
299 thread::sleep(Duration::from_secs(20));
300 return Ok(());
301 }
302 if next.len() == 0 && queue.len() != 0 {
303 error!("Empty next directory, but we have files in the queue!");
304 match copy_file_rename_liftof(&queue[0], &next_dir) {
305 Ok(_) => (),
306 Err(err) => {
307 error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
308 }
309 }
310 match move_file_rename_liftof(&queue[0], ¤t_dir) {
311 Ok(_) => (),
312 Err(err) => {
313 error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
314 }
315 }
316 }
317 if next.len() != 0 {
318 match delete_file(¤t[0]) {
319 Ok(_) => (),
320 Err(err) => {
321 error!("Unable to delete {}! {}", current[0], err);
322 }
323 }
324 match move_file_rename_liftof(&next[0], ¤t_dir) {
325 Ok(_) => (),
326 Err(err) => {
327 error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
328 }
329 }
330 if queue.len() != 0 {
331 match move_file_with_name(&queue[0], &next_dir) {
332 Ok(_) => (),
333 Err(err) => {
334 error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
335 }
336 }
337 }
338 println!("=> Restarting liftof-cc!");
339 if !dry_run {
340 manage_liftof_cc_service("restart");
341 }
342 thread::sleep(Duration::from_secs(20));
343 }
344 Ok(())
345}
346
347pub fn prepare_run(data_path : String,
366 config : &LiftofSettings,
367 run_id : Option<u32>,
368 create_dir : bool) -> Option<u32> {
369 let mut stream_files_path = PathBuf::from(data_path);
370 let paths = fs::read_dir(stream_files_path.clone()).unwrap();
374
375 let mut used_runids = Vec::<u32>::new();
376 for path in paths {
377 match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
379 Ok(this_run_id) => {
380 debug!("Extracted run id {}", this_run_id);
381 used_runids.push(this_run_id);
382 },
383 Err(err) => {
384 warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
385 }
386 }
387 }
388 let mut max_run_id = 0u32;
389 match used_runids.iter().max() {
390 None => (),
391 Some(_r) => {
392 max_run_id = *_r;
393 }
394 }
395 println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
396 let new_run_id : Option<u32>;
397 if max_run_id == 0 {
398 new_run_id = run_id;
401 } else if run_id.is_some() {
402 if used_runids.contains(&run_id.unwrap()) {
403 error!("Duplicate run id ({})!", run_id.unwrap());
405 new_run_id = None;
407 } else {
408 new_run_id = run_id;
410 }
411 } else {
412 new_run_id = Some(max_run_id + 1);
413 }
414 if new_run_id.is_none() {
416 return new_run_id;
417 }
418 stream_files_path.push(new_run_id.unwrap().to_string().as_str());
419 if create_dir {
420 if let Ok(metadata) = fs::metadata(&stream_files_path) {
423 if metadata.is_dir() {
424 println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
425 }
428 } else {
429 match fs::create_dir(&stream_files_path) {
430 Ok(()) => println!("=> Created {} to save stream data", stream_files_path.display()),
431 Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
432 }
433 }
434 }
435 let settings_fname = format!("{}/run{}.toml",
436 stream_files_path.display(),
437 new_run_id.unwrap());
438 println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
439 println!("=> Writing settings to {}!", settings_fname);
440 config.to_toml(settings_fname);
441 return new_run_id;
442}
443
444
445pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
466 match Command::new("sudo")
467 .args(["systemctl", mode, "liftof"])
468 .spawn() {
469 Err(err) => {
470 error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
471 TofReturnCode::GeneralFail
472 }
473 Ok(_) => {
474 println!("=> Executed sudo systemctl {} liftof", mode);
475 TofReturnCode::Success
476 }
477 }
478}
479
480
481pub fn ssh_command_rbs(rb_list : &Vec<u8>,
497 cmd : Vec<String>) -> Result<Vec<u8>, TofError> {
498 let mut rb_handles = Vec::<thread::JoinHandle<_>>::new();
499 info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
500 let mut children = Vec::<(u8,Child)>::new();
501 for rb in rb_list {
502 rb_handles.push(thread::spawn(||{}));
504 let rb_address = format!("tof-rb{:02}", rb);
505 let mut ssh_args = vec![rb_address];
506 let mut thisrb_cmd = cmd.clone();
507 ssh_args.append(&mut thisrb_cmd);
508 match Command::new("ssh")
509 .args(ssh_args)
511 .spawn() {
512 Err(err) => {
513 error!("Unable to spawn ssh process on RB {}! {}", rb, err);
514 }
515 Ok(child) => {
516 children.push((*rb,child));
517 }
518 }
519 }
520 let mut issues = Vec::<u8>::new();
521 for rb_child in &mut children {
522 let timeout = Duration::from_secs(10);
526 let kill_t = Instant::now();
527 loop {
528 if kill_t.elapsed() > timeout {
529 error!("SSH process for board {} timed out!", rb_child.0);
530 match rb_child.1.kill() {
532 Err(err) => {
533 error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
534 }
535 Ok(_) => {
536 error!("Killed SSH process for for RB {}", rb_child.0);
537 }
538 }
539 issues.push(rb_child.0);
540 break
542 }
543 match rb_child.1.try_wait() {
545 Ok(None) => {
546 thread::sleep(Duration::from_secs(1));
548 continue;
549 }
550 Ok(Some(status)) => {
551 if status.success() {
552 info!("Execution of command on {} successful!", rb_child.0);
553 break;
554 } else {
555 error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
556 issues.push(rb_child.0);
557 break;
558 }
559 }
560 Err(err) => {
561 error!("Unable to wait for the SSH process! {err}");
562 break;
563 }
564 }
565 }
566 }
567 if issues.len() == 0 {
568 println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
569 }
570 Ok(issues)
571}
572
573pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
575 let command = vec![String::from("sudo"),
576 String::from("systemctl"),
577 String::from("restart"),
578 String::from("liftof")];
579 println!("=> Restarting liftof-rb on RBs!");
580 match ssh_command_rbs(rb_list, command) {
581 Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
582 Ok(_) => ()
583 }
584}
585
586
587
588pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
607 rb_list : &Vec<ReadoutBoard>,
608 show_progress : bool) {
609
610 let one_second = Duration::from_millis(1000);
611 let mut cc_pub_addr = String::from("");
612 let calibration_timeout_fail = Duration::from_secs(300); let mut cali_dir_created = false;
615 let mut cali_output_dir = String::from("");
616 let mut cali_base_dir = String::from("");
617
618 match thread_control.lock() {
619 Ok(mut tc) => {
620 for rb in rb_list {
621 tc.finished_calibrations.insert(rb.rb_id,false);
622 }
623 cali_base_dir = tc.liftof_settings.calibration_dir.clone();
624 cc_pub_addr = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
625 tc.write_data_to_disk = true;
626 },
627 Err(err) => {
628 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
629 },
630 }
631
632 let mut default_calib = TofCommand::new();
645 default_calib.command_code = TofCommandCode::RBCalibration;
646 let tp = default_calib.pack();
647 let mut payload = String::from("BRCT").into_bytes();
648 payload.append(&mut tp.to_bytestream());
649 let ctx = zmq::Context::new();
651 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
652
653 cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
654 println!("=> Give the RBs a chance to connect and wait a bit..");
655 thread::sleep(10*one_second);
656 match cmd_sender.send(&payload, 0) { Err(err) => {
664 error!("Unable to send command! {err}");
665 },
666 Ok(_) => {
667 println!("=> Calibration initialized!");
668 }
669 }
670 match thread_control.lock() {
671 Ok(mut tc) => {
672 tc.thread_master_trg_active =false;
674 tc.calibration_active = true;
675 },
676 Err(err) => {
677 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
678 },
679 }
680
681 println!("=> .. now we need to wait until the calibration is finished!");
682 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
683 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
684 let mut bar = ProgressBar::hidden();
685 if show_progress {
686 bar = ProgressBar::new(rb_list.len() as u64);
687 bar.set_position(0);
688 let bar_label = String::from("Acquiring RB calibration data");
689 bar.set_message (bar_label);
690 bar.set_prefix ("\u{2699}\u{1F4D0}");
691 bar.set_style (bar_style);
692 }
693
694 let timeout = Instant::now();
697 let mut cali_received = 0;
698 'main: loop {
699 thread::sleep(10*one_second);
700 if timeout.elapsed() > calibration_timeout_fail {
701 error!("Calibration timeout! Calibrations might not be complete!");
702 match thread_control.lock() {
703 Ok(mut tc) => {
704 tc.calibration_active = false;
705 }
706 Err(err) => {
707 error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
708 }
709 }
710 if show_progress {
711 bar.finish_with_message("Done");
712 }
713 break;
714 }
715 match thread_control.lock() {
717 Ok(mut tc) => {
718 for rbid in rb_list {
719 let mut finished_keys = Vec::<u8>::new();
721 if tc.stop_flag {
722 println!("Stop signal received, exiting calibration routine!");
723 break 'main;
724 }
725 if tc.finished_calibrations[&rbid.rb_id] {
726 cali_received += 1;
727 let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
728 let pack = rbcali.pack();
729 let file_type = FileType::CalibrationFile(rbid.rb_id);
731 if !cali_dir_created {
734 let today = get_utc_timestamp();
735 cali_output_dir = format!("{}/{}", cali_base_dir.clone(), today);
736 match create_dir_all(cali_output_dir.clone()) {
737 Ok(_) => info!("Created {} for calibration data!", cali_output_dir),
738 Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
739 }
740 cali_dir_created = true;
741 }
742 let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
743 cali_writer.add_tof_packet(&pack);
744 drop(cali_writer);
745
746 bar.set_position(cali_received);
747 finished_keys.push(rbid.rb_id);
748 }
749 for rbid in &finished_keys {
750 *tc.finished_calibrations.get_mut(&rbid).unwrap() = false;
751 }
752 }
753 if cali_received as usize == rb_list.len() {
755 tc.calibration_active = false;
760 for rbid in rb_list {
762 *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false;
763 }
764 if show_progress {
765 bar.finish_with_message("Done");
766 }
767 break;
768 }
769 }
770 Err(err) => {
771 error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
772 }
773 }
774 } let cali_link_dir = cali_base_dir.clone() + "latest";
777 match fs::remove_file(cali_link_dir.clone()) {
778 Ok(_) => {
779 println!("=> Symlink {} removed!", cali_link_dir);
780 },
781 Err(err) => {
782 error!("Unable to remove symlink to latest calibrations! {err}");
783 }
784 }
785 println!("=> Will create symlink {}", cali_link_dir);
786 match symlink(cali_output_dir, cali_link_dir) {
787 Err(err) => error!("Unable to create symlink for calibration data! {err}"),
788 Ok(_) => ()
789 }
790}