liftof_cc/
lib.rs

1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8  PathBuf,
9  Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15  Arc,
16  Mutex,
17};
18
19use std::process::{
20  Command,
21  Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28  Duration,
29  Instant,
30};
31
32use indicatif::{
33  ProgressBar,
34  ProgressStyle
35};
36
37use comfy_table::modifiers::{
38  UTF8_ROUND_CORNERS,
39  UTF8_SOLID_INNER_BORDERS,
40};
41
42use comfy_table::presets::UTF8_FULL;
43use comfy_table::*;
44
45use gondola_core::prelude::*;
46
47/// communicaton between liftof-scheduler and 
48/// liftof-cc
49pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
50
51/// Produce a nicely formattable table with per RB information for scalar
52/// values
53pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
54  let mut unit = "";
55  if label_is_hz {
56    unit = "Hz"
57  }
58  let mut table = Table::new();
59  table
60    .load_preset(UTF8_FULL)
61    .apply_modifier(UTF8_ROUND_CORNERS)
62    .apply_modifier(UTF8_SOLID_INNER_BORDERS)
63    .set_content_arrangement(ContentArrangement::Dynamic)
64    .set_width(80)
65    //.set_header(vec!["Readoutboard Rates:"])
66    .add_row(vec![
67        Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
68        Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
69        Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
70        Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
71        Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
72        //Cell::new("Center aligned").set_alignment(CellAlignment::Center),
73    ])
74    .add_row(vec![
75        Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
76        Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
77        Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
78        Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
79        Cell::new(&(format!("RB10 {}", "N.A."))),
80    ])
81    .add_row(vec![
82        Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
83        Cell::new(&(format!("RB12 {}", "N.A."))),
84        Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
85        Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
86        Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
87    ])
88    .add_row(vec![
89        Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
90        Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
91        Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
92        Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
93        Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
94    ])
95    .add_row(vec![
96        Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
97        Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
98        Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
99        Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
100        Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
101    ])
102    .add_row(vec![
103        Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
104        Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
105        Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
106        Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
107        Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
108    ])
109    .add_row(vec![
110        Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
111        Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
112        Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
113        Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
114        Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
115    ])
116    .add_row(vec![
117        Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
118        Cell::new(&(format!("RB37 {}", "N.A."))),
119        Cell::new(&(format!("RB38 {}", "N.A."))),
120        Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
121        Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
122    ])
123    .add_row(vec![
124        Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
125        Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
126        Cell::new(&(format!("RB42 {}", "N.A."))),
127        Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
128        Cell::new(&(format!("RB45 {}", "N.A."))),
129    ])
130    .add_row(vec![
131        Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
132        Cell::new(&(format!("{}", "N.A."))),
133        Cell::new(&(format!("{}", "N.A."))),
134        Cell::new(&(format!("{}", "N.A."))),
135        Cell::new(&(format!("{}", "N.A."))),
136    ]);
137  table
138}
139
140/// Regular run start sequence
141pub fn init_run_start(cc_pub_addr : &str) {
142  let one_second   = Duration::from_secs(1);
143  // deprecated way of sending commands, however as long as we might have RBS
144  // with old software, we do want to send the "old style" as well
145  //let cmd_payload  = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
146  //let cmd_depr     = TofCommand::DataRunStart(cmd_payload);
147  //let packet_depr  = cmd_depr.pack();
148  //let mut payload_depr = String::from("BRCT").into_bytes();
149  //payload_depr.append(&mut packet_depr.to_bytestream());
150  
151  let mut cmd      = TofCommand::new();
152  cmd.command_code = TofCommandCode::DataRunStart;
153  let packet       = cmd.pack();
154  let mut payload  = String::from("BRCT").into_bytes();
155  payload.append(&mut packet.to_bytestream());
156  
157  // open 0MQ socket here
158  let ctx         = zmq::Context::new();
159  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
160  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
161  // after we opened the socket, give the RBs a chance to connect
162  println!("=> Sending run start command to RBs ..");
163  for _ in 0..10 {
164    thread::sleep(one_second);
165    print!("..");
166  }
167  //// send old and new commands
168  //match cmd_sender.send(&payload_depr, 0) {
169  //  Err(err) => {
170  //    error!("Unable to send command! {err}");
171  //  },
172  //  Ok(_) => {
173  //    debug!("We sent {:?}", payload);
174  //  }
175  //}
176  match cmd_sender.send(&payload, 0) {
177    Err(err) => {
178      error!("Unable to send command! {err}");
179    },
180    Ok(_) => {
181      debug!("We sent {:?}", payload);
182    }
183  }
184  print!("done!\n");
185}
186
187/// Regular run stop sequence
188pub fn end_run(cc_pub_addr : &str) {
189  //let cmd_depr     = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
190  //let packet_depr  = cmd_depr.pack();
191  //let mut payload_depr = String::from("BRCT").into_bytes();
192  //payload_depr.append(&mut packet_depr.to_bytestream());
193
194  let mut cmd      = TofCommand::new();
195  cmd.command_code = TofCommandCode::DataRunStop;
196  let packet       = cmd.pack();
197  let mut payload  = String::from("BRCT").into_bytes();
198  payload.append(&mut packet.to_bytestream());
199  let ctx         = zmq::Context::new();
200  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
201  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
202  // after we opened the socket, give the RBs a chance to connect
203  println!("=> Sending run stop command to all RBs...");
204  println!("=> Waiting for RBs to stop data acquisition..");
205  for _ in 0..10 {
206    print!("..");
207  }
208  //match cmd_sender.send(&payload_depr, 0) {
209  //  Err(err) => {
210  //    error!("Unable to send command! {err}");
211  //  },
212  //  Ok(_) => {
213  //    debug!("We sent {:?}", payload);
214  //  }
215  //}
216  match cmd_sender.send(&payload, 0) {
217    Err(err) => {
218      error!("Unable to send command! {err}");
219    },
220    Ok(_) => {
221      debug!("We sent {:?}", payload);
222    }
223  }
224  print!("..done!\n");
225}
226
227/// Get the files in the queue and sort them by number
228pub fn get_queue(dir_path : &String) -> Vec<String> {
229  let mut entries = fs::read_dir(dir_path)
230    .expect("Directory might not exist!")
231    .map(|entry| entry.unwrap().path())
232    .collect::<Vec<PathBuf>>();
233  entries.sort_by(|a, b| {
234    let meta_a = fs::metadata(a).unwrap();
235    let meta_b = fs::metadata(b).unwrap();
236    meta_a.modified().unwrap().cmp(&meta_b.modified().unwrap())
237  });
238  entries.iter()
239    .map(|path| path.to_str().unwrap().to_string())
240    .collect()
241}
242
243pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
244  let old_path  = Path::new(old_path);
245  let file_name = old_path.file_name().unwrap().to_str().unwrap(); // Extract filename
246  let new_path  = Path::new(new_dir).join(file_name); // Combine new directory with filename
247  fs::rename(old_path, new_path) // Move the file
248}
249
250pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
251  let old_path  = Path::new(old_path);
252  let new_path  = Path::new(new_dir).join("liftof-config.toml"); // Combine new directory with filename
253  fs::rename(old_path, new_path) // Move the file
254}
255
256pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
257  let old_path  = Path::new(old_path);
258  let file_name = old_path.file_name().unwrap().to_str().unwrap(); // Extract filename
259  let new_path  = Path::new(new_dir).join(file_name); // Combine new directory with filename
260  fs::copy(old_path, new_path) 
261}
262
263pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
264  let old_path  = Path::new(old_path);
265  let new_path  = Path::new(new_dir).join("liftof-config.toml"); // Combine new directory with filename
266  fs::copy(old_path, new_path) 
267}
268
269pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
270  let path = Path::new(file_path);
271  fs::remove_file(path) // Attempts to delete the file at the given path
272}
273
274/// Copy a config file from the queue to the current and 
275/// next directories and restart liftof-cc. 
276///
277/// As soon as the run is started, prepare the next run
278pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
279  let queue_dir   = format!("{}/queue", staging_dir);
280  let next_dir    = format!("{}/next",  staging_dir);
281  let current_dir = format!("{}/current", staging_dir);
282
283  let queue   = get_queue(&queue_dir);
284  let current = get_queue(&current_dir);
285  let next    = get_queue(&next_dir); 
286  
287  if current.len() == 0 {
288    // we are f***ed
289    error!("We don't have a current configuration. This is BAD!");
290    return Err(StagingError::NoCurrentConfig);
291  }
292  
293  println!("= => Found {} files in run queue!", queue.len());
294  if next.len() == 0 && queue.len() == 0 {
295    println!("= => Nothing staged, will jusr repeat current run setting!");
296    if !dry_run {
297      manage_liftof_cc_service("restart");
298    }
299    thread::sleep(Duration::from_secs(20));
300    return Ok(());
301  }
302  if next.len() == 0 && queue.len() != 0 {
303    error!("Empty next directory, but we have files in the queue!");
304    match copy_file_rename_liftof(&queue[0], &next_dir) {
305      Ok(_) => (),
306      Err(err) => {
307        error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
308      }
309    }
310    match move_file_rename_liftof(&queue[0], &current_dir) {
311      Ok(_) => (),
312      Err(err) => {
313        error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
314      }
315    }
316  }
317  if next.len() != 0  {
318    match delete_file(&current[0]) {
319      Ok(_)    => (),
320      Err(err) => {
321        error!("Unable to delete {}! {}", current[0], err);
322      }
323    }
324    match move_file_rename_liftof(&next[0], &current_dir) {
325      Ok(_) => (),
326      Err(err) => {
327        error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
328      }
329    }
330    if queue.len() != 0 {
331      match move_file_with_name(&queue[0], &next_dir) {
332        Ok(_) => (),
333        Err(err) => {
334          error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
335        }
336      }
337    }
338    println!("=> Restarting liftof-cc!");
339    if !dry_run {
340      manage_liftof_cc_service("restart");
341    }
342    thread::sleep(Duration::from_secs(20));
343  }
344  Ok(())
345}
346
347/// Prepare a new folder with the run id
348///
349/// This will assign a run id based on the 
350/// run ids in that folder. Additionally, 
351/// we will copy the current settings into
352/// that folder
353///
354/// TODO - connect it to the run database
355///
356/// # Arguments:
357///
358/// * data_path  : The global path on the inflight 
359///                tof computer where to store data.
360/// * config     : The current configuration, to be 
361///                copied into the new folder
362/// * run_id     : Optionally define a pre-given 
363///                run-id
364/// * create_dir : Create the directory for runfiles
365pub fn prepare_run(data_path  : String,
366                   config     : &LiftofSettings,
367                   run_id     : Option<u32>,
368                   create_dir : bool) -> Option<u32> {
369  let mut stream_files_path = PathBuf::from(data_path);
370  // Unwrap is ok here, since this should only happen at run start.
371  // Also, if the path is wrong, we are going to fail catastrophically
372  // anyway
373  let paths = fs::read_dir(stream_files_path.clone()).unwrap();
374  
375  let mut used_runids = Vec::<u32>::new();
376  for path in paths {
377    // this is hideous, I am so sorry. May the rust gods have mercy on my soul...
378    match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
379      Ok(this_run_id) => {
380        debug!("Extracted run id {}", this_run_id);
381        used_runids.push(this_run_id);
382      },
383      Err(err)        => {
384        warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
385      }
386    }
387  }
388  let mut max_run_id = 0u32;
389  match used_runids.iter().max() {
390    None => (),
391    Some(_r) => {
392      max_run_id = *_r;
393    }
394  }
395  println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
396  let new_run_id : Option<u32>;
397  if max_run_id == 0 {
398    // we can use the run)given as the argument
399    // return run_id;
400    new_run_id = run_id;
401  } else if run_id.is_some() {
402    if used_runids.contains(&run_id.unwrap()) {
403      // the assigned run id has been used already
404      error!("Duplicate run id ({})!", run_id.unwrap());
405      //return None;
406      new_run_id = None;
407    } else {
408      //return run_id;
409      new_run_id = run_id;
410    }
411  } else {
412      new_run_id = Some(max_run_id + 1);
413  }
414  // We were not able to assign a new run id
415  if new_run_id.is_none() {
416    return new_run_id;
417  }
418  stream_files_path.push(new_run_id.unwrap().to_string().as_str());
419  if create_dir {
420    // Create directory if it does not exist
421    // Check if the directory exists
422    if let Ok(metadata) = fs::metadata(&stream_files_path) {
423      if metadata.is_dir() {
424        println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
425        // FILXME - in flight, we can not have interactivity.
426        // But the whole system with the run ids might change 
427      } 
428    } else {
429      match fs::create_dir(&stream_files_path) {
430        Ok(())   => println!("=> Created {} to save stream data", stream_files_path.display()),
431        Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
432      }
433    }
434  }
435  let settings_fname = format!("{}/run{}.toml",
436    stream_files_path.display(),
437    new_run_id.unwrap()); 
438  println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
439  println!("=> Writing settings to {}!", settings_fname);
440  config.to_toml(settings_fname);
441  return new_run_id;
442}
443
444
445///// Taka a data run. This can be either verification or physcis
446//pub fn run(with_calibration : bool, verification : bool) {
447//  //prepare_run(data_path  : String,
448//  //                 config     : &LiftofSettings,
449//  //                 run_id     : Option<u32>,
450//  //                 create_dir : bool) -> Option<u32> {
451//  if with_calibration {
452//    //calibrate_tof()
453//  }
454//}
455
456/// Trigger a restart of liftof-cc and start a new run
457///
458///
459/// # Arguments
460///
461///   * mode     : The argument given to the systemd service 
462///                - either "start", "stop", "restart", etc.
463/// # Returns:
464///   * success : true for succes
465pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
466  match Command::new("sudo")
467    .args(["systemctl", mode, "liftof"])
468    .spawn() {
469    Err(err) => {
470      error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
471      TofReturnCode::GeneralFail
472    }
473    Ok(_) => {
474      println!("=> Executed sudo systemctl {} liftof", mode);
475      TofReturnCode::Success
476    }
477  }
478}
479
480
481/// Trigger a general command on the Readoutboards 
482/// remotly through ssh.
483///
484/// Ssh keys and aliases (e.g. tof-rb02) must be 
485/// set up for this to work
486///
487/// # Arguments:
488///
489///   * rb_list : The list of ReadoutBoard ids the commands
490///               will get executed
491///   * cmd     : The actual command without 'ssh \<ip\>'
492///
493/// # Returns:
494///
495///   * A list of rb ids where the process failed
496pub fn ssh_command_rbs(rb_list : &Vec<u8>,
497                       cmd     : Vec<String>) -> Result<Vec<u8>, TofError> {
498  let mut rb_handles       = Vec::<thread::JoinHandle<_>>::new();
499  info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
500  let mut children = Vec::<(u8,Child)>::new();
501  for rb in rb_list {
502    // also populate the rb thread nandles
503    rb_handles.push(thread::spawn(||{}));
504    let rb_address   = format!("tof-rb{:02}", rb);
505    let mut ssh_args = vec![rb_address];
506    let mut thisrb_cmd = cmd.clone();
507    ssh_args.append(&mut thisrb_cmd);
508    match Command::new("ssh")
509      //.args([&rb_address, "sudo", "systemctl", "restart", "liftof"])
510      .args(ssh_args)
511      .spawn() {
512      Err(err) => {
513        error!("Unable to spawn ssh process on RB {}! {}", rb, err);
514      }
515      Ok(child) => {
516        children.push((*rb,child));
517      }
518    }
519  }
520  let mut issues = Vec::<u8>::new();
521  for rb_child in &mut children {
522    // this is not optimal, since this will take as much 
523    // time as the slowest child, but at the moment we 
524    // have bigger fish to fry.
525    let timeout = Duration::from_secs(10);
526    let kill_t  = Instant::now();
527    loop {
528      if kill_t.elapsed() > timeout {
529        error!("SSH process for board {} timed out!", rb_child.0);
530        // Duuu hast aber einen schöönen Ball! [M. eine Stadt sucht einen Moerder]
531        match rb_child.1.kill() {
532          Err(err) => {
533            error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
534          }
535          Ok(_) => {
536            error!("Killed SSH process for for RB {}", rb_child.0);
537          }
538        }
539        issues.push(rb_child.0);
540        // FIXME
541        break
542      }
543      // non-blocking
544      match rb_child.1.try_wait() {
545        Ok(None) => {
546          // the child is still busy
547          thread::sleep(Duration::from_secs(1));
548          continue;
549        }
550        Ok(Some(status)) => {
551          if status.success() {
552            info!("Execution of command on {} successful!", rb_child.0);
553            break;
554          } else {
555            error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
556            issues.push(rb_child.0);
557            break;
558          }
559        }
560        Err(err) => {
561          error!("Unable to wait for the SSH process! {err}");
562          break;
563        }
564      }
565    }
566  }
567  if issues.len() == 0 {
568    println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
569  }
570  Ok(issues)
571}
572
573/// Restart liftof-rb on RBs
574pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
575  let command = vec![String::from("sudo"),
576                     String::from("systemctl"),
577                     String::from("restart"),
578                     String::from("liftof")];
579  println!("=> Restarting liftof-rb on RBs!");
580  match ssh_command_rbs(rb_list, command) {
581    Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
582    Ok(_)    => ()
583  }
584}
585
586
587
588/// Run a full tof calibration - RBCalibration
589/// 
590/// The purpose of the RB calibration to is to create a 
591/// relationship between the adc/timing bins and voltages
592/// and nanoseconds
593///
594/// This function is blocking, until a certain (configurable)
595/// timeout is expired. The timeout can be set in the configuration
596/// file
597///
598/// # Argumeents:
599///
600///   * thread_control : general shared memory to hold configuration
601///                      settings, program st ate
602///   * rb_list        : List of active readoutboards
603///   * show_progress  : if true, it will show a progressbar with 
604///                      indicatif
605///
606pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
607                     rb_list        : &Vec<ReadoutBoard>,
608                     show_progress  : bool) {
609
610  let one_second               = Duration::from_millis(1000);
611  let mut cc_pub_addr          = String::from("");
612  let calibration_timeout_fail = Duration::from_secs(300); // in seconds
613 
614  let mut cali_dir_created = false;
615  let mut cali_output_dir  = String::from("");
616  let mut cali_base_dir        = String::from("");
617
618  match thread_control.lock() {
619    Ok(mut tc) => {
620      for rb in rb_list {
621        tc.finished_calibrations.insert(rb.rb_id,false); 
622      }
623      cali_base_dir = tc.liftof_settings.calibration_dir.clone();
624      cc_pub_addr   = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
625      tc.write_data_to_disk = true;
626    },
627    Err(err) => {
628      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
629    },
630  }
631
632  // deprecated commanding
633  //let voltage_level = DEFAULT_CALIB_VOLTAGE;
634  //let rb_id         = DEFAULT_RB_ID;
635  //let extra         = DEFAULT_CALIB_EXTRA;
636  //println!("=> Received calibration default command! Will init calibration run of all RBs...");
637  //let cmd_payload: u32
638  //  = (voltage_level as u32) << 16 | (rb_id as u32) << 8 | (extra as u32);
639  //let default_calib_depr = TofCommand::DefaultCalibration(cmd_payload);
640  //let tp_depr = default_calib_depr.pack();
641  //let mut payload_depr = String::from("BRCT").into_bytes();
642  //payload_depr.append(&mut tp_depr.to_bytestream());
643
644  let mut default_calib      = TofCommand::new();
645  default_calib.command_code = TofCommandCode::RBCalibration;
646  let tp                     = default_calib.pack();
647  let mut payload            = String::from("BRCT").into_bytes();
648  payload.append(&mut tp.to_bytestream());
649  // open 0MQ socket here
650  let ctx = zmq::Context::new();
651  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
652
653  cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
654  println!("=> Give the RBs a chance to connect and wait a bit..");
655  thread::sleep(10*one_second);
656  //match cmd_sender.send(&payload_depr, 0) { Err(err) => {
657  //    error!("Unable to send command! {err}");
658  //  },
659  //  Ok(_) => {
660  //    println!("=> Calibration  initialized!");
661  //  }
662  //}
663  match cmd_sender.send(&payload, 0) { Err(err) => {
664      error!("Unable to send command! {err}");
665    },
666    Ok(_) => {
667      println!("=> Calibration  initialized!");
668    }
669  }
670  match thread_control.lock() {
671    Ok(mut tc) => {
672      // deactivate the master trigger thread
673      tc.thread_master_trg_active =false;
674      tc.calibration_active = true;
675    },
676    Err(err) => {
677      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
678    },
679  }
680
681  println!("=> .. now we need to wait until the calibration is finished!");
682  let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
683  let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
684  let mut bar    = ProgressBar::hidden();
685  if show_progress {
686    bar = ProgressBar::new(rb_list.len() as u64); 
687    bar.set_position(0);
688    let bar_label  = String::from("Acquiring RB calibration data");
689    bar.set_message (bar_label);
690    bar.set_prefix  ("\u{2699}\u{1F4D0}");
691    bar.set_style   (bar_style);
692  }
693
694  // now block until the calibrations are done or we time outu
695  // FIXME - set timeout parameter in settings
696  let timeout = Instant::now();
697  let mut cali_received = 0;
698  'main: loop {
699    thread::sleep(10*one_second);
700    if timeout.elapsed() > calibration_timeout_fail {
701      error!("Calibration timeout! Calibrations might not be complete!");
702      match thread_control.lock() {
703        Ok(mut tc) => {
704          tc.calibration_active = false;
705        }
706        Err(err) => {
707          error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
708        }
709      }
710      if show_progress {
711        bar.finish_with_message("Done");
712      }
713      break;
714    }
715    //let mut rbcali = RBCalibrations::new();
716    match thread_control.lock() {
717      Ok(mut tc) => {
718        for rbid in rb_list {
719          // the global data sink sets these flags
720          let mut finished_keys = Vec::<u8>::new();
721          if tc.stop_flag {
722            println!("Stop signal received, exiting calibration routine!");
723            break 'main;
724          }
725          if tc.finished_calibrations[&rbid.rb_id] {
726            cali_received += 1;
727            let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
728            let pack   = rbcali.pack();
729            // See RBCalibration reference
730            let file_type  = FileType::CalibrationFile(rbid.rb_id);
731            //println!("==> Writing stream to file with prefix {}", streamfile_name);
732            //let mut cali_writer = TofPacketWriter::new(write_stream_path.clone(), file_type);
733            if !cali_dir_created {
734              let today           = get_utc_timestamp();
735              cali_output_dir     = format!("{}/{}", cali_base_dir.clone(), today);
736              match create_dir_all(cali_output_dir.clone()) {
737                Ok(_)    => info!("Created {} for calibration data!", cali_output_dir),
738                Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
739              }
740              cali_dir_created = true;
741            }
742            let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
743            cali_writer.add_tof_packet(&pack);
744            drop(cali_writer);
745
746            bar.set_position(cali_received);
747            finished_keys.push(rbid.rb_id);
748          }
749          for rbid in &finished_keys {
750            *tc.finished_calibrations.get_mut(&rbid).unwrap() = false; 
751          }
752        }
753        // FIXME - this or a timer
754        if cali_received as usize == rb_list.len() {
755          // cali_received = 0;
756          // if we want to redo a calibration, 
757          // somebody else has to set this 
758          // flag again.
759          tc.calibration_active = false;
760          // reset the counters
761          for rbid in rb_list {
762            *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false; 
763          }
764          if show_progress {
765            bar.finish_with_message("Done");
766          }
767          break;
768        }
769      }
770      Err(err) => {
771        error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
772      }
773    }
774  } // end loop
775  // The last step is to create te symlink
776  let cali_link_dir = cali_base_dir.clone() + "latest";
777  match fs::remove_file(cali_link_dir.clone()) {
778    Ok(_) => {
779      println!("=> Symlink {} removed!", cali_link_dir);
780    },
781    Err(err) => {
782      error!("Unable to remove symlink to latest calibrations! {err}");
783    }
784  }
785  println!("=> Will create symlink {}", cali_link_dir);
786  match symlink(cali_output_dir, cali_link_dir) {
787    Err(err) => error!("Unable to create symlink for calibration data! {err}"),
788    Ok(_)    => ()
789  }
790}