#[macro_use] extern crate log;
pub mod constants;
pub mod threads;
use std::fs;
use std::path::{
use std::collections::HashMap;
use std::os::unix::fs::symlink;
use std::sync::{
use std::process::{
use std::thread;
use std::fs::create_dir_all;
use std::time::{
use crossbeam_channel::Sender;
use indicatif::{
use comfy_table::modifiers::{
use comfy_table::presets::UTF8_FULL;
use comfy_table::*;
use liftof_lib::constants::{
use tof_dataclasses::constants::PAD_CMD_32BIT;
use tof_dataclasses::serialization::{
use tof_dataclasses::errors::{
use tof_dataclasses::commands::{
use tof_dataclasses::status::TofDetectorStatus;
use tof_dataclasses::packets::TofPacket;
use tof_dataclasses::database::ReadoutBoard;
use tof_dataclasses::io::{
use liftof_lib::settings::LiftofSettings;
use liftof_lib::thread_control::ThreadControl;
pub const LIFTOF_HOTWIRE : &str = "tcp://";
pub fn rb_table(counters : &HashMap<u8, u64>, label_is_hz : bool) -> Table {
let mut unit = "";
if label_is_hz {
unit = "Hz"
let mut table = Table::new();
Cell::new(&(format!("RB01 {:.1} {}", counters[&1], unit))),
Cell::new(&(format!("RB02 {:.1} {}", counters[&2], unit))),
Cell::new(&(format!("RB03 {:.1} {}", counters[&3], unit))),
Cell::new(&(format!("RB04 {:.1} {}", counters[&4], unit))),
Cell::new(&(format!("RB05 {:.1} {}", counters[&5], unit))),
Cell::new(&(format!("RB06 {:.1} {}", counters[&6], unit))),
Cell::new(&(format!("RB07 {:.1} {}", counters[&7], unit))),
Cell::new(&(format!("RB08 {:.1} {}", counters[&8], unit))),
Cell::new(&(format!("RB09 {:.1} {}", counters[&9], unit))),
Cell::new(&(format!("RB10 {}", "N.A."))),
Cell::new(&(format!("RB11 {:.1} Hz", counters[&11]))),
Cell::new(&(format!("RB12 {}", "N.A."))),
Cell::new(&(format!("RB13 {:.1} Hz", counters[&13]))),
Cell::new(&(format!("RB14 {:.1} Hz", counters[&14]))),
Cell::new(&(format!("RB15 {:.1} Hz", counters[&15]))),
Cell::new(&(format!("RB16 {:.1} Hz", counters[&16]))),
Cell::new(&(format!("RB17 {:.1} Hz", counters[&17]))),
Cell::new(&(format!("RB18 {:.1} Hz", counters[&18]))),
Cell::new(&(format!("RB19 {:.1} Hz", counters[&19]))),
Cell::new(&(format!("RB20 {:.1} Hz", counters[&20]))),
Cell::new(&(format!("RB21 {:.1} Hz", counters[&21]))),
Cell::new(&(format!("RB22 {:.1} Hz", counters[&22]))),
Cell::new(&(format!("RB23 {:.1} Hz", counters[&23]))),
Cell::new(&(format!("RB24 {:.1} Hz", counters[&24]))),
Cell::new(&(format!("RB25 {:.1} Hz", counters[&25]))),
Cell::new(&(format!("RB26 {:.1} Hz", counters[&26]))),
Cell::new(&(format!("RB27 {:.1} Hz", counters[&27]))),
Cell::new(&(format!("RB28 {:.1} Hz", counters[&28]))),
Cell::new(&(format!("RB29 {:.1} Hz", counters[&29]))),
Cell::new(&(format!("RB30 {:.1} Hz", counters[&30]))),
Cell::new(&(format!("RB31 {:.1} Hz", counters[&31]))),
Cell::new(&(format!("RB32 {:.1} Hz", counters[&32]))),
Cell::new(&(format!("RB33 {:.1} Hz", counters[&33]))),
Cell::new(&(format!("RB34 {:.1} Hz", counters[&34]))),
Cell::new(&(format!("RB35 {:.1} Hz", counters[&35]))),
Cell::new(&(format!("RB36 {:.1}", counters[&36]))),
Cell::new(&(format!("RB37 {}", "N.A."))),
Cell::new(&(format!("RB38 {}", "N.A."))),
Cell::new(&(format!("RB39 {:.1}", counters[&39]))),
Cell::new(&(format!("RB40 {:.1}", counters[&40]))),
Cell::new(&(format!("RB41 {:.1}", counters[&41]))),
Cell::new(&(format!("RB43 {:.1}", counters[&42]))),
Cell::new(&(format!("RB42 {}", "N.A."))),
Cell::new(&(format!("RB44 {:.1}", counters[&44]))),
Cell::new(&(format!("RB45 {}", "N.A."))),
Cell::new(&(format!("RB46 {:.1} Hz", counters[&46]))),
Cell::new(&(format!("{}", "N.A."))),
Cell::new(&(format!("{}", "N.A."))),
Cell::new(&(format!("{}", "N.A."))),
Cell::new(&(format!("{}", "N.A."))),
pub fn init_run_start(cc_pub_addr : &str) {
let one_second = Duration::from_secs(1);
let cmd_payload = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
let cmd_depr = TofCommand::DataRunStart(cmd_payload);
let packet_depr = cmd_depr.pack();
let mut payload_depr = String::from("BRCT").into_bytes();
payload_depr.append(&mut packet_depr.to_bytestream());
let mut cmd = TofCommandV2::new();
cmd.command_code = TofCommandCode::DataRunStart;
let packet = cmd.pack();
let mut payload = String::from("BRCT").into_bytes();
payload.append(&mut packet.to_bytestream());
let ctx = zmq::Context::new();
let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
println!("=> Sending run start command to RBs ..");
for _ in 0..10 {
match cmd_sender.send(&payload_depr, 0) {
Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
debug!("We sent {:?}", payload);
match cmd_sender.send(&payload, 0) {
Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
debug!("We sent {:?}", payload);
pub fn end_run(cc_pub_addr : &str) {
let cmd_depr = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
let packet_depr = cmd_depr.pack();
let mut payload_depr = String::from("BRCT").into_bytes();
payload_depr.append(&mut packet_depr.to_bytestream());
let mut cmd = TofCommandV2::new();
cmd.command_code = TofCommandCode::DataRunStop;
let packet = cmd.pack();
let mut payload = String::from("BRCT").into_bytes();
payload.append(&mut packet.to_bytestream());
let ctx = zmq::Context::new();
let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
cmd_sender.bind(cc_pub_addr).expect("Unable to bind to (PUB) socket!");
println!("=> Sending run stop command to all RBs...");
println!("=> Waiting for RBs to stoop data acquisition..");
for _ in 0..10 {
match cmd_sender.send(&payload_depr, 0) {
Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
debug!("We sent {:?}", payload);
match cmd_sender.send(&payload, 0) {
Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
debug!("We sent {:?}", payload);
pub fn get_queue(dir_path : &String) -> Vec<String> {
let mut entries = fs::read_dir(dir_path)
.expect("Directory might not exist!")
.map(|entry| entry.unwrap().path())
entries.sort_by(|a, b| {
let meta_a = fs::metadata(a).unwrap();
let meta_b = fs::metadata(b).unwrap();
.map(|path| path.to_str().unwrap().to_string())
pub fn move_file_with_name(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
let old_path = Path::new(old_path);
let file_name = old_path.file_name().unwrap().to_str().unwrap(); let new_path = Path::new(new_dir).join(file_name); fs::rename(old_path, new_path) }
pub fn move_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<(), std::io::Error> {
let old_path = Path::new(old_path);
let new_path = Path::new(new_dir).join("liftof-config.toml"); fs::rename(old_path, new_path) }
pub fn copy_file(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
let old_path = Path::new(old_path);
let file_name = old_path.file_name().unwrap().to_str().unwrap(); let new_path = Path::new(new_dir).join(file_name); fs::copy(old_path, new_path)
pub fn copy_file_rename_liftof(old_path: &str, new_dir: &str) -> Result<u64, std::io::Error> {
let old_path = Path::new(old_path);
let new_path = Path::new(new_dir).join("liftof-config.toml"); fs::copy(old_path, new_path)
pub fn delete_file(file_path: &str) -> Result<(), std::io::Error> {
let path = Path::new(file_path);
fs::remove_file(path) }
pub fn run_cycler(staging_dir : String, dry_run : bool) -> Result<(),StagingError> {
let queue_dir = format!("{}/queue", staging_dir);
let next_dir = format!("{}/next", staging_dir);
let current_dir = format!("{}/current", staging_dir);
let queue = get_queue(&queue_dir);
let current = get_queue(¤t_dir);
let next = get_queue(&next_dir);
if current.len() == 0 {
error!("We don't have a current configuration. This is BAD!");
return Err(StagingError::NoCurrentConfig);
println!("= => Found {} files in run queue!", queue.len());
if next.len() == 0 && queue.len() == 0 {
println!("= => Nothing staged, will jusr repeat current run setting!");
if !dry_run {
return Ok(());
if next.len() == 0 && queue.len() != 0 {
error!("Empty next directory, but we have files in the queue!");
match copy_file_rename_liftof(&queue[0], &next_dir) {
Ok(_) => (),
Err(err) => {
error!("Unable to copy {} to {}! {}", next[0], next_dir, err);
match move_file_rename_liftof(&queue[0], ¤t_dir) {
Ok(_) => (),
Err(err) => {
error!("Unable to copy {} to {}! {}", queue[0], current_dir, err);
if next.len() != 0 {
match delete_file(¤t[0]) {
Ok(_) => (),
Err(err) => {
error!("Unable to delete {}! {}", current[0], err);
match move_file_rename_liftof(&next[0], ¤t_dir) {
Ok(_) => (),
Err(err) => {
error!("Unable to copy {} to {}! {}", next[0], current_dir, err);
if queue.len() != 0 {
match move_file_with_name(&queue[0], &next_dir) {
Ok(_) => (),
Err(err) => {
error!("Unable to move {} to {}! {}", queue[0], next_dir, err);
println!("=> Restarting liftof-cc!");
if !dry_run {
pub fn prepare_run(data_path : String,
config : &LiftofSettings,
run_id : Option<u32>,
create_dir : bool) -> Option<u32> {
let mut stream_files_path = PathBuf::from(data_path);
let paths = fs::read_dir(stream_files_path.clone()).unwrap();
let mut used_runids = Vec::<u32>::new();
for path in paths {
match format!("{}",path.as_ref().unwrap().path().iter().last().unwrap().to_str().unwrap()).parse::<u32>() {
Ok(this_run_id) => {
debug!("Extracted run id {}", this_run_id);
Err(err) => {
warn!("Can not get runid from {}! {}", path.unwrap().path().display(), err);
let mut max_run_id = 0u32;
match used_runids.iter().max() {
None => (),
Some(_r) => {
max_run_id = *_r;
println!("=> Found {} used run ids in {}. Largest run id is {}",used_runids.len(), stream_files_path.display(), max_run_id);
let new_run_id : Option<u32>;
if max_run_id == 0 {
new_run_id = run_id;
} else if run_id.is_some() {
if used_runids.contains(&run_id.unwrap()) {
error!("Duplicate run id ({})!", run_id.unwrap());
new_run_id = None;
} else {
new_run_id = run_id;
} else {
new_run_id = Some(max_run_id + 1);
if new_run_id.is_none() {
return new_run_id;
if create_dir {
if let Ok(metadata) = fs::metadata(&stream_files_path) {
if metadata.is_dir() {
println!("=> Directory {} for run number {} already consists and may contain files!", stream_files_path.display(), new_run_id.unwrap());
} else {
match fs::create_dir(&stream_files_path) {
Ok(()) => println!("=> Created {} to save stream data", stream_files_path.display()),
Err(err) => panic!("Failed to create directory: {}! {}", stream_files_path.display(), err),
let settings_fname = format!("{}/run{}.toml",
println!("=> Writing data to {}/{}!", stream_files_path.display(), new_run_id.unwrap());
println!("=> Writing settings to {}!", settings_fname);
return new_run_id;
pub fn manage_liftof_cc_service(mode : &str) -> TofReturnCode {
match Command::new("sudo")
.args(["systemctl", mode, "liftof"])
.spawn() {
Err(err) => {
error!("Unable to execute sudo systemctl {} liftof! {}", mode, err);
Ok(_) => {
println!("=> Executed sudo systemctl {} liftof", mode);
pub fn ssh_command_rbs(rb_list : &Vec<u8>,
cmd : Vec<String>) -> Result<Vec<u8>, TofError> {
let mut rb_handles = Vec::<thread::JoinHandle<_>>::new();
info!("=> Executing ssh command {:?} on {} RBs!", cmd, rb_list.len());
let mut children = Vec::<(u8,Child)>::new();
for rb in rb_list {
let rb_address = format!("tof-rb{:02}", rb);
let mut ssh_args = vec![rb_address];
let mut thisrb_cmd = cmd.clone();
ssh_args.append(&mut thisrb_cmd);
match Command::new("ssh")
.spawn() {
Err(err) => {
error!("Unable to spawn ssh process on RB {}! {}", rb, err);
Ok(child) => {
let mut issues = Vec::<u8>::new();
for rb_child in &mut children {
let timeout = Duration::from_secs(10);
let kill_t = Instant::now();
loop {
if kill_t.elapsed() > timeout {
error!("SSH process for board {} timed out!", rb_child.0);
match rb_child.1.kill() {
Err(err) => {
error!("Unable to kill the SSH process for RB {}! {err}", rb_child.0);
Ok(_) => {
error!("Killed SSH process for for RB {}", rb_child.0);
match rb_child.1.try_wait() {
Ok(None) => {
Ok(Some(status)) => {
if status.success() {
info!("Execution of command on {} successful!", rb_child.0);
} else {
error!("Execution of command on {} failed with exit code {:?}!", rb_child.0, status.code());
Err(err) => {
error!("Unable to wait for the SSH process! {err}");
if issues.len() == 0 {
println!("=> Executing ssh command {:?} on {} RBs successful!", cmd, rb_list.len());
pub fn restart_liftof_rb(rb_list : &Vec<u8>) {
let command = vec![String::from("sudo"),
println!("=> Restarting liftof-rb on RBs!");
match ssh_command_rbs(rb_list, command) {
Err(err) => error!("Restarting liftof-rb on all RBs failed! {err}"),
Ok(_) => ()
pub fn verification_run(timeout : u32,
tp_to_sink : Sender<TofPacket>,
thread_control : Arc<Mutex<ThreadControl>>) {
let mut write_state : bool = true; let mut config = LiftofSettings::new();
match thread_control.lock() {
Ok(mut tc) => {
write_state = tc.write_data_to_disk;
tc.write_data_to_disk = false;
tc.verification_active = true;
tc.thread_master_trg_active = true;
tc.calibration_active = false;
tc.thread_event_bldr_active = true;
config = tc.liftof_settings.clone();
Err(err) => {
error!("Can't acquire lock for ThreadControl! {err}");
let one_second = Duration::from_millis(1000);
let runtime = Instant::now();
let cmd_payload: u32 = PAD_CMD_32BIT | (255u32) << 16 | (255u32) << 8 | (255u32);
let cmd = TofCommand::DataRunStart(cmd_payload);
let packet = cmd.pack();
let mut payload = String::from("BRCT").into_bytes();
payload.append(&mut packet.to_bytestream());
let ctx = zmq::Context::new();
let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
println!("=> Give the RBs a chance to connect and wait a bit..");
match cmd_sender.send(&payload, 0) {
Err(err) => {
error!("Unable to send command, error{err}");
Ok(_) => {
debug!("We sent {:?}", payload);
println!("=> Verification run initialized!");
loop {
if runtime.elapsed().as_secs() > timeout as u64 {
println!("=> Ending verification run!");
println!("=> Sending run termination command to the RBs");
let cmd = TofCommand::DataRunStop(DEFAULT_RB_ID as u32);
let packet = cmd.pack();
let mut payload = String::from("BRCT").into_bytes();
payload.append(&mut packet.to_bytestream());
warn!("=> No command socket available! Can not shut down RBs..!");
println!("=> Give the RBs a chance to connect and wait a bit..");
match cmd_sender.send(&payload, 0) {
Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
debug!("We sent {:?}", payload);
let mut detector_status = TofDetectorStatus::new();
match thread_control.lock() {
Ok(mut tc) => {
tc.write_data_to_disk = write_state;
tc.verification_active = false;
detector_status = tc.detector_status.clone();
Err(err) => {
error!("Can't acquire lock for ThreadControl! {err}");
println!("=> Acquired TofDetectorStatus!");
println!("{}", detector_status);
let pack = detector_status.pack();
match tp_to_sink.send(pack) {
Err(err) => error!("Unable to send TofDetectorStatus to data sink! {err}"),
Ok(_) => ()
pub fn calibrate_tof(thread_control : Arc<Mutex<ThreadControl>>,
rb_list : &Vec<ReadoutBoard>,
show_progress : bool) {
let one_second = Duration::from_millis(1000);
let mut cc_pub_addr = String::from("");
let calibration_timeout_fail = Duration::from_secs(300); let mut cali_dir_created = false;
let mut cali_output_dir = String::from("");
let mut cali_base_dir = String::from("");
match thread_control.lock() {
Ok(mut tc) => {
for rb in rb_list {
cali_base_dir = tc.liftof_settings.calibration_dir.clone();
cc_pub_addr = tc.liftof_settings.cmd_dispatcher_settings.cc_server_address.clone();
tc.write_data_to_disk = true;
Err(err) => {
error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
let voltage_level = DEFAULT_CALIB_VOLTAGE;
let rb_id = DEFAULT_RB_ID;
println!("=> Received calibration default command! Will init calibration run of all RBs...");
let cmd_payload: u32
= (voltage_level as u32) << 16 | (rb_id as u32) << 8 | (extra as u32);
let default_calib_depr = TofCommand::DefaultCalibration(cmd_payload);
let tp_depr = default_calib_depr.pack();
let mut payload_depr = String::from("BRCT").into_bytes();
payload_depr.append(&mut tp_depr.to_bytestream());
let mut default_calib = TofCommandV2::new();
default_calib.command_code = TofCommandCode::RBCalibration;
let tp = default_calib.pack();
let mut payload = String::from("BRCT").into_bytes();
payload.append(&mut tp.to_bytestream());
let ctx = zmq::Context::new();
let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
cmd_sender.bind(&cc_pub_addr).expect("Unable to bind to (PUB) socket!");
println!("=> Give the RBs a chance to connect and wait a bit..");
match cmd_sender.send(&payload_depr, 0) { Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
println!("=> Calibration initialized!");
match cmd_sender.send(&payload, 0) { Err(err) => {
error!("Unable to send command! {err}");
Ok(_) => {
println!("=> Calibration initialized!");
match thread_control.lock() {
Ok(mut tc) => {
tc.thread_master_trg_active =false;
tc.calibration_active = true;
Err(err) => {
error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
let mut bar = ProgressBar::hidden();
println!("=> .. now we need to wait until the calibration is finished!");
if show_progress {
bar = ProgressBar::new(rb_list.len() as u64);
let bar_label = String::from("Acquiring RB calibration data");
bar.set_message (bar_label);
bar.set_prefix ("\u{2699}\u{1F4D0}");
bar.set_style (bar_style);
let timeout = Instant::now();
let mut cali_received = 0;
'main: loop {
if timeout.elapsed() > calibration_timeout_fail {
error!("Calibration timeout! Calibrations might not be complete!");
match thread_control.lock() {
Ok(mut tc) => {
tc.calibration_active = false;
Err(err) => {
error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
if show_progress {
match thread_control.lock() {
Ok(mut tc) => {
for rbid in rb_list {
let mut finished_keys = Vec::<u8>::new();
if tc.stop_flag {
println!("Stop signal received, exiting calibration routine!");
break 'main;
if tc.finished_calibrations[&rbid.rb_id] {
cali_received += 1;
let rbcali = tc.calibrations.get(&rbid.rb_id).expect("We got the signal tat this calibration is ready but it is not!");
let pack = rbcali.pack();
let file_type = FileType::CalibrationFile(rbid.rb_id);
if !cali_dir_created {
let today = get_utc_timestamp();
cali_output_dir = format!("{}/{}", cali_base_dir.clone(), today);
match create_dir_all(cali_output_dir.clone()) {
Ok(_) => info!("Created {} for calibration data!", cali_output_dir),
Err(err) => error!("Unable to create {} for calibration data! {}", cali_output_dir, err)
cali_dir_created = true;
let mut cali_writer = TofPacketWriter::new(cali_output_dir.clone(), file_type);
for rbid in &finished_keys {
*tc.finished_calibrations.get_mut(&rbid).unwrap() = false;
if cali_received as usize == rb_list.len() {
tc.calibration_active = false;
for rbid in rb_list {
*tc.finished_calibrations.get_mut(&rbid.rb_id).unwrap() = false;
if show_progress {
Err(err) => {
error!("Can't acquire lock for ThreadControl at this time! Unable to set calibration mode! {err}");
} let cali_link_dir = cali_base_dir.clone() + "latest";
match fs::remove_file(cali_link_dir.clone()) {
Ok(_) => {
println!("=> Symlink {} removed!", cali_link_dir);
Err(err) => {
error!("Unable to remove symlink to latest calibrations! {err}");
println!("=> Will create symlink {}", cali_link_dir);
match symlink(cali_output_dir, cali_link_dir) {
Err(err) => error!("Unable to create symlink for calibration data! {err}"),
Ok(_) => ()