liftof_cc/
lib.rs

1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8  PathBuf,
9  Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15  Arc,
16  Mutex,
17};
18
19use std::process::{
20  Command,
21  Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28  Duration,
29  Instant,
30};
31
32use crossbeam_channel::Sender;
33
34use indicatif::{
35  ProgressBar,
36  ProgressStyle
37};
38
39use comfy_table::modifiers::{
40  UTF8_ROUND_CORNERS,
41  UTF8_SOLID_INNER_BORDERS,
42};
43
44use comfy_table::presets::UTF8_FULL;
45use comfy_table::*;
46
47use liftof_lib::constants::{
48  DEFAULT_CALIB_VOLTAGE,
49  DEFAULT_RB_ID,
50  DEFAULT_CALIB_EXTRA,
51};
52
53use tof_dataclasses::constants::PAD_CMD_32BIT;
54use tof_dataclasses::serialization::{
55  Serialization,
56  Packable
57};
58
59use tof_dataclasses::errors::{
60  StagingError,
61  TofError
62};
63
64use tof_dataclasses::commands::{
65  TofCommand,
66  TofCommandV2,
67  TofReturnCode,
68  TofCommandCode,
69};
70
71use tof_dataclasses::status::TofDetectorStatus;
72use tof_dataclasses::packets::TofPacket;
73use tof_dataclasses::database::ReadoutBoard;
74
75use tof_dataclasses::io::{
76    TofPacketWriter,
77    FileType,
78    get_utc_timestamp
79};
80
81use liftof_lib::settings::LiftofSettings;
82use liftof_lib::thread_control::ThreadControl;
83
84/// communicaton between liftof-scheduler and 
85/// liftof-cc
86pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
87
88/// Produce a nicely formattable table with per RB information for scalar
89/// values
90pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
91  let mut unit = "";
92  if label_is_hz {
93    unit = "Hz"
94  }
95  let mut table = Table::new();
96  table
97    .load_preset(UTF8_FULL)
98    .apply_modifier(UTF8_ROUND_CORNERS)
99    .apply_modifier(UTF8_SOLID_INNER_BORDERS)
100    .set_content_arrangement(ContentArrangement::Dynamic)
101    .set_width(80)
102    //.set_header(vec!["Readoutboard Rates:"])
103    .add_row(vec![
104        Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
105        Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
106        Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
107        Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
108        Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
109        //Cell::new("Center aligned").set_alignment(CellAlignment::Center),
110    ])
111    .add_row(vec![
112        Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
113        Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
114        Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
115        Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
116        Cell::new(&(format!("RB10 {}", "N.A."))),
117    ])
118    .add_row(vec![
119        Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
120        Cell::new(&(format!("RB12 {}", "N.A."))),
121        Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
122        Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
123        Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
124    ])
125    .add_row(vec![
126        Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
127        Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
128        Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
129        Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
130        Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
131    ])
132    .add_row(vec![
133        Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
134        Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
135        Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
136        Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
137        Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
138    ])
139    .add_row(vec![
140        Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
141        Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
142        Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
143        Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
144        Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
145    ])
146    .add_row(vec![
147        Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
148        Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
149        Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
150        Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
151        Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
152    ])
153    .add_row(vec![
154        Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
155        Cell::new(&(format!("RB37 {}", "N.A."))),
156        Cell::new(&(format!("RB38 {}", "N.A."))),
157        Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
158        Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
159    ])
160    .add_row(vec![
161        Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
162        Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
163        Cell::new(&(format!("RB42 {}", "N.A."))),
164        Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
165        Cell::new(&(format!("RB45 {}", "N.A."))),
166    ])
167    .add_row(vec![
168        Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
169        Cell::new(&(format!("{}", "N.A."))),
170        Cell::new(&(format!("{}", "N.A."))),
171        Cell::new(&(format!("{}", "N.A."))),
172        Cell::new(&(format!("{}", "N.A."))),
173    ]);
174  table
175}
176
177/// Regular run start sequence
178pub fn init_run_start(cc_pub_addr : &str) {
179  let one_second   = Duration::from_secs(1);
180  // deprecated way of sending commands, however as long as we might have RBS
181  // with old software, we do want to send the "old style" as well
182  let cmd_payload  = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
183  let cmd_depr     = TofCommand::DataRunStart(cmd_payload);
184  let packet_depr  = cmd_depr.pack();
185  let mut payload_depr = String::from("BRCT").into_bytes();
186  payload_depr.append(&mut packet_depr.to_bytestream());
187  
188  let mut cmd      = TofCommandV2::new();
189  cmd.command_code = TofCommandCode::DataRunStart;
190  let packet       = cmd.pack();
191  let mut payload  = String::from("BRCT").into_bytes();
192  payload.append(&mut packet.to_bytestream());
193  
194  // open 0MQ socket here
195  let ctx         = zmq::Context::new();
196  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
197  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
198  // after we opened the socket, give the RBs a chance to connect
199  println!("=> Sending run start command to RBs ..");
200  for _ in 0..10 {
201    thread::sleep(one_second);
202    print!("..");
203  }
204  // send old and new commands
205  match cmd_sender.send(&payload_depr, 0) {
206    Err(err) => {
207      error!("Unable to send command! {err}");
208    },
209    Ok(_) => {
210      debug!("We sent {:?}", payload);
211    }
212  }
213  match cmd_sender.send(&payload, 0) {
214    Err(err) => {
215      error!("Unable to send command! {err}");
216    },
217    Ok(_) => {
218      debug!("We sent {:?}", payload);
219    }
220  }
221  print!("done!\n");
222}
223
224/// Regular run stop sequence
225pub fn end_run(cc_pub_addr : &str) {
226  let cmd_depr     = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
227  let packet_depr  = cmd_depr.pack();
228  let mut payload_depr = String::from("BRCT").into_bytes();
229  payload_depr.append(&mut packet_depr.to_bytestream());
230
231  let mut cmd      = TofCommandV2::new();
232  cmd.command_code = TofCommandCode::DataRunStop;
233  let packet       = cmd.pack();
234  let mut payload  = String::from("BRCT").into_bytes();
235  payload.append(&mut packet.to_bytestream());
236  let ctx         = zmq::Context::new();
237  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
238  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
239  // after we opened the socket, give the RBs a chance to connect
240  println!("=> Sending run stop command to all RBs...");
241  println!("=> Waiting for RBs to stoop data acquisition..");
242  for _ in 0..10 {
243    print!("..");
244  }
245  match cmd_sender.send(&payload_depr, 0) {
246    Err(err) => {
247      error!("Unable to send command! {err}");
248    },
249    Ok(_) => {
250      debug!("We sent {:?}", payload);
251    }
252  }
253  match cmd_sender.send(&payload, 0) {
254    Err(err) => {
255      error!("Unable to send command! {err}");
256    },
257    Ok(_) => {
258      debug!("We sent {:?}", payload);
259    }
260  }
261  print!("..done!\n");
262}
263
264/// Get the files in the queue and sort them by number
265pub fn get_queue(dir_path : &String) -> Vec<String> {
266  let mut entries = fs::read_dir(dir_path)
267    .expect("Directory might not exist!")
268    .map(|entry| entry.unwrap().path())
269    .collect::<Vec<PathBuf>>();
270  entries.sort_by(|a, b| {
271    let meta_a = fs::metadata(a).unwrap();
272    let meta_b = fs::metadata(b).unwrap();
273    meta_a.modified().unwrap().cmp(&meta_b.modified().unwrap())
274  });
275  entries.iter()
276    .map(|path| path.to_str().unwrap().to_string())
277    .collect()
278}
279
280pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
281  let old_path  = Path::new(old_path);
282  let file_name = old_path.file_name().unwrap().to_str().unwrap(); // Extract filename
283  let new_path  = Path::new(new_dir).join(file_name); // Combine new directory with filename
284  fs::rename(old_path, new_path) // Move the file
285}
286
287pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
288  let old_path  = Path::new(old_path);
289  let new_path  = Path::new(new_dir).join("liftof-config.toml"); // Combine new directory with filename
290  fs::rename(old_path, new_path) // Move the file
291}
292
293pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
294  let old_path  = Path::new(old_path);
295  let file_name = old_path.file_name().unwrap().to_str().unwrap(); // Extract filename
296  let new_path  = Path::new(new_dir).join(file_name); // Combine new directory with filename
297  fs::copy(old_path, new_path) 
298}
299
300pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
301  let old_path  = Path::new(old_path);
302  let new_path  = Path::new(new_dir).join("liftof-config.toml"); // Combine new directory with filename
303  fs::copy(old_path, new_path) 
304}
305
306pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
307  let path = Path::new(file_path);
308  fs::remove_file(path) // Attempts to delete the file at the given path
309}
310
311/// Copy a config file from the queue to the current and 
312/// next directories and restart liftof-cc. 
313///
314/// As soon as the run is started, prepare the next run
315pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
316  let queue_dir   = format!("{}/queue", staging_dir);
317  let next_dir    = format!("{}/next",  staging_dir);
318  let current_dir = format!("{}/current", staging_dir);
319
320  let queue   = get_queue(&queue_dir);
321  let current = get_queue(&current_dir);
322  let next    = get_queue(&next_dir); 
323  
324  if current.len() == 0 {
325    // we are f***ed
326    error!("We don't have a current configuration. This is BAD!");
327    return Err(StagingError::NoCurrentConfig);
328  }
329  
330  println!("= => Found {} files in run queue!", queue.len());
331  if next.len() == 0 && queue.len() == 0 {
332    println!("= => Nothing staged, will jusr repeat current run setting!");
333    if !dry_run {
334      manage_liftof_cc_service("restart");
335    }
336    thread::sleep(Duration::from_secs(20));
337    return Ok(());
338  }
339  if next.len() == 0 && queue.len() != 0 {
340    error!("Empty next directory, but we have files in the queue!");
341    match copy_file_rename_liftof(&queue[0], &next_dir) {
342      Ok(_) => (),
343      Err(err) => {
344        error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
345      }
346    }
347    match move_file_rename_liftof(&queue[0], &current_dir) {
348      Ok(_) => (),
349      Err(err) => {
350        error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
351      }
352    }
353  }
354  if next.len() != 0  {
355    match delete_file(&current[0]) {
356      Ok(_)    => (),
357      Err(err) => {
358        error!("Unable to delete {}! {}", current[0], err);
359      }
360    }
361    match move_file_rename_liftof(&next[0], &current_dir) {
362      Ok(_) => (),
363      Err(err) => {
364        error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
365      }
366    }
367    if queue.len() != 0 {
368      match move_file_with_name(&queue[0], &next_dir) {
369        Ok(_) => (),
370        Err(err) => {
371          error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
372        }
373      }
374    }
375    println!("=> Restarting liftof-cc!");
376    if !dry_run {
377      manage_liftof_cc_service("restart");
378    }
379    thread::sleep(Duration::from_secs(20));
380  }
381  Ok(())
382}
383
384/// Prepare a new folder with the run id
385///
386/// This will assign a run id based on the 
387/// run ids in that folder. Additionally, 
388/// we will copy the current settings into
389/// that folder
390///
391/// TODO - connect it to the run database
392///
393/// # Arguments:
394///
395/// * data_path  : The global path on the inflight 
396///                tof computer where to store data.
397/// * config     : The current configuration, to be 
398///                copied into the new folder
399/// * run_id     : Optionally define a pre-given 
400///                run-id
401/// * create_dir : Create the directory for runfiles
402pub fn prepare_run(data_path  : String,
403                   config     : &LiftofSettings,
404                   run_id     : Option<u32>,
405                   create_dir : bool) -> Option<u32> {
406  let mut stream_files_path = PathBuf::from(data_path);
407  // Unwrap is ok here, since this should only happen at run start.
408  // Also, if the path is wrong, we are going to fail catastrophically
409  // anyway
410  let paths = fs::read_dir(stream_files_path.clone()).unwrap();
411  
412  let mut used_runids = Vec::<u32>::new();
413  for path in paths {
414    // this is hideous, I am so sorry. May the rust gods have mercy on my soul...
415    match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
416      Ok(this_run_id) => {
417        debug!("Extracted run id {}", this_run_id);
418        used_runids.push(this_run_id);
419      },
420      Err(err)        => {
421        warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
422      }
423    }
424  }
425  let mut max_run_id = 0u32;
426  match used_runids.iter().max() {
427    None => (),
428    Some(_r) => {
429      max_run_id = *_r;
430    }
431  }
432  println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
433  let new_run_id : Option<u32>;
434  if max_run_id == 0 {
435    // we can use the run)given as the argument
436    // return run_id;
437    new_run_id = run_id;
438  } else if run_id.is_some() {
439    if used_runids.contains(&run_id.unwrap()) {
440      // the assigned run id has been used already
441      error!("Duplicate run id ({})!", run_id.unwrap());
442      //return None;
443      new_run_id = None;
444    } else {
445      //return run_id;
446      new_run_id = run_id;
447    }
448  } else {
449      new_run_id = Some(max_run_id + 1);
450  }
451  // We were not able to assign a new run id
452  if new_run_id.is_none() {
453    return new_run_id;
454  }
455  stream_files_path.push(new_run_id.unwrap().to_string().as_str());
456  if create_dir {
457    // Create directory if it does not exist
458    // Check if the directory exists
459    if let Ok(metadata) = fs::metadata(&stream_files_path) {
460      if metadata.is_dir() {
461        println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
462        // FILXME - in flight, we can not have interactivity.
463        // But the whole system with the run ids might change 
464      } 
465    } else {
466      match fs::create_dir(&stream_files_path) {
467        Ok(())   => println!("=> Created {} to save stream data", stream_files_path.display()),
468        Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
469      }
470    }
471  }
472  let settings_fname = format!("{}/run{}.toml",
473    stream_files_path.display(),
474    new_run_id.unwrap()); 
475  println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
476  println!("=> Writing settings to {}!", settings_fname);
477  config.to_toml(settings_fname);
478  return new_run_id;
479}
480
481
482///// Taka a data run. This can be either verification or physcis
483//pub fn run(with_calibration : bool, verification : bool) {
484//  //prepare_run(data_path  : String,
485//  //                 config     : &LiftofSettings,
486//  //                 run_id     : Option<u32>,
487//  //                 create_dir : bool) -> Option<u32> {
488//  if with_calibration {
489//    //calibrate_tof()
490//  }
491//}
492
493/// Trigger a restart of liftof-cc and start a new run
494///
495///
496/// # Arguments
497///
498///   * mode     : The argument given to the systemd service 
499///                - either "start", "stop", "restart", etc.
500/// # Returns:
501///   * success : true for succes
502pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
503  match Command::new("sudo")
504    .args(["systemctl", mode, "liftof"])
505    .spawn() {
506    Err(err) => {
507      error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
508      TofReturnCode::GeneralFail
509    }
510    Ok(_) => {
511      println!("=> Executed sudo systemctl {} liftof", mode);
512      TofReturnCode::Success
513    }
514  }
515}
516
517
518/// Trigger a general command on the Readoutboards 
519/// remotly through ssh.
520///
521/// Ssh keys and aliases (e.g. tof-rb02) must be 
522/// set up for this to work
523///
524/// # Arguments:
525///
526///   * rb_list : The list of ReadoutBoard ids the commands
527///               will get executed
528///   * cmd     : The actual command without 'ssh <ip>'
529///
530/// # Returns:
531///
532///   * A list of rb ids where the process failed
533pub fn ssh_command_rbs(rb_list : &Vec<u8>,
534                       cmd     : Vec<String>) -> Result<Vec<u8>, TofError> {
535  let mut rb_handles       = Vec::<thread::JoinHandle<_>>::new();
536  info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
537  let mut children = Vec::<(u8,Child)>::new();
538  for rb in rb_list {
539    // also populate the rb thread nandles
540    rb_handles.push(thread::spawn(||{}));
541    let rb_address   = format!("tof-rb{:02}", rb);
542    let mut ssh_args = vec![rb_address];
543    let mut thisrb_cmd = cmd.clone();
544    ssh_args.append(&mut thisrb_cmd);
545    match Command::new("ssh")
546      //.args([&rb_address, "sudo", "systemctl", "restart", "liftof"])
547      .args(ssh_args)
548      .spawn() {
549      Err(err) => {
550        error!("Unable to spawn ssh process on RB {}! {}", rb, err);
551      }
552      Ok(child) => {
553        children.push((*rb,child));
554      }
555    }
556  }
557  let mut issues = Vec::<u8>::new();
558  for rb_child in &mut children {
559    // this is not optimal, since this will take as much 
560    // time as the slowest child, but at the moment we 
561    // have bigger fish to fry.
562    let timeout = Duration::from_secs(10);
563    let kill_t  = Instant::now();
564    loop {
565      if kill_t.elapsed() > timeout {
566        error!("SSH process for board {} timed out!", rb_child.0);
567        // Duuu hast aber einen schöönen Ball! [M. eine Stadt sucht einen Moerder]
568        match rb_child.1.kill() {
569          Err(err) => {
570            error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
571          }
572          Ok(_) => {
573            error!("Killed SSH process for for RB {}", rb_child.0);
574          }
575        }
576        issues.push(rb_child.0);
577        // FIXME
578        break
579      }
580      // non-blocking
581      match rb_child.1.try_wait() {
582        Ok(None) => {
583          // the child is still busy
584          thread::sleep(Duration::from_secs(1));
585          continue;
586        }
587        Ok(Some(status)) => {
588          if status.success() {
589            info!("Execution of command on {} successful!", rb_child.0);
590            break;
591          } else {
592            error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
593            issues.push(rb_child.0);
594            break;
595          }
596        }
597        Err(err) => {
598          error!("Unable to wait for the SSH process! {err}");
599          break;
600        }
601      }
602    }
603  }
604  if issues.len() == 0 {
605    println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
606  }
607  Ok(issues)
608}
609
610/// Restart liftof-rb on RBs
611pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
612  let command = vec![String::from("sudo"),
613                     String::from("systemctl"),
614                     String::from("restart"),
615                     String::from("liftof")];
616  println!("=> Restarting liftof-rb on RBs!");
617  match ssh_command_rbs(rb_list, command) {
618    Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
619    Ok(_)    => ()
620  }
621}
622
623/// A "verification" run describes an any trigger/track trigger
624/// run which should iluminate the entire tof so that we can do 
625/// a working channel inventory. 
626///
627/// A verification run will not safe data to disk, but instead 
628/// run it through a small analysis engine and count the active 
629/// channels
630pub fn verification_run(timeout        : u32,
631                        tp_to_sink     : Sender<TofPacket>,
632                        thread_control : Arc<Mutex<ThreadControl>>) {
633  let mut write_state : bool = true; // when in doubt, write data to disk
634  let mut config      = LiftofSettings::new();
635  match thread_control.lock() {
636    Ok(mut tc) => {
637      write_state = tc.write_data_to_disk;
638      tc.write_data_to_disk       = false;
639      tc.verification_active      = true;
640      tc.thread_master_trg_active = true;
641      tc.calibration_active       = false;
642      tc.thread_event_bldr_active = true;
643      config = tc.liftof_settings.clone();
644    }
645    Err(err) => {
646      error!("Can't acquire lock for ThreadControl! {err}");
647    },
648  }
649  let one_second  = Duration::from_millis(1000);
650  let runtime     = Instant::now();
651  // technically, it is run_typ, rb_id, event number
652  // all to the max means run start for all
653  // We don't need this - just need to make sure it gets broadcasted
654  let cmd_payload: u32 =  PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
655  let cmd          = TofCommand::DataRunStart(cmd_payload);
656  let packet       = cmd.pack();
657  let mut payload  = String::from("BRCT").into_bytes();
658  payload.append(&mut packet.to_bytestream());
659  
660  // open 0MQ socket here
661  let ctx = zmq::Context::new();
662  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
663  let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
664  cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
665  // after we opened the socket, give the RBs a chance to connect
666  println!("=> Give the RBs a chance to connect and wait a bit..");
667  thread::sleep(10*one_second);
668  match cmd_sender.send(&payload, 0) {
669    Err(err) => {
670      error!("Unable to send command, error{err}");
671    },
672    Ok(_) => {
673      debug!("We sent {:?}", payload);
674    }
675  }
676  
677  println!("=> Verification run initialized!");
678  // just wait until the run is finisehd
679  loop {
680    if runtime.elapsed().as_secs() > timeout as u64 {
681      break;
682    }
683    thread::sleep(5*one_second);
684  }
685  
686  println!("=> Ending verification run!");
687  println!("=> Sending run termination command to the RBs");
688  let cmd          = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
689  let packet       = cmd.pack();
690  let mut payload  = String::from("BRCT").into_bytes();
691  payload.append(&mut packet.to_bytestream());
692  
693  warn!("=> No command socket available! Can not shut down RBs..!");
694  // after we opened the socket, give the RBs a chance to connect
695  println!("=> Give the RBs a chance to connect and wait a bit..");
696  thread::sleep(10*one_second);
697  match cmd_sender.send(&payload, 0) {
698    Err(err) => {
699      error!("Unable to send command! {err}");
700    },
701    Ok(_) => {
702      debug!("We sent {:?}", payload);
703    }
704  }
705
706  // move the socket out of here for further use
707  let mut detector_status = TofDetectorStatus::new();
708  match thread_control.lock() {
709    Ok(mut tc) => {
710      tc.write_data_to_disk = write_state;
711      tc.verification_active = false;
712      detector_status = tc.detector_status.clone();
713    },
714    Err(err) => {
715      error!("Can't acquire lock for ThreadControl! {err}");
716    },
717  }
718  println!("=> Acquired TofDetectorStatus!");
719  println!("{}", detector_status);
720  let pack = detector_status.pack();
721  match tp_to_sink.send(pack) {
722    Err(err) => error!("Unable to send TofDetectorStatus to data sink! {err}"),
723    Ok(_)    => ()
724  }
725}
726
727
728/// Run a full tof calibration - RBCalibration
729/// 
730/// The purpose of the RB calibration to is to create a 
731/// relationship between the adc/timing bins and voltages
732/// and nanoseconds
733///
734/// This function is blocking, until a certain (configurable)
735/// timeout is expired. The timeout can be set in the configuration
736/// file
737///
738/// # Argumeents:
739///
740///   * thread_control : general shared memory to hold configuration
741///                      settings, program st ate
742///   * rb_list        : List of active readoutboards
743///   * show_progress  : if true, it will show a progressbar with 
744///                      indicatif
745///
746pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
747                     rb_list        : &Vec<ReadoutBoard>,
748                     show_progress  : bool) {
749
750  let one_second               = Duration::from_millis(1000);
751  let mut cc_pub_addr          = String::from("");
752  let calibration_timeout_fail = Duration::from_secs(300); // in seconds
753 
754  let mut cali_dir_created = false;
755  let mut cali_output_dir  = String::from("");
756  let mut cali_base_dir        = String::from("");
757
758  match thread_control.lock() {
759    Ok(mut tc) => {
760      for rb in rb_list {
761        tc.finished_calibrations.insert(rb.rb_id,false); 
762      }
763      cali_base_dir = tc.liftof_settings.calibration_dir.clone();
764      cc_pub_addr   = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
765      tc.write_data_to_disk = true;
766    },
767    Err(err) => {
768      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
769    },
770  }
771
772  // deprecated commanding
773  let voltage_level = DEFAULT_CALIB_VOLTAGE;
774  let rb_id         = DEFAULT_RB_ID;
775  let extra         = DEFAULT_CALIB_EXTRA;
776  println!("=> Received calibration default command! Will init calibration run of all RBs...");
777  let cmd_payload: u32
778    = (voltage_level as u32) << 16 | (rb_id as u32) << 8 | (extra as u32);
779  let default_calib_depr = TofCommand::DefaultCalibration(cmd_payload);
780  let tp_depr = default_calib_depr.pack();
781  let mut payload_depr = String::from("BRCT").into_bytes();
782  payload_depr.append(&mut tp_depr.to_bytestream());
783
784  let mut default_calib      = TofCommandV2::new();
785  default_calib.command_code = TofCommandCode::RBCalibration;
786  let tp                     = default_calib.pack();
787  let mut payload            = String::from("BRCT").into_bytes();
788  payload.append(&mut tp.to_bytestream());
789  // open 0MQ socket here
790  let ctx = zmq::Context::new();
791  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
792
793  cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
794  println!("=> Give the RBs a chance to connect and wait a bit..");
795  thread::sleep(10*one_second);
796  match cmd_sender.send(&payload_depr, 0) { Err(err) => {
797      error!("Unable to send command! {err}");
798    },
799    Ok(_) => {
800      println!("=> Calibration  initialized!");
801    }
802  }
803  match cmd_sender.send(&payload, 0) { Err(err) => {
804      error!("Unable to send command! {err}");
805    },
806    Ok(_) => {
807      println!("=> Calibration  initialized!");
808    }
809  }
810  match thread_control.lock() {
811    Ok(mut tc) => {
812      // deactivate the master trigger thread
813      tc.thread_master_trg_active =false;
814      tc.calibration_active = true;
815    },
816    Err(err) => {
817      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
818    },
819  }
820
821  let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
822  let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
823  let mut bar    = ProgressBar::hidden();
824  
825  println!("=> .. now we need to wait until the calibration is finished!");
826  if show_progress {
827    bar = ProgressBar::new(rb_list.len() as u64); 
828    bar.set_position(0);
829    let bar_label  = String::from("Acquiring RB calibration data");
830    bar.set_message (bar_label);
831    bar.set_prefix  ("\u{2699}\u{1F4D0}");
832    bar.set_style   (bar_style);
833  }
834
835  // now block until the calibrations are done or we time outu
836  // FIXME - set timeout parameter in settings
837  let timeout = Instant::now();
838  let mut cali_received = 0;
839  'main: loop {
840    thread::sleep(10*one_second);
841    if timeout.elapsed() > calibration_timeout_fail {
842      error!("Calibration timeout! Calibrations might not be complete!");
843      match thread_control.lock() {
844        Ok(mut tc) => {
845          tc.calibration_active = false;
846        }
847        Err(err) => {
848          error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
849        }
850      }
851      if show_progress {
852        bar.finish_with_message("Done");
853      }
854      break;
855    }
856    //let mut rbcali = RBCalibrations::new();
857    match thread_control.lock() {
858      Ok(mut tc) => {
859        for rbid in rb_list {
860          // the global data sink sets these flags
861          let mut finished_keys = Vec::<u8>::new();
862          if tc.stop_flag {
863            println!("Stop signal received, exiting calibration routine!");
864            break 'main;
865          }
866          if tc.finished_calibrations[&rbid.rb_id] {
867            cali_received += 1;
868            let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
869            let pack   = rbcali.pack();
870            // See RBCalibration reference
871            let file_type  = FileType::CalibrationFile(rbid.rb_id);
872            //println!("==> Writing stream to file with prefix {}", streamfile_name);
873            //let mut cali_writer = TofPacketWriter::new(write_stream_path.clone(), file_type);
874            if !cali_dir_created {
875              let today           = get_utc_timestamp();
876              cali_output_dir     = format!("{}/{}", cali_base_dir.clone(), today);
877              match create_dir_all(cali_output_dir.clone()) {
878                Ok(_)    => info!("Created {} for calibration data!", cali_output_dir),
879                Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
880              }
881              cali_dir_created = true;
882            }
883            let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
884            cali_writer.add_tof_packet(&pack);
885            drop(cali_writer);
886
887            bar.set_position(cali_received);
888            finished_keys.push(rbid.rb_id);
889          }
890          for rbid in &finished_keys {
891            *tc.finished_calibrations.get_mut(&rbid).unwrap() = false; 
892          }
893        }
894        // FIXME - this or a timer
895        if cali_received as usize == rb_list.len() {
896          // cali_received = 0;
897          // if we want to redo a calibration, 
898          // somebody else has to set this 
899          // flag again.
900          tc.calibration_active = false;
901          // reset the counters
902          for rbid in rb_list {
903            *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false; 
904          }
905          if show_progress {
906            bar.finish_with_message("Done");
907          }
908          break;
909        }
910      }
911      Err(err) => {
912        error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
913      }
914    }
915  } // end loop
916  // The last step is to create te symlink
917  let cali_link_dir = cali_base_dir.clone() + "latest";
918  match fs::remove_file(cali_link_dir.clone()) {
919    Ok(_) => {
920      println!("=> Symlink {} removed!", cali_link_dir);
921    },
922    Err(err) => {
923      error!("Unable to remove symlink to latest calibrations! {err}");
924    }
925  }
926  println!("=> Will create symlink {}", cali_link_dir);
927  match symlink(cali_output_dir, cali_link_dir) {
928    Err(err) => error!("Unable to create symlink for calibration data! {err}"),
929    Ok(_)    => ()
930  }
931}