1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8 PathBuf,
9 Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15 Arc,
16 Mutex,
17};
18
19use std::process::{
20 Command,
21 Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28 Duration,
29 Instant,
30};
31
32use crossbeam_channel::Sender;
33
34use indicatif::{
35 ProgressBar,
36 ProgressStyle
37};
38
39use comfy_table::modifiers::{
40 UTF8_ROUND_CORNERS,
41 UTF8_SOLID_INNER_BORDERS,
42};
43
44use comfy_table::presets::UTF8_FULL;
45use comfy_table::*;
46
47use liftof_lib::constants::{
48 DEFAULT_CALIB_VOLTAGE,
49 DEFAULT_RB_ID,
50 DEFAULT_CALIB_EXTRA,
51};
52
53use tof_dataclasses::constants::PAD_CMD_32BIT;
54use tof_dataclasses::serialization::{
55 Serialization,
56 Packable
57};
58
59use tof_dataclasses::errors::{
60 StagingError,
61 TofError
62};
63
64use tof_dataclasses::commands::{
65 TofCommand,
66 TofCommandV2,
67 TofReturnCode,
68 TofCommandCode,
69};
70
71use tof_dataclasses::status::TofDetectorStatus;
72use tof_dataclasses::packets::TofPacket;
73use tof_dataclasses::database::ReadoutBoard;
74
75use tof_dataclasses::io::{
76 TofPacketWriter,
77 FileType,
78 get_utc_timestamp
79};
80
81use liftof_lib::settings::LiftofSettings;
82use liftof_lib::thread_control::ThreadControl;
83
84pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
87
88pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
91 let mut unit = "";
92 if label_is_hz {
93 unit = "Hz"
94 }
95 let mut table = Table::new();
96 table
97 .load_preset(UTF8_FULL)
98 .apply_modifier(UTF8_ROUND_CORNERS)
99 .apply_modifier(UTF8_SOLID_INNER_BORDERS)
100 .set_content_arrangement(ContentArrangement::Dynamic)
101 .set_width(80)
102 .add_row(vec![
104 Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
105 Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
106 Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
107 Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
108 Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
109 ])
111 .add_row(vec![
112 Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
113 Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
114 Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
115 Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
116 Cell::new(&(format!("RB10 {}", "N.A."))),
117 ])
118 .add_row(vec![
119 Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
120 Cell::new(&(format!("RB12 {}", "N.A."))),
121 Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
122 Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
123 Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
124 ])
125 .add_row(vec![
126 Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
127 Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
128 Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
129 Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
130 Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
131 ])
132 .add_row(vec![
133 Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
134 Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
135 Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
136 Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
137 Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
138 ])
139 .add_row(vec![
140 Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
141 Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
142 Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
143 Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
144 Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
145 ])
146 .add_row(vec![
147 Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
148 Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
149 Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
150 Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
151 Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
152 ])
153 .add_row(vec![
154 Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
155 Cell::new(&(format!("RB37 {}", "N.A."))),
156 Cell::new(&(format!("RB38 {}", "N.A."))),
157 Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
158 Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
159 ])
160 .add_row(vec![
161 Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
162 Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
163 Cell::new(&(format!("RB42 {}", "N.A."))),
164 Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
165 Cell::new(&(format!("RB45 {}", "N.A."))),
166 ])
167 .add_row(vec![
168 Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
169 Cell::new(&(format!("{}", "N.A."))),
170 Cell::new(&(format!("{}", "N.A."))),
171 Cell::new(&(format!("{}", "N.A."))),
172 Cell::new(&(format!("{}", "N.A."))),
173 ]);
174 table
175}
176
177pub fn init_run_start(cc_pub_addr : &str) {
179 let one_second = Duration::from_secs(1);
180 let cmd_payload = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
183 let cmd_depr = TofCommand::DataRunStart(cmd_payload);
184 let packet_depr = cmd_depr.pack();
185 let mut payload_depr = String::from("BRCT").into_bytes();
186 payload_depr.append(&mut packet_depr.to_bytestream());
187
188 let mut cmd = TofCommandV2::new();
189 cmd.command_code = TofCommandCode::DataRunStart;
190 let packet = cmd.pack();
191 let mut payload = String::from("BRCT").into_bytes();
192 payload.append(&mut packet.to_bytestream());
193
194 let ctx = zmq::Context::new();
196 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
197 cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
198 println!("=> Sending run start command to RBs ..");
200 for _ in 0..10 {
201 thread::sleep(one_second);
202 print!("..");
203 }
204 match cmd_sender.send(&payload_depr, 0) {
206 Err(err) => {
207 error!("Unable to send command! {err}");
208 },
209 Ok(_) => {
210 debug!("We sent {:?}", payload);
211 }
212 }
213 match cmd_sender.send(&payload, 0) {
214 Err(err) => {
215 error!("Unable to send command! {err}");
216 },
217 Ok(_) => {
218 debug!("We sent {:?}", payload);
219 }
220 }
221 print!("done!\n");
222}
223
224pub fn end_run(cc_pub_addr : &str) {
226 let cmd_depr = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
227 let packet_depr = cmd_depr.pack();
228 let mut payload_depr = String::from("BRCT").into_bytes();
229 payload_depr.append(&mut packet_depr.to_bytestream());
230
231 let mut cmd = TofCommandV2::new();
232 cmd.command_code = TofCommandCode::DataRunStop;
233 let packet = cmd.pack();
234 let mut payload = String::from("BRCT").into_bytes();
235 payload.append(&mut packet.to_bytestream());
236 let ctx = zmq::Context::new();
237 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
238 cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
239 println!("=> Sending run stop command to all RBs...");
241 println!("=> Waiting for RBs to stoop data acquisition..");
242 for _ in 0..10 {
243 print!("..");
244 }
245 match cmd_sender.send(&payload_depr, 0) {
246 Err(err) => {
247 error!("Unable to send command! {err}");
248 },
249 Ok(_) => {
250 debug!("We sent {:?}", payload);
251 }
252 }
253 match cmd_sender.send(&payload, 0) {
254 Err(err) => {
255 error!("Unable to send command! {err}");
256 },
257 Ok(_) => {
258 debug!("We sent {:?}", payload);
259 }
260 }
261 print!("..done!\n");
262}
263
264pub fn get_queue(dir_path : &String) -> Vec<String> {
266 let mut entries = fs::read_dir(dir_path)
267 .expect("Directory might not exist!")
268 .map(|entry| entry.unwrap().path())
269 .collect::<Vec<PathBuf>>();
270 entries.sort_by(|a, b| {
271 let meta_a = fs::metadata(a).unwrap();
272 let meta_b = fs::metadata(b).unwrap();
273 meta_a.modified().unwrap().cmp(&meta_b.modified().unwrap())
274 });
275 entries.iter()
276 .map(|path| path.to_str().unwrap().to_string())
277 .collect()
278}
279
280pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
281 let old_path = Path::new(old_path);
282 let file_name = old_path.file_name().unwrap().to_str().unwrap(); let new_path = Path::new(new_dir).join(file_name); fs::rename(old_path, new_path) }
286
287pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
288 let old_path = Path::new(old_path);
289 let new_path = Path::new(new_dir).join("liftof-config.toml"); fs::rename(old_path, new_path) }
292
293pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
294 let old_path = Path::new(old_path);
295 let file_name = old_path.file_name().unwrap().to_str().unwrap(); let new_path = Path::new(new_dir).join(file_name); fs::copy(old_path, new_path)
298}
299
300pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
301 let old_path = Path::new(old_path);
302 let new_path = Path::new(new_dir).join("liftof-config.toml"); fs::copy(old_path, new_path)
304}
305
306pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
307 let path = Path::new(file_path);
308 fs::remove_file(path) }
310
311pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
316 let queue_dir = format!("{}/queue", staging_dir);
317 let next_dir = format!("{}/next", staging_dir);
318 let current_dir = format!("{}/current", staging_dir);
319
320 let queue = get_queue(&queue_dir);
321 let current = get_queue(¤t_dir);
322 let next = get_queue(&next_dir);
323
324 if current.len() == 0 {
325 error!("We don't have a current configuration. This is BAD!");
327 return Err(StagingError::NoCurrentConfig);
328 }
329
330 println!("= => Found {} files in run queue!", queue.len());
331 if next.len() == 0 && queue.len() == 0 {
332 println!("= => Nothing staged, will jusr repeat current run setting!");
333 if !dry_run {
334 manage_liftof_cc_service("restart");
335 }
336 thread::sleep(Duration::from_secs(20));
337 return Ok(());
338 }
339 if next.len() == 0 && queue.len() != 0 {
340 error!("Empty next directory, but we have files in the queue!");
341 match copy_file_rename_liftof(&queue[0], &next_dir) {
342 Ok(_) => (),
343 Err(err) => {
344 error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
345 }
346 }
347 match move_file_rename_liftof(&queue[0], ¤t_dir) {
348 Ok(_) => (),
349 Err(err) => {
350 error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
351 }
352 }
353 }
354 if next.len() != 0 {
355 match delete_file(¤t[0]) {
356 Ok(_) => (),
357 Err(err) => {
358 error!("Unable to delete {}! {}", current[0], err);
359 }
360 }
361 match move_file_rename_liftof(&next[0], ¤t_dir) {
362 Ok(_) => (),
363 Err(err) => {
364 error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
365 }
366 }
367 if queue.len() != 0 {
368 match move_file_with_name(&queue[0], &next_dir) {
369 Ok(_) => (),
370 Err(err) => {
371 error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
372 }
373 }
374 }
375 println!("=> Restarting liftof-cc!");
376 if !dry_run {
377 manage_liftof_cc_service("restart");
378 }
379 thread::sleep(Duration::from_secs(20));
380 }
381 Ok(())
382}
383
384pub fn prepare_run(data_path : String,
403 config : &LiftofSettings,
404 run_id : Option<u32>,
405 create_dir : bool) -> Option<u32> {
406 let mut stream_files_path = PathBuf::from(data_path);
407 let paths = fs::read_dir(stream_files_path.clone()).unwrap();
411
412 let mut used_runids = Vec::<u32>::new();
413 for path in paths {
414 match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
416 Ok(this_run_id) => {
417 debug!("Extracted run id {}", this_run_id);
418 used_runids.push(this_run_id);
419 },
420 Err(err) => {
421 warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
422 }
423 }
424 }
425 let mut max_run_id = 0u32;
426 match used_runids.iter().max() {
427 None => (),
428 Some(_r) => {
429 max_run_id = *_r;
430 }
431 }
432 println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
433 let new_run_id : Option<u32>;
434 if max_run_id == 0 {
435 new_run_id = run_id;
438 } else if run_id.is_some() {
439 if used_runids.contains(&run_id.unwrap()) {
440 error!("Duplicate run id ({})!", run_id.unwrap());
442 new_run_id = None;
444 } else {
445 new_run_id = run_id;
447 }
448 } else {
449 new_run_id = Some(max_run_id + 1);
450 }
451 if new_run_id.is_none() {
453 return new_run_id;
454 }
455 stream_files_path.push(new_run_id.unwrap().to_string().as_str());
456 if create_dir {
457 if let Ok(metadata) = fs::metadata(&stream_files_path) {
460 if metadata.is_dir() {
461 println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
462 }
465 } else {
466 match fs::create_dir(&stream_files_path) {
467 Ok(()) => println!("=> Created {} to save stream data", stream_files_path.display()),
468 Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
469 }
470 }
471 }
472 let settings_fname = format!("{}/run{}.toml",
473 stream_files_path.display(),
474 new_run_id.unwrap());
475 println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
476 println!("=> Writing settings to {}!", settings_fname);
477 config.to_toml(settings_fname);
478 return new_run_id;
479}
480
481
482pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
503 match Command::new("sudo")
504 .args(["systemctl", mode, "liftof"])
505 .spawn() {
506 Err(err) => {
507 error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
508 TofReturnCode::GeneralFail
509 }
510 Ok(_) => {
511 println!("=> Executed sudo systemctl {} liftof", mode);
512 TofReturnCode::Success
513 }
514 }
515}
516
517
518pub fn ssh_command_rbs(rb_list : &Vec<u8>,
534 cmd : Vec<String>) -> Result<Vec<u8>, TofError> {
535 let mut rb_handles = Vec::<thread::JoinHandle<_>>::new();
536 info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
537 let mut children = Vec::<(u8,Child)>::new();
538 for rb in rb_list {
539 rb_handles.push(thread::spawn(||{}));
541 let rb_address = format!("tof-rb{:02}", rb);
542 let mut ssh_args = vec![rb_address];
543 let mut thisrb_cmd = cmd.clone();
544 ssh_args.append(&mut thisrb_cmd);
545 match Command::new("ssh")
546 .args(ssh_args)
548 .spawn() {
549 Err(err) => {
550 error!("Unable to spawn ssh process on RB {}! {}", rb, err);
551 }
552 Ok(child) => {
553 children.push((*rb,child));
554 }
555 }
556 }
557 let mut issues = Vec::<u8>::new();
558 for rb_child in &mut children {
559 let timeout = Duration::from_secs(10);
563 let kill_t = Instant::now();
564 loop {
565 if kill_t.elapsed() > timeout {
566 error!("SSH process for board {} timed out!", rb_child.0);
567 match rb_child.1.kill() {
569 Err(err) => {
570 error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
571 }
572 Ok(_) => {
573 error!("Killed SSH process for for RB {}", rb_child.0);
574 }
575 }
576 issues.push(rb_child.0);
577 break
579 }
580 match rb_child.1.try_wait() {
582 Ok(None) => {
583 thread::sleep(Duration::from_secs(1));
585 continue;
586 }
587 Ok(Some(status)) => {
588 if status.success() {
589 info!("Execution of command on {} successful!", rb_child.0);
590 break;
591 } else {
592 error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
593 issues.push(rb_child.0);
594 break;
595 }
596 }
597 Err(err) => {
598 error!("Unable to wait for the SSH process! {err}");
599 break;
600 }
601 }
602 }
603 }
604 if issues.len() == 0 {
605 println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
606 }
607 Ok(issues)
608}
609
610pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
612 let command = vec![String::from("sudo"),
613 String::from("systemctl"),
614 String::from("restart"),
615 String::from("liftof")];
616 println!("=> Restarting liftof-rb on RBs!");
617 match ssh_command_rbs(rb_list, command) {
618 Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
619 Ok(_) => ()
620 }
621}
622
623pub fn verification_run(timeout : u32,
631 tp_to_sink : Sender<TofPacket>,
632 thread_control : Arc<Mutex<ThreadControl>>) {
633 let mut write_state : bool = true; let mut config = LiftofSettings::new();
635 match thread_control.lock() {
636 Ok(mut tc) => {
637 write_state = tc.write_data_to_disk;
638 tc.write_data_to_disk = false;
639 tc.verification_active = true;
640 tc.thread_master_trg_active = true;
641 tc.calibration_active = false;
642 tc.thread_event_bldr_active = true;
643 config = tc.liftof_settings.clone();
644 }
645 Err(err) => {
646 error!("Can't acquire lock for ThreadControl! {err}");
647 },
648 }
649 let one_second = Duration::from_millis(1000);
650 let runtime = Instant::now();
651 let cmd_payload: u32 = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
655 let cmd = TofCommand::DataRunStart(cmd_payload);
656 let packet = cmd.pack();
657 let mut payload = String::from("BRCT").into_bytes();
658 payload.append(&mut packet.to_bytestream());
659
660 let ctx = zmq::Context::new();
662 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
663 let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
664 cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
665 println!("=> Give the RBs a chance to connect and wait a bit..");
667 thread::sleep(10*one_second);
668 match cmd_sender.send(&payload, 0) {
669 Err(err) => {
670 error!("Unable to send command, error{err}");
671 },
672 Ok(_) => {
673 debug!("We sent {:?}", payload);
674 }
675 }
676
677 println!("=> Verification run initialized!");
678 loop {
680 if runtime.elapsed().as_secs() > timeout as u64 {
681 break;
682 }
683 thread::sleep(5*one_second);
684 }
685
686 println!("=> Ending verification run!");
687 println!("=> Sending run termination command to the RBs");
688 let cmd = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
689 let packet = cmd.pack();
690 let mut payload = String::from("BRCT").into_bytes();
691 payload.append(&mut packet.to_bytestream());
692
693 warn!("=> No command socket available! Can not shut down RBs..!");
694 println!("=> Give the RBs a chance to connect and wait a bit..");
696 thread::sleep(10*one_second);
697 match cmd_sender.send(&payload, 0) {
698 Err(err) => {
699 error!("Unable to send command! {err}");
700 },
701 Ok(_) => {
702 debug!("We sent {:?}", payload);
703 }
704 }
705
706 let mut detector_status = TofDetectorStatus::new();
708 match thread_control.lock() {
709 Ok(mut tc) => {
710 tc.write_data_to_disk = write_state;
711 tc.verification_active = false;
712 detector_status = tc.detector_status.clone();
713 },
714 Err(err) => {
715 error!("Can't acquire lock for ThreadControl! {err}");
716 },
717 }
718 println!("=> Acquired TofDetectorStatus!");
719 println!("{}", detector_status);
720 let pack = detector_status.pack();
721 match tp_to_sink.send(pack) {
722 Err(err) => error!("Unable to send TofDetectorStatus to data sink! {err}"),
723 Ok(_) => ()
724 }
725}
726
727
728pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
747 rb_list : &Vec<ReadoutBoard>,
748 show_progress : bool) {
749
750 let one_second = Duration::from_millis(1000);
751 let mut cc_pub_addr = String::from("");
752 let calibration_timeout_fail = Duration::from_secs(300); let mut cali_dir_created = false;
755 let mut cali_output_dir = String::from("");
756 let mut cali_base_dir = String::from("");
757
758 match thread_control.lock() {
759 Ok(mut tc) => {
760 for rb in rb_list {
761 tc.finished_calibrations.insert(rb.rb_id,false);
762 }
763 cali_base_dir = tc.liftof_settings.calibration_dir.clone();
764 cc_pub_addr = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
765 tc.write_data_to_disk = true;
766 },
767 Err(err) => {
768 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
769 },
770 }
771
772 let voltage_level = DEFAULT_CALIB_VOLTAGE;
774 let rb_id = DEFAULT_RB_ID;
775 let extra = DEFAULT_CALIB_EXTRA;
776 println!("=> Received calibration default command! Will init calibration run of all RBs...");
777 let cmd_payload: u32
778 = (voltage_level as u32) << 16 | (rb_id as u32) << 8 | (extra as u32);
779 let default_calib_depr = TofCommand::DefaultCalibration(cmd_payload);
780 let tp_depr = default_calib_depr.pack();
781 let mut payload_depr = String::from("BRCT").into_bytes();
782 payload_depr.append(&mut tp_depr.to_bytestream());
783
784 let mut default_calib = TofCommandV2::new();
785 default_calib.command_code = TofCommandCode::RBCalibration;
786 let tp = default_calib.pack();
787 let mut payload = String::from("BRCT").into_bytes();
788 payload.append(&mut tp.to_bytestream());
789 let ctx = zmq::Context::new();
791 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
792
793 cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
794 println!("=> Give the RBs a chance to connect and wait a bit..");
795 thread::sleep(10*one_second);
796 match cmd_sender.send(&payload_depr, 0) { Err(err) => {
797 error!("Unable to send command! {err}");
798 },
799 Ok(_) => {
800 println!("=> Calibration initialized!");
801 }
802 }
803 match cmd_sender.send(&payload, 0) { Err(err) => {
804 error!("Unable to send command! {err}");
805 },
806 Ok(_) => {
807 println!("=> Calibration initialized!");
808 }
809 }
810 match thread_control.lock() {
811 Ok(mut tc) => {
812 tc.thread_master_trg_active =false;
814 tc.calibration_active = true;
815 },
816 Err(err) => {
817 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
818 },
819 }
820
821 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
822 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
823 let mut bar = ProgressBar::hidden();
824
825 println!("=> .. now we need to wait until the calibration is finished!");
826 if show_progress {
827 bar = ProgressBar::new(rb_list.len() as u64);
828 bar.set_position(0);
829 let bar_label = String::from("Acquiring RB calibration data");
830 bar.set_message (bar_label);
831 bar.set_prefix ("\u{2699}\u{1F4D0}");
832 bar.set_style (bar_style);
833 }
834
835 let timeout = Instant::now();
838 let mut cali_received = 0;
839 'main: loop {
840 thread::sleep(10*one_second);
841 if timeout.elapsed() > calibration_timeout_fail {
842 error!("Calibration timeout! Calibrations might not be complete!");
843 match thread_control.lock() {
844 Ok(mut tc) => {
845 tc.calibration_active = false;
846 }
847 Err(err) => {
848 error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
849 }
850 }
851 if show_progress {
852 bar.finish_with_message("Done");
853 }
854 break;
855 }
856 match thread_control.lock() {
858 Ok(mut tc) => {
859 for rbid in rb_list {
860 let mut finished_keys = Vec::<u8>::new();
862 if tc.stop_flag {
863 println!("Stop signal received, exiting calibration routine!");
864 break 'main;
865 }
866 if tc.finished_calibrations[&rbid.rb_id] {
867 cali_received += 1;
868 let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
869 let pack = rbcali.pack();
870 let file_type = FileType::CalibrationFile(rbid.rb_id);
872 if !cali_dir_created {
875 let today = get_utc_timestamp();
876 cali_output_dir = format!("{}/{}", cali_base_dir.clone(), today);
877 match create_dir_all(cali_output_dir.clone()) {
878 Ok(_) => info!("Created {} for calibration data!", cali_output_dir),
879 Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
880 }
881 cali_dir_created = true;
882 }
883 let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
884 cali_writer.add_tof_packet(&pack);
885 drop(cali_writer);
886
887 bar.set_position(cali_received);
888 finished_keys.push(rbid.rb_id);
889 }
890 for rbid in &finished_keys {
891 *tc.finished_calibrations.get_mut(&rbid).unwrap() = false;
892 }
893 }
894 if cali_received as usize == rb_list.len() {
896 tc.calibration_active = false;
901 for rbid in rb_list {
903 *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false;
904 }
905 if show_progress {
906 bar.finish_with_message("Done");
907 }
908 break;
909 }
910 }
911 Err(err) => {
912 error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
913 }
914 }
915 } let cali_link_dir = cali_base_dir.clone() + "latest";
918 match fs::remove_file(cali_link_dir.clone()) {
919 Ok(_) => {
920 println!("=> Symlink {} removed!", cali_link_dir);
921 },
922 Err(err) => {
923 error!("Unable to remove symlink to latest calibrations! {err}");
924 }
925 }
926 println!("=> Will create symlink {}", cali_link_dir);
927 match symlink(cali_output_dir, cali_link_dir) {
928 Err(err) => error!("Unable to create symlink for calibration data! {err}"),
929 Ok(_) => ()
930 }
931}