liftof_cc/
lib.rs

1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8  PathBuf,
9  Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15  Arc,
16  Mutex,
17};
18
19use std::process::{
20  Command,
21  Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28  Duration,
29  Instant,
30};
31
32use indicatif::{
33  ProgressBar,
34  ProgressStyle
35};
36
37use comfy_table::modifiers::{
38  UTF8_ROUND_CORNERS,
39  UTF8_SOLID_INNER_BORDERS,
40};
41
42use comfy_table::presets::UTF8_FULL;
43use comfy_table::*;
44
45use gondola_core::prelude::*;
46
/// Endpoint used for the communication between liftof-scheduler and
/// liftof-cc (TCP on the loopback interface, port 54321).
pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
50
51/// Produce a nicely formattable table with per RB information for scalar
52/// values
53pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
54  let mut unit = "";
55  if label_is_hz {
56    unit = "Hz"
57  }
58  let mut table = Table::new();
59  table
60    .load_preset(UTF8_FULL)
61    .apply_modifier(UTF8_ROUND_CORNERS)
62    .apply_modifier(UTF8_SOLID_INNER_BORDERS)
63    .set_content_arrangement(ContentArrangement::Dynamic)
64    .set_width(80)
65    //.set_header(vec!["Readoutboard Rates:"])
66    .add_row(vec![
67        Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
68        Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
69        Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
70        Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
71        Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
72        //Cell::new("Center aligned").set_alignment(CellAlignment::Center),
73    ])
74    .add_row(vec![
75        Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
76        Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
77        Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
78        Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
79        Cell::new(&(format!("RB10 {}", "N.A."))),
80    ])
81    .add_row(vec![
82        Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
83        Cell::new(&(format!("RB12 {}", "N.A."))),
84        Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
85        Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
86        Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
87    ])
88    .add_row(vec![
89        Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
90        Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
91        Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
92        Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
93        Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
94    ])
95    .add_row(vec![
96        Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
97        Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
98        Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
99        Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
100        Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
101    ])
102    .add_row(vec![
103        Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
104        Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
105        Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
106        Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
107        Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
108    ])
109    .add_row(vec![
110        Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
111        Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
112        Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
113        Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
114        Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
115    ])
116    .add_row(vec![
117        Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
118        Cell::new(&(format!("RB37 {}", "N.A."))),
119        Cell::new(&(format!("RB38 {}", "N.A."))),
120        Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
121        Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
122    ])
123    .add_row(vec![
124        Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
125        Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
126        Cell::new(&(format!("RB42 {}", "N.A."))),
127        Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
128        Cell::new(&(format!("RB45 {}", "N.A."))),
129    ])
130    .add_row(vec![
131        Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
132        Cell::new(&(format!("{}", "N.A."))),
133        Cell::new(&(format!("{}", "N.A."))),
134        Cell::new(&(format!("{}", "N.A."))),
135        Cell::new(&(format!("{}", "N.A."))),
136    ]);
137  table
138}
139
140/// Regular run start sequence
141pub fn init_run_start(cc_pub_addr : &str) {
142  let one_second   = Duration::from_secs(1);
143  // deprecated way of sending commands, however as long as we might have RBS
144  // with old software, we do want to send the "old style" as well
145  //let cmd_payload  = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
146  //let cmd_depr     = TofCommand::DataRunStart(cmd_payload);
147  //let packet_depr  = cmd_depr.pack();
148  //let mut payload_depr = String::from("BRCT").into_bytes();
149  //payload_depr.append(&mut packet_depr.to_bytestream());
150  
151  let mut cmd      = TofCommand::new();
152  cmd.command_code = TofCommandCode::DataRunStart;
153  let packet       = cmd.pack();
154  let mut payload  = String::from("BRCT").into_bytes();
155  payload.append(&mut packet.to_bytestream());
156  
157  // open 0MQ socket here
158  let ctx         = zmq::Context::new();
159  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
160  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
161  // after we opened the socket, give the RBs a chance to connect
162  println!("=> Sending run start command to RBs ..");
163  for _ in 0..10 {
164    thread::sleep(one_second);
165    print!("..");
166  }
167  //// send old and new commands
168  //match cmd_sender.send(&payload_depr, 0) {
169  //  Err(err) => {
170  //    error!("Unable to send command! {err}");
171  //  },
172  //  Ok(_) => {
173  //    debug!("We sent {:?}", payload);
174  //  }
175  //}
176  match cmd_sender.send(&payload, 0) {
177    Err(err) => {
178      error!("Unable to send command! {err}");
179    },
180    Ok(_) => {
181      debug!("We sent {:?}", payload);
182    }
183  }
184  print!("done!\n");
185}
186
187/// Regular run stop sequence
188pub fn end_run(cc_pub_addr : &str) {
189  //let cmd_depr     = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
190  //let packet_depr  = cmd_depr.pack();
191  //let mut payload_depr = String::from("BRCT").into_bytes();
192  //payload_depr.append(&mut packet_depr.to_bytestream());
193
194  let mut cmd      = TofCommand::new();
195  cmd.command_code = TofCommandCode::DataRunStop;
196  let packet       = cmd.pack();
197  let mut payload  = String::from("BRCT").into_bytes();
198  payload.append(&mut packet.to_bytestream());
199  let ctx         = zmq::Context::new();
200  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
201  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
202  // after we opened the socket, give the RBs a chance to connect
203  println!("=> Sending run stop command to all RBs...");
204  println!("=> Waiting for RBs to stop data acquisition..");
205  for _ in 0..10 {
206    print!("..");
207  }
208  //match cmd_sender.send(&payload_depr, 0) {
209  //  Err(err) => {
210  //    error!("Unable to send command! {err}");
211  //  },
212  //  Ok(_) => {
213  //    debug!("We sent {:?}", payload);
214  //  }
215  //}
216  match cmd_sender.send(&payload, 0) {
217    Err(err) => {
218      error!("Unable to send command! {err}");
219    },
220    Ok(_) => {
221      debug!("We sent {:?}", payload);
222    }
223  }
224  print!("..done!\n");
225}
226
/// Get the files in the queue, sorted by their modification time
/// (oldest first)
///
/// The modification time is read once per file. Files for which it
/// can not be obtained (e.g. because they vanished in the meantime)
/// are sorted to the front. Directory entries which can not be read
/// are skipped, and path components which are not valid UTF-8 are
/// converted lossily.
///
/// # Arguments
///
///   * dir_path : The directory to list
///
/// # Panics
///
/// Panics if `dir_path` can not be read, e.g. because it does not exist.
pub fn get_queue(dir_path : &String) -> Vec<String> {
  let mut entries = fs::read_dir(dir_path)
    .expect("Directory might not exist!")
    .filter_map(|entry| entry.ok())
    .map(|entry| entry.path())
    .collect::<Vec<PathBuf>>();
  // None (no modification time available) compares smaller than any Some
  entries.sort_by_cached_key(|path| {
    fs::metadata(path).and_then(|meta| meta.modified()).ok()
  });
  entries.iter()
    .map(|path| path.to_string_lossy().into_owned())
    .collect()
}
242
/// Move a file into another directory while keeping its file name
///
/// # Arguments
///
///   * old_path : The file to move
///   * new_dir  : The directory the file gets moved to
///
/// # Errors
///
/// Returns an error of kind `InvalidInput` if `old_path` does not end
/// in a file name (e.g. ".."), or the error of the underlying rename
/// operation.
pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
  let source    = Path::new(old_path);
  let file_name = source.file_name().ok_or_else(|| {
    std::io::Error::new(std::io::ErrorKind::InvalidInput,
                        format!("{} does not contain a file name", old_path))
  })?;
  // keep the name as OsStr, so names which are not valid UTF-8 work as well
  let target    = Path::new(new_dir).join(file_name);
  fs::rename(source, target)
}
249
/// Move a file into `new_dir` and name it "liftof-config.toml"
///
/// # Arguments
///
///   * old_path : The file to move
///   * new_dir  : The directory the file gets moved to
pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
  let mut target = PathBuf::from(new_dir);
  target.push("liftof-config.toml");
  fs::rename(old_path, target)
}
255
/// Copy a file into another directory while keeping its file name
///
/// # Arguments
///
///   * old_path : The file to copy
///   * new_dir  : The directory the copy gets created in
///
/// # Returns
///
/// The number of bytes copied
///
/// # Errors
///
/// Returns an error of kind `InvalidInput` if `old_path` does not end
/// in a file name (e.g. ".."), or the error of the underlying copy
/// operation.
pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
  let source    = Path::new(old_path);
  let file_name = source.file_name().ok_or_else(|| {
    std::io::Error::new(std::io::ErrorKind::InvalidInput,
                        format!("{} does not contain a file name", old_path))
  })?;
  // keep the name as OsStr, so names which are not valid UTF-8 work as well
  let target    = Path::new(new_dir).join(file_name);
  fs::copy(source, target)
}
262
/// Copy a file into `new_dir` and name the copy "liftof-config.toml"
///
/// # Arguments
///
///   * old_path : The file to copy
///   * new_dir  : The directory the copy gets created in
///
/// # Returns
///
/// The number of bytes copied
pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
  let mut target = PathBuf::from(new_dir);
  target.push("liftof-config.toml");
  fs::copy(old_path, target)
}
268
/// Delete the file at the given path
///
/// # Arguments
///
///   * file_path : The file to remove
pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
  fs::remove_file(file_path)
}
273
274/// Copy a config file from the queue to the current and 
275/// next directories and restart liftof-cc. 
276///
277/// As soon as the run is started, prepare the next run
278pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
279  let queue_dir   = format!("{}/queue", staging_dir);
280  let next_dir    = format!("{}/next",  staging_dir);
281  let current_dir = format!("{}/current", staging_dir);
282
283  let queue   = get_queue(&queue_dir);
284  let current = get_queue(&current_dir);
285  let next    = get_queue(&next_dir); 
286  
287  if current.len() == 0 {
288    // we are f***ed
289    error!("We don't have a current configuration. This is BAD!");
290    return Err(StagingError::NoCurrentConfig);
291  }
292  
293  println!("= => Found {} files in run queue!", queue.len());
294  if next.len() == 0 && queue.len() == 0 {
295    println!("= => Nothing staged, will jusr repeat current run setting!");
296    if !dry_run {
297      manage_liftof_cc_service("restart");
298    }
299    thread::sleep(Duration::from_secs(20));
300    return Ok(());
301  }
302  if next.len() == 0 && queue.len() != 0 {
303    error!("Empty next directory, but we have files in the queue!");
304    match copy_file_rename_liftof(&queue[0], &next_dir) {
305      Ok(_) => (),
306      Err(err) => {
307        error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
308      }
309    }
310    match move_file_rename_liftof(&queue[0], &current_dir) {
311      Ok(_) => (),
312      Err(err) => {
313        error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
314      }
315    }
316  }
317  if next.len() != 0  {
318    match delete_file(&current[0]) {
319      Ok(_)    => (),
320      Err(err) => {
321        error!("Unable to delete {}! {}", current[0], err);
322      }
323    }
324    match move_file_rename_liftof(&next[0], &current_dir) {
325      Ok(_) => (),
326      Err(err) => {
327        error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
328      }
329    }
330    if queue.len() != 0 {
331      match move_file_with_name(&queue[0], &next_dir) {
332        Ok(_) => (),
333        Err(err) => {
334          error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
335        }
336      }
337    }
338    println!("=> Restarting liftof-cc!");
339    if !dry_run {
340      manage_liftof_cc_service("restart");
341    }
342    thread::sleep(Duration::from_secs(20));
343  }
344  Ok(())
345}
346
347/// Prepare a new folder with the run id
348///
349/// This will assign a run id based on the 
350/// run ids in that folder. Additionally, 
351/// we will copy the current settings into
352/// that folder
353///
354/// TODO - connect it to the run database
355///
356/// # Arguments:
357///
358/// * data_path  : The global path on the inflight 
359///                tof computer where to store data.
360/// * config     : The current configuration, to be 
361///                copied into the new folder
362/// * run_id     : Optionally define a pre-given 
363///                run-id
364/// * create_dir : Create the directory for runfiles
365pub fn prepare_run(data_path  : String,
366                   config     : &LiftofSettings,
367                   run_id     : Option<u32>,
368                   create_dir : bool) -> Option<u32> {
369  let mut stream_files_path = PathBuf::from(data_path);
370  // Unwrap is ok here, since this should only happen at run start.
371  // Also, if the path is wrong, we are going to fail catastrophically
372  // anyway
373  let paths = fs::read_dir(stream_files_path.clone()).unwrap();
374  
375  let mut used_runids = Vec::<u32>::new();
376  for path in paths {
377    // this is hideous, I am so sorry. May the rust gods have mercy on my soul...
378    match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
379      Ok(this_run_id) => {
380        debug!("Extracted run id {}", this_run_id);
381        used_runids.push(this_run_id);
382      },
383      Err(err)        => {
384        warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
385      }
386    }
387  }
388  let mut max_run_id = 0u32;
389  match used_runids.iter().max() {
390    None => (),
391    Some(_r) => {
392      max_run_id = *_r;
393    }
394  }
395  println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
396  let new_run_id : Option<u32>;
397  if max_run_id == 0 {
398    // we can use the run)given as the argument
399    // return run_id;
400    new_run_id = run_id;
401  } else if run_id.is_some() {
402    if used_runids.contains(&run_id.unwrap()) {
403      // the assigned run id has been used already
404      error!("Duplicate run id ({})!", run_id.unwrap());
405      //return None;
406      new_run_id = None;
407    } else {
408      //return run_id;
409      new_run_id = run_id;
410    }
411  } else {
412      new_run_id = Some(max_run_id + 1);
413  }
414  // We were not able to assign a new run id
415  if new_run_id.is_none() {
416    return new_run_id;
417  }
418  stream_files_path.push(new_run_id.unwrap().to_string().as_str());
419  if create_dir {
420    // Create directory if it does not exist
421    // Check if the directory exists
422    if let Ok(metadata) = fs::metadata(&stream_files_path) {
423      if metadata.is_dir() {
424        println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
425        // FILXME - in flight, we can not have interactivity.
426        // But the whole system with the run ids might change 
427      } 
428    } else {
429      match fs::create_dir(&stream_files_path) {
430        Ok(())   => println!("=> Created {} to save stream data", stream_files_path.display()),
431        Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
432      }
433    }
434  }
435  let settings_fname = format!("{}/run{}.toml",
436    stream_files_path.display(),
437    new_run_id.unwrap()); 
438  println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
439  println!("=> Writing settings to {}!", settings_fname);
440  config.to_toml(settings_fname);
441  return new_run_id;
442}
443
444
///// Take a data run. This can be either verification or physics
446//pub fn run(with_calibration : bool, verification : bool) {
447//  //prepare_run(data_path  : String,
448//  //                 config     : &LiftofSettings,
449//  //                 run_id     : Option<u32>,
450//  //                 create_dir : bool) -> Option<u32> {
451//  if with_calibration {
452//    //calibrate_tof()
453//  }
454//}
455
456/// Trigger a restart of liftof-cc and start a new run
457///
458///
459/// # Arguments
460///
461///   * mode     : The argument given to the systemd service 
462///                - either "start", "stop", "restart", etc.
463/// # Returns:
464///   * success : true for succes
465pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
466  match Command::new("sudo")
467    .args(["systemctl", mode, "liftof"])
468    .spawn() {
469    Err(err) => {
470      error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
471      TofReturnCode::GeneralFail
472    }
473    Ok(_) => {
474      println!("=> Executed sudo systemctl {} liftof", mode);
475      TofReturnCode::Success
476    }
477  }
478}
479
480
481/// Trigger a general command on the Readoutboards 
482/// remotly through ssh.
483///
484/// Ssh keys and aliases (e.g. tof-rb02) must be 
485/// set up for this to work
486///
487/// # Arguments:
488///
489///   * rb_list : The list of ReadoutBoard ids the commands
490///               will get executed
491///   * cmd     : The actual command without 'ssh \<ip\>'
492///
493/// # Returns:
494///
495///   * A list of rb ids where the process failed
496pub fn ssh_command_rbs(rb_list         : &Vec<u8>,
497                       cmd             : Vec<String>,
498                       ignore_fail     : bool) -> Result<Vec<u8>, TofError> {
499  let mut rb_handles       = Vec::<thread::JoinHandle<_>>::new();
500  info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
501  let mut children = Vec::<(u8,Child)>::new();
502  for rb in rb_list {
503    // also populate the rb thread nandles
504    rb_handles.push(thread::spawn(||{}));
505    let rb_address   = format!("tof-rb{:02}", rb);
506    let mut ssh_args = vec![rb_address];
507    let mut thisrb_cmd = cmd.clone();
508    ssh_args.append(&mut thisrb_cmd);
509    match Command::new("ssh")
510      //.args([&rb_address, "sudo", "systemctl", "restart", "liftof"])
511      .args(ssh_args)
512      .spawn() {
513      Err(err) => {
514        error!("Unable to spawn ssh process on RB {}! {}", rb, err);
515      }
516      Ok(child) => {
517        children.push((*rb,child));
518      }
519    }
520  }
521  let mut issues = Vec::<u8>::new();
522  for rb_child in &mut children {
523    // this is not optimal, since this will take as much 
524    // time as the slowest child, but at the moment we 
525    // have bigger fish to fry.
526    let timeout = Duration::from_secs(10);
527    let kill_t  = Instant::now();
528    loop {
529      if kill_t.elapsed() > timeout {
530        error!("SSH process for board {} timed out!", rb_child.0);
531        // Duuu hast aber einen schöönen Ball! [M. eine Stadt sucht einen Moerder]
532        match rb_child.1.kill() {
533          Err(err) => {
534            error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
535          }
536          Ok(_) => {
537            error!("Killed SSH process for for RB {}", rb_child.0);
538          }
539        }
540        issues.push(rb_child.0);
541        // FIXME
542        break
543      }
544      // non-blocking
545      match rb_child.1.try_wait() {
546        Ok(None) => {
547          // the child is still busy
548          thread::sleep(Duration::from_secs(1));
549          continue;
550        }
551        Ok(Some(status)) => {
552          if status.success() {
553            info!("Execution of command on {} successful!", rb_child.0);
554            break;
555          } else {
556            if !ignore_fail {
557              error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
558              issues.push(rb_child.0);
559            }
560            break;
561          }
562        }
563        Err(err) => {
564          error!("Unable to wait for the SSH process! {err}");
565          break;
566        }
567      }
568    }
569  }
570  if issues.len() == 0 {
571    println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
572  }
573  Ok(issues)
574}
575
576/// Restart liftof-rb on RBs
577pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
578  let command = vec![String::from("sudo"),
579                     String::from("systemctl"),
580                     String::from("restart"),
581                     String::from("liftof")];
582  println!("=> Restarting liftof-rb on RBs!");
583  match ssh_command_rbs(rb_list, command, false) {
584    Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
585    Ok(_)    => ()
586  }
587}
588
589/// Set SMA mode through rb-mode with tof-control's binary
590pub fn set_rbs_sma_mode(rb_list : &Vec<u8>) {
591  let command = vec![String::from("bin/rb-mode"),
592                     String::from("-s"),
593                     String::from("sma")];
594  println!("=> Calling rb-mode on RBs to set SMA mode!");
595  match ssh_command_rbs(rb_list, command, false) {
596    Err(err) => error!("Calling rb-mode on all RBs failed! {err}"),
597    Ok(_)    => ()
598  }
599}
600
601/// Restart liftof-rb on RBs
602pub fn rm_liftof_rb_logs(rb_list : &Vec<u8>) {
603  let command = vec![String::from("rm"),
604                     String::from("/home/gaps/log/liftof.log"),
605                     String::from("/home/gaps/log/liftof.err")];
606  println!("=> Deleting Logs on RBs!");
607  match ssh_command_rbs(rb_list, command, true) {
608    Err(err) => debug!("Deleting logs on all RBs failed! {err}"),
609    Ok(_)    => ()
610  }
611}
612
613
614/// Run a full tof calibration - RBCalibration
615/// 
616/// The purpose of the RB calibration to is to create a 
617/// relationship between the adc/timing bins and voltages
618/// and nanoseconds
619///
620/// This function is blocking, until a certain (configurable)
621/// timeout is expired. The timeout can be set in the configuration
622/// file
623///
624/// # Argumeents:
625///
626///   * thread_control : general shared memory to hold configuration
627///                      settings, program st ate
628///   * rb_list        : List of active readoutboards
629///   * show_progress  : if true, it will show a progressbar with 
630///                      indicatif
631///
632pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
633                     rb_list        : &Vec<ReadoutBoard>,
634                     show_progress  : bool) {
635
636  let one_second               = Duration::from_millis(1000);
637  let mut cc_pub_addr          = String::from("");
638  let calibration_timeout_fail = Duration::from_secs(300); // in seconds
639 
640  let mut cali_dir_created = false;
641  let mut cali_output_dir  = String::from("");
642  let mut cali_base_dir        = String::from("");
643
644  match thread_control.lock() {
645    Ok(mut tc) => {
646      for rb in rb_list {
647        tc.finished_calibrations.insert(rb.rb_id,false); 
648      }
649      cali_base_dir         = tc.liftof_settings.calibration_dir.clone();
650      cc_pub_addr           = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
651      tc.write_data_to_disk = true;
652    },
653    Err(err) => {
654      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
655    },
656  }
657
658  // deprecated commanding
659  //let voltage_level = DEFAULT_CALIB_VOLTAGE;
660  //let rb_id         = DEFAULT_RB_ID;
661  //let extra         = DEFAULT_CALIB_EXTRA;
662  //println!("=> Received calibration default command! Will init calibration run of all RBs...");
663  //let cmd_payload: u32
664  //  = (voltage_level as u32) << 16 | (rb_id as u32) << 8 | (extra as u32);
665  //let default_calib_depr = TofCommand::DefaultCalibration(cmd_payload);
666  //let tp_depr = default_calib_depr.pack();
667  //let mut payload_depr = String::from("BRCT").into_bytes();
668  //payload_depr.append(&mut tp_depr.to_bytestream());
669
670  let mut default_calib      = TofCommand::new();
671  default_calib.command_code = TofCommandCode::RBCalibration;
672  let tp                     = default_calib.pack();
673  let mut payload            = String::from("BRCT").into_bytes();
674  payload.append(&mut tp.to_bytestream());
675  // open 0MQ socket here
676  let ctx = zmq::Context::new();
677  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
678
679  cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
680  println!("=> Give the RBs a chance to connect and wait a bit..");
681  thread::sleep(10*one_second);
682  //match cmd_sender.send(&payload_depr, 0) { Err(err) => {
683  //    error!("Unable to send command! {err}");
684  //  },
685  //  Ok(_) => {
686  //    println!("=> Calibration  initialized!");
687  //  }
688  //}
689  match cmd_sender.send(&payload, 0) { Err(err) => {
690      error!("Unable to send command! {err}");
691    },
692    Ok(_) => {
693      println!("=> Calibration  initialized!");
694    }
695  }
696  match thread_control.lock() {
697    Ok(mut tc) => {
698      // deactivate the master trigger thread
699      tc.thread_master_trg_active =false;
700      tc.calibration_active = true;
701    },
702    Err(err) => {
703      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
704    },
705  }
706
707  println!("=> .. now we need to wait until the calibration is finished!");
708  let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
709  let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
710  let mut bar    = ProgressBar::hidden();
711  if show_progress {
712    bar = ProgressBar::new(rb_list.len() as u64); 
713    bar.set_position(0);
714    let bar_label  = String::from("Acquiring RB calibration data");
715    bar.set_message (bar_label);
716    bar.set_prefix  ("\u{2699}\u{1F4D0}");
717    bar.set_style   (bar_style);
718  }
719
720  // now block until the calibrations are done or we time outu
721  // FIXME - set timeout parameter in settings
722  let timeout = Instant::now();
723  let mut left_over_rbs : Vec<u8> = vec![1,2,3,4,5,6,7,8,9,11,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,39,40,41,42,44,46];
724  let mut cali_received = 0;
725  'main: loop {
726    thread::sleep(10*one_second);
727    if timeout.elapsed() > calibration_timeout_fail {
728      error!("Calibration timeout! Calibrations might not be complete!");
729      error!("Incomplete calibrations {:?}", left_over_rbs);
730      match thread_control.lock() {
731        Ok(mut tc) => {
732          tc.calibration_active = false;
733        }
734        Err(err) => {
735          debug!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
736        }
737      }
738      if show_progress {
739        bar.finish_with_message("Done");
740      }
741      break;
742    }
743    //let mut rbcali = RBCalibrations::new();
744    match thread_control.lock() {
745      Ok(mut tc) => {
746        for rbid in rb_list {
747          // the global data sink sets these flags
748          let mut finished_keys = Vec::<u8>::new();
749          if tc.stop_flag {
750            println!("Stop signal received, exiting calibration routine!");
751            break 'main;
752          }
753          if tc.finished_calibrations[&rbid.rb_id] {
754            cali_received += 1;
755            let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
756            let pack   = rbcali.pack();
757            // See RBCalibration reference
758            let file_type  = FileType::CalibrationFile(rbid.rb_id);
759            //println!("==> Writing stream to file with prefix {}", streamfile_name);
760            //let mut cali_writer = TofPacketWriter::new(write_stream_path.clone(), file_type);
761            if !cali_dir_created {
762              let today           = get_utc_timestamp();
763              cali_output_dir     = format!("{}/{}", cali_base_dir.clone(), today);
764              match create_dir_all(cali_output_dir.clone()) {
765                Ok(_)    => info!("Created {} for calibration data!", cali_output_dir),
766                Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
767              }
768              cali_dir_created = true;
769            }
770            let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
771            cali_writer.add_tof_packet(&pack);
772            drop(cali_writer);
773
774            bar.set_position(cali_received);
775            finished_keys.push(rbid.rb_id);
776            left_over_rbs.retain(|&x| x != rbid.rb_id);
777          }
778          for rbid in &finished_keys {
779            *tc.finished_calibrations.get_mut(&rbid).unwrap() = false; 
780          }
781        }
782        // FIXME - this or a timer
783        if cali_received as usize == rb_list.len() {
784          // cali_received = 0;
785          // if we want to redo a calibration, 
786          // somebody else has to set this 
787          // flag again.
788          tc.calibration_active = false;
789          // reset the counters
790          for rbid in rb_list {
791            *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false; 
792          }
793          if show_progress {
794            bar.finish_with_message("Done");
795          }
796          break;
797        }
798      }
799      Err(err) => {
800        debug!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
801      }
802    }
803  } // end loop
804  // The last step is to create te symlink
805  let cali_link_dir = cali_base_dir.clone() + "latest";
806  match fs::remove_file(cali_link_dir.clone()) {
807    Ok(_) => {
808      println!("=> Symlink {} removed!", cali_link_dir);
809    },
810    Err(err) => {
811      error!("Unable to remove symlink to latest calibrations! {err}");
812    }
813  }
814  println!("=> Will create symlink {}", cali_link_dir);
815  match symlink(cali_output_dir, cali_link_dir) {
816    Err(err) => error!("Unable to create symlink for calibration data! {err}"),
817    Ok(_)    => ()
818  }
819}