liftof_cc/
lib.rs

1#[macro_use] extern crate log;
2
3pub mod constants;
4pub mod threads;
5
6use std::fs;
7use std::path::{
8  PathBuf,
9  Path
10};
11
12use std::collections::HashMap;
13use std::os::unix::fs::symlink;
14use std::sync::{
15  Arc,
16  Mutex,
17};
18
19use std::process::{
20  Command,
21  Child,
22};
23
24use std::thread;
25use std::fs::create_dir_all;
26
27use std::time::{
28  Duration,
29  Instant,
30};
31
32use crossbeam_channel::Sender;
33
34use indicatif::{
35  ProgressBar,
36  ProgressStyle
37};
38
39use comfy_table::modifiers::{
40  UTF8_ROUND_CORNERS,
41  UTF8_SOLID_INNER_BORDERS,
42};
43
44use comfy_table::presets::UTF8_FULL;
45use comfy_table::*;
46
47use gondola_core::prelude::*;
48
/// Communication endpoint between liftof-scheduler and
/// liftof-cc
51pub const LIFTOF_HOTWIRE : &str = "tcp://127.0.0.1:54321";
52
53/// Produce a nicely formattable table with per RB information for scalar
54/// values
55pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
56  let mut unit = "";
57  if label_is_hz {
58    unit = "Hz"
59  }
60  let mut table = Table::new();
61  table
62    .load_preset(UTF8_FULL)
63    .apply_modifier(UTF8_ROUND_CORNERS)
64    .apply_modifier(UTF8_SOLID_INNER_BORDERS)
65    .set_content_arrangement(ContentArrangement::Dynamic)
66    .set_width(80)
67    //.set_header(vec!["Readoutboard Rates:"])
68    .add_row(vec![
69        Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
70        Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
71        Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
72        Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
73        Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
74        //Cell::new("Center aligned").set_alignment(CellAlignment::Center),
75    ])
76    .add_row(vec![
77        Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
78        Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
79        Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
80        Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
81        Cell::new(&(format!("RB10 {}", "N.A."))),
82    ])
83    .add_row(vec![
84        Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
85        Cell::new(&(format!("RB12 {}", "N.A."))),
86        Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
87        Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
88        Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
89    ])
90    .add_row(vec![
91        Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
92        Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
93        Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
94        Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
95        Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
96    ])
97    .add_row(vec![
98        Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
99        Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
100        Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
101        Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
102        Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
103    ])
104    .add_row(vec![
105        Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
106        Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
107        Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
108        Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
109        Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
110    ])
111    .add_row(vec![
112        Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
113        Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
114        Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
115        Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
116        Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
117    ])
118    .add_row(vec![
119        Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
120        Cell::new(&(format!("RB37 {}", "N.A."))),
121        Cell::new(&(format!("RB38 {}", "N.A."))),
122        Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
123        Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
124    ])
125    .add_row(vec![
126        Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
127        Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
128        Cell::new(&(format!("RB42 {}", "N.A."))),
129        Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
130        Cell::new(&(format!("RB45 {}", "N.A."))),
131    ])
132    .add_row(vec![
133        Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
134        Cell::new(&(format!("{}", "N.A."))),
135        Cell::new(&(format!("{}", "N.A."))),
136        Cell::new(&(format!("{}", "N.A."))),
137        Cell::new(&(format!("{}", "N.A."))),
138    ]);
139  table
140}
141
142/// Regular run start sequence
143pub fn init_run_start(cc_pub_addr : &str) {
144  let one_second   = Duration::from_secs(1);
145  // deprecated way of sending commands, however as long as we might have RBS
146  // with old software, we do want to send the "old style" as well
147  //let cmd_payload  = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
148  //let cmd_depr     = TofCommand::DataRunStart(cmd_payload);
149  //let packet_depr  = cmd_depr.pack();
150  //let mut payload_depr = String::from("BRCT").into_bytes();
151  //payload_depr.append(&mut packet_depr.to_bytestream());
152  
153  let mut cmd      = TofCommand::new();
154  cmd.command_code = TofCommandCode::DataRunStart;
155  let packet       = cmd.pack();
156  let mut payload  = String::from("BRCT").into_bytes();
157  payload.append(&mut packet.to_bytestream());
158  
159  // open 0MQ socket here
160  let ctx         = zmq::Context::new();
161  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
162  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
163  // after we opened the socket, give the RBs a chance to connect
164  println!("=> Sending run start command to RBs ..");
165  for _ in 0..10 {
166    thread::sleep(one_second);
167    print!("..");
168  }
169  //// send old and new commands
170  //match cmd_sender.send(&payload_depr, 0) {
171  //  Err(err) => {
172  //    error!("Unable to send command! {err}");
173  //  },
174  //  Ok(_) => {
175  //    debug!("We sent {:?}", payload);
176  //  }
177  //}
178  match cmd_sender.send(&payload, 0) {
179    Err(err) => {
180      error!("Unable to send command! {err}");
181    },
182    Ok(_) => {
183      debug!("We sent {:?}", payload);
184    }
185  }
186  print!("done!\n");
187}
188
189/// Regular run stop sequence
190pub fn end_run(cc_pub_addr : &str) {
191  //let cmd_depr     = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
192  //let packet_depr  = cmd_depr.pack();
193  //let mut payload_depr = String::from("BRCT").into_bytes();
194  //payload_depr.append(&mut packet_depr.to_bytestream());
195
196  let mut cmd      = TofCommand::new();
197  cmd.command_code = TofCommandCode::DataRunStop;
198  let packet       = cmd.pack();
199  let mut payload  = String::from("BRCT").into_bytes();
200  payload.append(&mut packet.to_bytestream());
201  let ctx         = zmq::Context::new();
202  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
203  cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
204  // after we opened the socket, give the RBs a chance to connect
205  println!("=> Sending run stop command to all RBs...");
206  println!("=> Waiting for RBs to stoop data acquisition..");
207  for _ in 0..10 {
208    print!("..");
209  }
210  //match cmd_sender.send(&payload_depr, 0) {
211  //  Err(err) => {
212  //    error!("Unable to send command! {err}");
213  //  },
214  //  Ok(_) => {
215  //    debug!("We sent {:?}", payload);
216  //  }
217  //}
218  match cmd_sender.send(&payload, 0) {
219    Err(err) => {
220      error!("Unable to send command! {err}");
221    },
222    Ok(_) => {
223      debug!("We sent {:?}", payload);
224    }
225  }
226  print!("..done!\n");
227}
228
/// Get the files in the queue, sorted by their modification
/// time (oldest first)
///
/// # Arguments:
///
/// * dir_path : The directory to list
///
/// # Returns:
///
/// * The paths of all entries in `dir_path`
///
/// # Panics:
///
/// If the directory or one of its entries can not be read, if the
/// modification time of an entry is not available or if a path 
/// is not valid unicode
pub fn get_queue(dir_path : &String) -> Vec<String> {
  let mut entries = fs::read_dir(dir_path)
    .expect("Directory might not exist!")
    .map(|entry| entry.unwrap().path())
    .collect::<Vec<PathBuf>>();
  // query the file system only once per entry, not once per comparison.
  // The sort is stable, as before.
  entries.sort_by_cached_key(|path| {
    fs::metadata(path).unwrap().modified().unwrap()
  });
  entries.iter()
    .map(|path| path.to_str().unwrap().to_string())
    .collect()
}
244
/// Move a file to a different directory while keeping its name
///
/// # Arguments:
///
/// * old_path : Path of the file to move
/// * new_dir  : The directory the file gets moved to
///
/// # Errors:
///
/// `InvalidInput` if `old_path` does not end in a file name, 
/// otherwise any error `fs::rename` reports
pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
  let old_path  = Path::new(old_path);
  // no detour over &str, so that names which are not valid unicode work as well
  let file_name = old_path.file_name().ok_or_else(|| {
    std::io::Error::new(std::io::ErrorKind::InvalidInput,
                        format!("{} does not have a file name!", old_path.display()))
  })?;
  let new_path  = Path::new(new_dir).join(file_name); // Combine new directory with filename
  fs::rename(old_path, new_path) // Move the file
}
251
/// Move a file to a different directory and name it
/// "liftof-config.toml"
///
/// # Arguments:
///
/// * old_path : Path of the file to move
/// * new_dir  : The directory the file gets moved to
pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
  let mut target = PathBuf::from(new_dir);
  target.push("liftof-config.toml");
  fs::rename(old_path, target)
}
257
/// Copy a file to a different directory while keeping its name
///
/// # Arguments:
///
/// * old_path : Path of the file to copy
/// * new_dir  : The directory the copy is created in
///
/// # Returns:
///
/// * The number of bytes copied, as reported by `fs::copy`
///
/// # Errors:
///
/// `InvalidInput` if `old_path` does not end in a file name, 
/// otherwise any error `fs::copy` reports
pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
  let old_path  = Path::new(old_path);
  // no detour over &str, so that names which are not valid unicode work as well
  let file_name = old_path.file_name().ok_or_else(|| {
    std::io::Error::new(std::io::ErrorKind::InvalidInput,
                        format!("{} does not have a file name!", old_path.display()))
  })?;
  let new_path  = Path::new(new_dir).join(file_name); // Combine new directory with filename
  fs::copy(old_path, new_path)
}
264
/// Copy a file to a different directory and name the copy
/// "liftof-config.toml"
///
/// # Arguments:
///
/// * old_path : Path of the file to copy
/// * new_dir  : The directory the copy is created in
///
/// # Returns:
///
/// * The number of bytes copied, as reported by `fs::copy`
pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
  let mut target = PathBuf::from(new_dir);
  target.push("liftof-config.toml");
  fs::copy(old_path, target)
}
270
/// Delete a single file
///
/// # Arguments:
///
/// * file_path : Path of the file to remove
pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
  // &str can be used as a path directly
  fs::remove_file(file_path)
}
275
276/// Copy a config file from the queue to the current and 
277/// next directories and restart liftof-cc. 
278///
279/// As soon as the run is started, prepare the next run
280pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
281  let queue_dir   = format!("{}/queue", staging_dir);
282  let next_dir    = format!("{}/next",  staging_dir);
283  let current_dir = format!("{}/current", staging_dir);
284
285  let queue   = get_queue(&queue_dir);
286  let current = get_queue(&current_dir);
287  let next    = get_queue(&next_dir); 
288  
289  if current.len() == 0 {
290    // we are f***ed
291    error!("We don't have a current configuration. This is BAD!");
292    return Err(StagingError::NoCurrentConfig);
293  }
294  
295  println!("= => Found {} files in run queue!", queue.len());
296  if next.len() == 0 && queue.len() == 0 {
297    println!("= => Nothing staged, will jusr repeat current run setting!");
298    if !dry_run {
299      manage_liftof_cc_service("restart");
300    }
301    thread::sleep(Duration::from_secs(20));
302    return Ok(());
303  }
304  if next.len() == 0 && queue.len() != 0 {
305    error!("Empty next directory, but we have files in the queue!");
306    match copy_file_rename_liftof(&queue[0], &next_dir) {
307      Ok(_) => (),
308      Err(err) => {
309        error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
310      }
311    }
312    match move_file_rename_liftof(&queue[0], &current_dir) {
313      Ok(_) => (),
314      Err(err) => {
315        error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
316      }
317    }
318  }
319  if next.len() != 0  {
320    match delete_file(&current[0]) {
321      Ok(_)    => (),
322      Err(err) => {
323        error!("Unable to delete {}! {}", current[0], err);
324      }
325    }
326    match move_file_rename_liftof(&next[0], &current_dir) {
327      Ok(_) => (),
328      Err(err) => {
329        error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
330      }
331    }
332    if queue.len() != 0 {
333      match move_file_with_name(&queue[0], &next_dir) {
334        Ok(_) => (),
335        Err(err) => {
336          error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
337        }
338      }
339    }
340    println!("=> Restarting liftof-cc!");
341    if !dry_run {
342      manage_liftof_cc_service("restart");
343    }
344    thread::sleep(Duration::from_secs(20));
345  }
346  Ok(())
347}
348
349/// Prepare a new folder with the run id
350///
351/// This will assign a run id based on the 
352/// run ids in that folder. Additionally, 
353/// we will copy the current settings into
354/// that folder
355///
356/// TODO - connect it to the run database
357///
358/// # Arguments:
359///
360/// * data_path  : The global path on the inflight 
361///                tof computer where to store data.
362/// * config     : The current configuration, to be 
363///                copied into the new folder
364/// * run_id     : Optionally define a pre-given 
365///                run-id
366/// * create_dir : Create the directory for runfiles
367pub fn prepare_run(data_path  : String,
368                   config     : &LiftofSettings,
369                   run_id     : Option<u32>,
370                   create_dir : bool) -> Option<u32> {
371  let mut stream_files_path = PathBuf::from(data_path);
372  // Unwrap is ok here, since this should only happen at run start.
373  // Also, if the path is wrong, we are going to fail catastrophically
374  // anyway
375  let paths = fs::read_dir(stream_files_path.clone()).unwrap();
376  
377  let mut used_runids = Vec::<u32>::new();
378  for path in paths {
379    // this is hideous, I am so sorry. May the rust gods have mercy on my soul...
380    match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
381      Ok(this_run_id) => {
382        debug!("Extracted run id {}", this_run_id);
383        used_runids.push(this_run_id);
384      },
385      Err(err)        => {
386        warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
387      }
388    }
389  }
390  let mut max_run_id = 0u32;
391  match used_runids.iter().max() {
392    None => (),
393    Some(_r) => {
394      max_run_id = *_r;
395    }
396  }
397  println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
398  let new_run_id : Option<u32>;
399  if max_run_id == 0 {
400    // we can use the run)given as the argument
401    // return run_id;
402    new_run_id = run_id;
403  } else if run_id.is_some() {
404    if used_runids.contains(&run_id.unwrap()) {
405      // the assigned run id has been used already
406      error!("Duplicate run id ({})!", run_id.unwrap());
407      //return None;
408      new_run_id = None;
409    } else {
410      //return run_id;
411      new_run_id = run_id;
412    }
413  } else {
414      new_run_id = Some(max_run_id + 1);
415  }
416  // We were not able to assign a new run id
417  if new_run_id.is_none() {
418    return new_run_id;
419  }
420  stream_files_path.push(new_run_id.unwrap().to_string().as_str());
421  if create_dir {
422    // Create directory if it does not exist
423    // Check if the directory exists
424    if let Ok(metadata) = fs::metadata(&stream_files_path) {
425      if metadata.is_dir() {
426        println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
427        // FILXME - in flight, we can not have interactivity.
428        // But the whole system with the run ids might change 
429      } 
430    } else {
431      match fs::create_dir(&stream_files_path) {
432        Ok(())   => println!("=> Created {} to save stream data", stream_files_path.display()),
433        Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
434      }
435    }
436  }
437  let settings_fname = format!("{}/run{}.toml",
438    stream_files_path.display(),
439    new_run_id.unwrap()); 
440  println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
441  println!("=> Writing settings to {}!", settings_fname);
442  config.to_toml(settings_fname);
443  return new_run_id;
444}
445
446
///// Take a data run. This can be either verification or physics
448//pub fn run(with_calibration : bool, verification : bool) {
449//  //prepare_run(data_path  : String,
450//  //                 config     : &LiftofSettings,
451//  //                 run_id     : Option<u32>,
452//  //                 create_dir : bool) -> Option<u32> {
453//  if with_calibration {
454//    //calibrate_tof()
455//  }
456//}
457
458/// Trigger a restart of liftof-cc and start a new run
459///
460///
461/// # Arguments
462///
463///   * mode     : The argument given to the systemd service 
464///                - either "start", "stop", "restart", etc.
465/// # Returns:
466///   * success : true for succes
467pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
468  match Command::new("sudo")
469    .args(["systemctl", mode, "liftof"])
470    .spawn() {
471    Err(err) => {
472      error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
473      TofReturnCode::GeneralFail
474    }
475    Ok(_) => {
476      println!("=> Executed sudo systemctl {} liftof", mode);
477      TofReturnCode::Success
478    }
479  }
480}
481
482
483/// Trigger a general command on the Readoutboards 
484/// remotly through ssh.
485///
486/// Ssh keys and aliases (e.g. tof-rb02) must be 
487/// set up for this to work
488///
489/// # Arguments:
490///
491///   * rb_list : The list of ReadoutBoard ids the commands
492///               will get executed
493///   * cmd     : The actual command without 'ssh \<ip\>'
494///
495/// # Returns:
496///
497///   * A list of rb ids where the process failed
498pub fn ssh_command_rbs(rb_list : &Vec<u8>,
499                       cmd     : Vec<String>) -> Result<Vec<u8>, TofError> {
500  let mut rb_handles       = Vec::<thread::JoinHandle<_>>::new();
501  info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
502  let mut children = Vec::<(u8,Child)>::new();
503  for rb in rb_list {
504    // also populate the rb thread nandles
505    rb_handles.push(thread::spawn(||{}));
506    let rb_address   = format!("tof-rb{:02}", rb);
507    let mut ssh_args = vec![rb_address];
508    let mut thisrb_cmd = cmd.clone();
509    ssh_args.append(&mut thisrb_cmd);
510    match Command::new("ssh")
511      //.args([&rb_address, "sudo", "systemctl", "restart", "liftof"])
512      .args(ssh_args)
513      .spawn() {
514      Err(err) => {
515        error!("Unable to spawn ssh process on RB {}! {}", rb, err);
516      }
517      Ok(child) => {
518        children.push((*rb,child));
519      }
520    }
521  }
522  let mut issues = Vec::<u8>::new();
523  for rb_child in &mut children {
524    // this is not optimal, since this will take as much 
525    // time as the slowest child, but at the moment we 
526    // have bigger fish to fry.
527    let timeout = Duration::from_secs(10);
528    let kill_t  = Instant::now();
529    loop {
530      if kill_t.elapsed() > timeout {
531        error!("SSH process for board {} timed out!", rb_child.0);
532        // Duuu hast aber einen schöönen Ball! [M. eine Stadt sucht einen Moerder]
533        match rb_child.1.kill() {
534          Err(err) => {
535            error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
536          }
537          Ok(_) => {
538            error!("Killed SSH process for for RB {}", rb_child.0);
539          }
540        }
541        issues.push(rb_child.0);
542        // FIXME
543        break
544      }
545      // non-blocking
546      match rb_child.1.try_wait() {
547        Ok(None) => {
548          // the child is still busy
549          thread::sleep(Duration::from_secs(1));
550          continue;
551        }
552        Ok(Some(status)) => {
553          if status.success() {
554            info!("Execution of command on {} successful!", rb_child.0);
555            break;
556          } else {
557            error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
558            issues.push(rb_child.0);
559            break;
560          }
561        }
562        Err(err) => {
563          error!("Unable to wait for the SSH process! {err}");
564          break;
565        }
566      }
567    }
568  }
569  if issues.len() == 0 {
570    println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
571  }
572  Ok(issues)
573}
574
575/// Restart liftof-rb on RBs
576pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
577  let command = vec![String::from("sudo"),
578                     String::from("systemctl"),
579                     String::from("restart"),
580                     String::from("liftof")];
581  println!("=> Restarting liftof-rb on RBs!");
582  match ssh_command_rbs(rb_list, command) {
583    Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
584    Ok(_)    => ()
585  }
586}
587
588/// A "verification" run describes an any trigger/track trigger
589/// run which should iluminate the entire tof so that we can do 
590/// a working channel inventory. 
591///
592/// A verification run will not safe data to disk, but instead 
593/// run it through a small analysis engine and count the active 
594/// channels
595///
596/// # Arguments:
597///   * show_progress  : if true, it will show a progressbar with 
598///                      indicatif
599///
600pub fn verification_run(timeout        : u32,
601                        tp_to_sink     : Sender<TofPacket>,
602                        thread_control : Arc<Mutex<ThreadControl>>,
603                        show_progress  : bool) {
604  let mut write_state : bool = true; // when in doubt, write data to disk
605  let mut config      = LiftofSettings::new();
606  match thread_control.lock() {
607    Ok(mut tc) => {
608      write_state = tc.write_data_to_disk;
609      tc.write_data_to_disk       = false;
610      tc.verification_active      = true;
611      tc.thread_master_trg_active = true;
612      tc.calibration_active       = false;
613      tc.thread_event_bldr_active = true;
614      config = tc.liftof_settings.clone();
615    }
616    Err(err) => {
617      error!("Can't acquire lock for ThreadControl! {err}");
618    },
619  }
620  let one_second  = Duration::from_millis(1000);
621  let runtime     = Instant::now();
622  // technically, it is run_typ, rb_id, event number
623  // all to the max means run start for all
624  // We don't need this - just need to make sure it gets broadcasted
625  //let cmd_payload: u32 =  PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
626  //let cmd          = TofCommand::DataRunStart(cmd_payload);
627  //let packet       = cmd.pack();
628  //let mut payload  = String::from("BRCT").into_bytes();
629  //payload.append(&mut packet.to_bytestream());
630  let mut cmd      = TofCommand::new();
631  cmd.command_code = TofCommandCode::DataRunStart;
632  let packet       = cmd.pack();
633  let mut payload  = String::from("BRCT").into_bytes();
634  payload.append(&mut packet.to_bytestream());
635  
636  // open 0MQ socket here
637  let ctx = zmq::Context::new();
638  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
639  let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
640  cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
641  // after we opened the socket, give the RBs a chance to connect
642  println!("=> Give the RBs a chance to connect and wait a bit..");
643  thread::sleep(10*one_second);
644  match cmd_sender.send(&payload, 0) {
645    Err(err) => {
646      error!("Unable to send command, error{err}");
647    },
648    Ok(_) => {
649      debug!("We sent {:?}", payload);
650    }
651  }
652  
653  println!("=> Verification run initialized!");
654  let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
655  let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
656  let mut bar    = ProgressBar::hidden();
657  // just wait until the run is finisehd
658  if show_progress {
659    bar = ProgressBar::new(timeout as u64); 
660    bar.set_position(0);
661    let bar_label  = String::from("Performing verification run");
662    bar.set_message (bar_label);
663    bar.set_prefix  ("\u{2699}\u{1F4D0}");
664    bar.set_style   (bar_style);
665  }
666  loop {
667    if runtime.elapsed().as_secs() >= timeout as u64 {
668      break;
669    }
670    thread::sleep(1*one_second);
671    bar.set_position(runtime.elapsed().as_secs());
672  }
673  if show_progress {
674    bar.finish_with_message("Done");
675  }
676  
677  println!("=> Ending verification run!");
678  println!("=> Sending run termination command to the RBs");
679  let mut cmd      = TofCommand::new();
680  cmd.command_code = TofCommandCode::DataRunStop;
681  let packet       = cmd.pack();
682  let mut payload  = String::from("BRCT").into_bytes();
683  payload.append(&mut packet.to_bytestream());
684  //let cmd          = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
685  //let packet       = cmd.pack();
686  //let mut payload  = String::from("BRCT").into_bytes();
687  //payload.append(&mut packet.to_bytestream());
688  
689  warn!("=> No command socket available! Can not shut down RBs..!");
690  // after we opened the socket, give the RBs a chance to connect
691  println!("=> Give the RBs a chance to connect and wait a bit..");
692  thread::sleep(10*one_second);
693  match cmd_sender.send(&payload, 0) {
694    Err(err) => {
695      error!("Unable to send command! {err}");
696    },
697    Ok(_) => {
698      debug!("We sent {:?}", payload);
699    }
700  }
701
702  // move the socket out of here for further use
703  let mut detector_status = TofDetectorStatus::new();
704  match thread_control.lock() {
705    Ok(mut tc) => {
706      tc.write_data_to_disk = write_state;
707      tc.verification_active = false;
708      detector_status = tc.detector_status.clone();
709    },
710    Err(err) => {
711      error!("Can't acquire lock for ThreadControl! {err}");
712    },
713  }
714  println!("=> Acquired TofDetectorStatus!");
715  println!("{}", detector_status);
716  let pack = detector_status.pack();
717  match tp_to_sink.send(pack) {
718    Err(err) => error!("Unable to send TofDetectorStatus to data sink! {err}"),
719    Ok(_)    => ()
720  }
721}
722
723
724/// Run a full tof calibration - RBCalibration
725/// 
726/// The purpose of the RB calibration to is to create a 
727/// relationship between the adc/timing bins and voltages
728/// and nanoseconds
729///
730/// This function is blocking, until a certain (configurable)
731/// timeout is expired. The timeout can be set in the configuration
732/// file
733///
734/// # Argumeents:
735///
736///   * thread_control : general shared memory to hold configuration
737///                      settings, program st ate
738///   * rb_list        : List of active readoutboards
739///   * show_progress  : if true, it will show a progressbar with 
740///                      indicatif
741///
742pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
743                     rb_list        : &Vec<ReadoutBoard>,
744                     show_progress  : bool) {
745
746  let one_second               = Duration::from_millis(1000);
747  let mut cc_pub_addr          = String::from("");
748  let calibration_timeout_fail = Duration::from_secs(300); // in seconds
749 
750  let mut cali_dir_created = false;
751  let mut cali_output_dir  = String::from("");
752  let mut cali_base_dir        = String::from("");
753
754  match thread_control.lock() {
755    Ok(mut tc) => {
756      for rb in rb_list {
757        tc.finished_calibrations.insert(rb.rb_id,false); 
758      }
759      cali_base_dir = tc.liftof_settings.calibration_dir.clone();
760      cc_pub_addr   = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
761      tc.write_data_to_disk = true;
762    },
763    Err(err) => {
764      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
765    },
766  }
767
768  // deprecated commanding
769  //let voltage_level = DEFAULT_CALIB_VOLTAGE;
770  //let rb_id         = DEFAULT_RB_ID;
771  //let extra         = DEFAULT_CALIB_EXTRA;
772  //println!("=> Received calibration default command! Will init calibration run of all RBs...");
773  //let cmd_payload: u32
774  //  = (voltage_level as u32) << 16 | (rb_id as u32) << 8 | (extra as u32);
775  //let default_calib_depr = TofCommand::DefaultCalibration(cmd_payload);
776  //let tp_depr = default_calib_depr.pack();
777  //let mut payload_depr = String::from("BRCT").into_bytes();
778  //payload_depr.append(&mut tp_depr.to_bytestream());
779
780  let mut default_calib      = TofCommand::new();
781  default_calib.command_code = TofCommandCode::RBCalibration;
782  let tp                     = default_calib.pack();
783  let mut payload            = String::from("BRCT").into_bytes();
784  payload.append(&mut tp.to_bytestream());
785  // open 0MQ socket here
786  let ctx = zmq::Context::new();
787  let cmd_sender  = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
788
789  cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
790  println!("=> Give the RBs a chance to connect and wait a bit..");
791  thread::sleep(10*one_second);
792  //match cmd_sender.send(&payload_depr, 0) { Err(err) => {
793  //    error!("Unable to send command! {err}");
794  //  },
795  //  Ok(_) => {
796  //    println!("=> Calibration  initialized!");
797  //  }
798  //}
799  match cmd_sender.send(&payload, 0) { Err(err) => {
800      error!("Unable to send command! {err}");
801    },
802    Ok(_) => {
803      println!("=> Calibration  initialized!");
804    }
805  }
806  match thread_control.lock() {
807    Ok(mut tc) => {
808      // deactivate the master trigger thread
809      tc.thread_master_trg_active =false;
810      tc.calibration_active = true;
811    },
812    Err(err) => {
813      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
814    },
815  }
816
817  println!("=> .. now we need to wait until the calibration is finished!");
818  let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
819  let bar_style  = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
820  let mut bar    = ProgressBar::hidden();
821  if show_progress {
822    bar = ProgressBar::new(rb_list.len() as u64); 
823    bar.set_position(0);
824    let bar_label  = String::from("Acquiring RB calibration data");
825    bar.set_message (bar_label);
826    bar.set_prefix  ("\u{2699}\u{1F4D0}");
827    bar.set_style   (bar_style);
828  }
829
830  // now block until the calibrations are done or we time outu
831  // FIXME - set timeout parameter in settings
832  let timeout = Instant::now();
833  let mut cali_received = 0;
834  'main: loop {
835    thread::sleep(10*one_second);
836    if timeout.elapsed() > calibration_timeout_fail {
837      error!("Calibration timeout! Calibrations might not be complete!");
838      match thread_control.lock() {
839        Ok(mut tc) => {
840          tc.calibration_active = false;
841        }
842        Err(err) => {
843          error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
844        }
845      }
846      if show_progress {
847        bar.finish_with_message("Done");
848      }
849      break;
850    }
851    //let mut rbcali = RBCalibrations::new();
852    match thread_control.lock() {
853      Ok(mut tc) => {
854        for rbid in rb_list {
855          // the global data sink sets these flags
856          let mut finished_keys = Vec::<u8>::new();
857          if tc.stop_flag {
858            println!("Stop signal received, exiting calibration routine!");
859            break 'main;
860          }
861          if tc.finished_calibrations[&rbid.rb_id] {
862            cali_received += 1;
863            let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
864            let pack   = rbcali.pack();
865            // See RBCalibration reference
866            let file_type  = FileType::CalibrationFile(rbid.rb_id);
867            //println!("==> Writing stream to file with prefix {}", streamfile_name);
868            //let mut cali_writer = TofPacketWriter::new(write_stream_path.clone(), file_type);
869            if !cali_dir_created {
870              let today           = get_utc_timestamp();
871              cali_output_dir     = format!("{}/{}", cali_base_dir.clone(), today);
872              match create_dir_all(cali_output_dir.clone()) {
873                Ok(_)    => info!("Created {} for calibration data!", cali_output_dir),
874                Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
875              }
876              cali_dir_created = true;
877            }
878            let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
879            cali_writer.add_tof_packet(&pack);
880            drop(cali_writer);
881
882            bar.set_position(cali_received);
883            finished_keys.push(rbid.rb_id);
884          }
885          for rbid in &finished_keys {
886            *tc.finished_calibrations.get_mut(&rbid).unwrap() = false; 
887          }
888        }
889        // FIXME - this or a timer
890        if cali_received as usize == rb_list.len() {
891          // cali_received = 0;
892          // if we want to redo a calibration, 
893          // somebody else has to set this 
894          // flag again.
895          tc.calibration_active = false;
896          // reset the counters
897          for rbid in rb_list {
898            *tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false; 
899          }
900          if show_progress {
901            bar.finish_with_message("Done");
902          }
903          break;
904        }
905      }
906      Err(err) => {
907        error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
908      }
909    }
910  } // end loop
911  // The last step is to create te symlink
912  let cali_link_dir = cali_base_dir.clone() + "latest";
913  match fs::remove_file(cali_link_dir.clone()) {
914    Ok(_) => {
915      println!("=> Symlink {} removed!", cali_link_dir);
916    },
917    Err(err) => {
918      error!("Unable to remove symlink to latest calibrations! {err}");
919    }
920  }
921  println!("=> Will create symlink {}", cali_link_dir);
922  match symlink(cali_output_dir, cali_link_dir) {
923    Err(err) => error!("Unable to create symlink for calibration data! {err}"),
924    Ok(_)    => ()
925  }
926}