liftof_scheduler/
liftof_scheduler.rs

//! Liftof scheduler - this runs as an additional process
//! on the TOF main computer to schedule run start/stop
//! through the liftof-cc main program
//!
//! Features
//!
//! * receive TofCommandV2 from telemetry packets (flight computer)
//! * modify the liftof-config file, recreate links to config files
//! * start/stop the liftof process
//! * schedule the run queue
//!
//! Run flow/staging:
//!
//! The staging directory contains the following directories:
//! - current - the configuration which is currently being run
//! - next    - the configuration which shall be run next. 
//!             This configuration can be edited until the 
//!             next run starts.
//! - queue   - config files in here will get assessed and 
//!             sorted every new run cycle, and the one with 
//!             the highest priority (number) will get 
//!             executed first.
//! - default - holds liftof-config-default.toml, which can be
//!             used to reset the configuration in `next`.
//!
25
26#[macro_use] extern crate log;
27
28use std::fs::{
29  OpenOptions,
30};
31
32use std::thread;
33use std::io::Write;
34use std::process::Command;
35use std::path::{
36  Path
37};
38
39use chrono::Utc;
40use clap::{
41  arg,
42  command,
43  Parser
44};
45  
46//use liftof_lib::{
47//  init_env_logger,
48//  LIFTOF_LOGO_SHOW,
49//  LiftofSettings,
50//};
51
52use std::time::{
53  Duration,
54};
55
56//use tof_dataclasses::commands::{
57//  TofCommandV2,
58//  TofCommandCode,
59//  TofReturnCode
60//};
61//use tof_dataclasses::serialization::{
62//  Serialization,
63//  Packable
64//};
65//use tof_dataclasses::packets::{
66//  PacketType,
67//  TofPacket
68//};
69//use tof_dataclasses::database::{
70//  connect_to_db,
71//  ReadoutBoard,
72//};
73//use tof_dataclasses::commands::config::{
74//  TriggerConfig,
75//  TOFEventBuilderConfig,
76//  DataPublisherConfig,
77//  TofRunConfig,
78//  TofRBConfig
79//};
80//
81//use telemetry_dataclasses::packets::AckBfsw;
82use gondola_core::prelude::*;
83use gondola_core::init_env_logger;
84
85use liftof_cc::{
86  manage_liftof_cc_service,
87  ssh_command_rbs,
88  copy_file_rename_liftof,
89  LIFTOF_HOTWIRE,
90};
91
92
93
94#[derive(Parser, Debug)]
95#[command(author = "J.A.Stoessl", version, about, long_about = None)]
96#[command(propagate_version = true)]
97struct LiftofSchedArgs {
98  #[arg(short, long)]
99  config      : Option<String>,
100  /// Don't do anything, just tell us what 
101  /// would happen
102  #[arg(long, default_value_t = false)]
103  dry_run : bool,
104  /// Don't send ACK packets
105  #[arg(long, default_value_t = false)]
106  no_ack  : bool,
107}
108
109/// Send an ack packet to liftof-cc
110///
111/// Matroshka! Literally Ack in Pack, recursive packaging
112/// - I love this!
113///
114/// The purpose of this is to sneak an Bfsw ack packet 
115/// through the bfsw system. Well, that's how broken 
116/// we all are
117fn send_ack_packet(cc       : TofCommandCode,
118                   ret_code : TofReturnCode,
119                   socket   : &zmq::Socket) {
120  let mut ack = AckBfsw::new(); 
121  ack.ret_code1 = ret_code as u8;
122  ack.ret_code2 = cc as u8;
123  let tp = ack.pack();
124  match socket.send(tp.to_bytestream(), 0) {
125    Ok(_)    => (),
126    Err(err) => error!("Unable to send ACK! {err}")
127  }
128}
129
130
131fn main() {
132  init_env_logger();
133
134  // welcome banner!
135  println!("{}", LIFTOF_LOGO_SHOW);
136  println!("-----------------------------------------------");
137  println!(" >> Welcome to liftof-scheduler \u{1F680} \u{1F388} ");
138  println!(" >> liftof is a software suite for the time-of-flight detector (TOF) ");
139  println!(" >> for the GAPS experiment \u{1F496}");
140  println!(" >> This is the run scheduler (liftof-scheduler)");
141  println!(" >> It starts/stops the liftof-cc service and manages run configurations");
142  println!("-----------------------------------------------\n\n");
143  
144  let args            = LiftofSchedArgs::parse();
145  let config          : LiftofSettings;
146  let dry_run         = args.dry_run;
147  let no_ack          = args.no_ack;
148  match args.config {
149    None => panic!("No config file provided! Please provide a config file with --config or -c flag!"),
150    Some(cfg_file) => {
151      //cfg_file_str = cfg_file.clone();
152      match LiftofSettings::from_toml(&cfg_file) {
153        Err(err) => {
154          error!("CRITICAL! Unable to parse .toml settings file! {}", err);
155          panic!("Unable to parse config file!");
156        }
157        Ok(_cfg) => {
158          config = _cfg;
159        }
160      }
161    } // end Some
162  } // end match
163
164  let staging_dir = config.staging_dir; 
165  // This is the file we will edit 
166  let cfg_file         = format!("{}/next/liftof-config.toml", staging_dir.clone());
167  let next_dir         = format!("{}/next", staging_dir.clone());
168  let current_dir      = format!("{}/current", staging_dir.clone());
169  let default_cfg_file = format!("{}/default/liftof-config-default.toml", staging_dir.clone());
170  let db_path     = config.db_path.clone();
171  let mut conn_    = connect_to_db_path(&db_path).expect("Unable to establish a connection to the DB! CHeck db_path in the liftof settings (.toml) file!");
172  // if this call does not go through, we might as well fail early.
173  let rb_list     = ReadoutBoard::all().expect("Unable to retrieve RB information! Unable to continue, check db_path in the liftof settings (.toml) file and DB integrity!");
174  let mut all_rb_ids  = Vec::<u8>::new();
175  for rb in rb_list {
176    all_rb_ids.push(rb.rb_id as u8);
177  }
178
179  let sleep_time  = Duration::from_secs(config.cmd_dispatcher_settings.cmd_listener_interval_sec);
180  //let locked      = config.cmd_dispatcher_settings.deny_all_requests; // do not allow the reception of commands if true
181  
182  let fc_sub_addr = config.cmd_dispatcher_settings.fc_sub_address.clone();
183  let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
184  let ctx = zmq::Context::new();
185  
186  // socket to receive commands
187  info!("Connecting to flight computer at {}", fc_sub_addr);
188  let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
189  cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
190  cmd_receiver.connect(&fc_sub_addr).expect("Unable to subscribe to flight computer PUB");
191  info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
192
193  // socket to send commands on the RB network
194  info!("Binding socket for command dispatching to rb network to {}", cc_pub_addr);
195  let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
196  if !dry_run || no_ack {
197    cmd_sender.bind(LIFTOF_HOTWIRE).expect("Unable to bind to (PUB) socket!");
198  }
199  // open the logfile for commands
200  let mut filename = config.cmd_dispatcher_settings.cmd_log_path.clone();
201  if !filename.ends_with("/") {
202    filename += "/";
203  }
204  filename        += "received-commands.log";
205  let path         = Path::new(&filename);
206  info!("Writing cmd log to file {filename}");
207  let mut log_file = OpenOptions::new().create(true).append(true).open("received-commands.log").expect("Unable to create file!");
208  match OpenOptions::new().create(true).append(true).open(path) {
209    Ok(_f) => {log_file = _f;},
210    Err(err) => { 
211      error!("Unable to write to path {filename}! {err} Falling back to default file path");
212    }
213  }
214
215  // All RBs
216
217  loop {
218    thread::sleep(sleep_time);
219    //println!("=> Cmd responder loop iteration!");
220    let mut success = TofReturnCode::Unknown;
221    match cmd_receiver.connect(&fc_sub_addr) {
222      Ok(_)    => (),
223      Err(err) => {
224        error!("Unable to connect to {}! {}", fc_sub_addr, err);
225        continue;
226      }
227    }
228    
229    let cmd_packet : TofPacket;
230    match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
231      Err(err)   => {
232        trace!("ZMQ socket receiving error! {err}");
233        continue;
234      }
235      Ok(buffer) => {
236        info!("Received bytes {:?}", buffer);
237        // identfiy if we have a GAPS packet
238        if buffer.len() < 4 {
239          error!("Can't deal with commands shorter than 4 bytes@");
240          continue
241        }
242        // check on the buffer
243        if buffer[0] == 0x90 && buffer[1] == 0xeb {
244          if buffer[4] != 0x46 { //0x5a?
245            // We have a GAPS packet -> FIXME:
246            info!("We received something, but it does not seem to be address to us! We are only listening to address {} right now!", 0x46);
247            continue;
248          } else {
249            info!("Received command sent through (Cra-)BFSW system!");
250            if buffer.len() < 8 {
251              error!("Received command is too short! (Smaller than 8 bytes) {:?}", buffer);
252              success = TofReturnCode::GarbledCommand;
253              send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
254              continue;
255            }
256            match TofPacket::from_bytestream(&buffer, &mut 8) {
257              Err(err) => {
258                error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
259                success = TofReturnCode::GarbledCommand;
260                send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
261                continue;  
262              },
263              Ok(packet) => {
264                cmd_packet = packet;
265              }
266            }
267          }
268        } else if  buffer[0] == 170 && buffer[1] == 170 {
269          info!("Got a TofPacket!");
270          match TofPacket::from_bytestream(&buffer, &mut 0) {
271            Err(err) => {
272              error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
273              success = TofReturnCode::GarbledCommand;
274              send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
275              continue;  
276            },
277            Ok(packet) => {
278              cmd_packet = packet;
279            }
280          }
281        } else {
282          error!("Received bytestream, but don't know how to deal with it!");
283          continue;
284        }
285        debug!("Got packet {}!", cmd_packet);
286        match cmd_packet.packet_type {
287          TofPacketType::TofCommand => {
288            let cmd : TofCommand;
289            match cmd_packet.unpack::<TofCommand>() {
290              Ok(_cmd) => {cmd = _cmd;},
291              Err(err) => {
292                error!("Unable to decode TofCommand! {err}");
293                success = TofReturnCode::GarbledCommand;
294                send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
295                continue;
296              }
297            }
298            println!("= => Received command {}!", cmd);
299            let now = Utc::now().to_string();
300            let write_to_file = format!("{:?}: {}\n",now, cmd);
301            match log_file.write_all(&write_to_file.into_bytes()) {
302              Err(err) => {
303                error!("Writing to file to path {} failed! {}", filename, err)
304              }
305              Ok(_)    => ()
306            }
307            match log_file.sync_all() {
308              Err(err) => {
309                error!("Unable to sync file to disc! {err}");
310              },
311              Ok(_) => ()
312            }
313          
314            // actual command tree
315            match cmd.command_code {
316              TofCommandCode::DataRunStop  => {
317                println!("= => Received DataRunStop!");
318                if !dry_run { 
319                  success =  manage_liftof_cc_service("stop"); 
320                }
321              },
322              TofCommandCode::DataRunStart  => {
323                info!("Received DataRunStart!");
324                // FIXME - factor out manage_liftof_cc_service here, otherwise
325                if !dry_run { 
326                  success =  manage_liftof_cc_service("restart");
327                }
328              }
329              TofCommandCode::ShutdownRB => {
330                let mut cmd_rb_list  = cmd.payload.clone();
331                if cmd_rb_list.is_empty() {
332                  cmd_rb_list = all_rb_ids.clone();
333                }
334                info!("Received Shutdown RB command for RBs {:?}", cmd_rb_list);
335                let cmd_args     = vec![String::from("sudo"),
336                                        String::from("shutdown"),
337                                        String::from("now")]; 
338                if !args.dry_run {
339                  match ssh_command_rbs(&cmd_rb_list, cmd_args) {
340                    Err(err) => {
341                      error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
342                      success = TofReturnCode::GeneralFail;
343                    }
344                    Ok(_)    => {
345                      success = TofReturnCode::Success;
346                    }
347                  }
348                }
349              }
350              TofCommandCode::ResetConfigWDefault => {
351                info!("Will reset {} with {}", cfg_file, default_cfg_file);
352                match copy_file_rename_liftof(&default_cfg_file, &next_dir) {
353                  Ok(_)    => {
354                    info!("Copy successful!");
355                    success = TofReturnCode::Success;
356                  }
357                  Err(err) => {
358                    error!("Unable to copy! {err}");
359                    success = TofReturnCode::GeneralFail;
360                  }
361                }
362              }
363              TofCommandCode::SubmitConfig => {
364                info!("Submitting the worked on config!");
365                match copy_file_rename_liftof(&cfg_file, &current_dir) {
366                  Ok(_)    => {
367                    info!("Copy successful!");
368                    success = TofReturnCode::Success;
369                  }
370                  Err(err) => { 
371                    error!("Unable to copy! {err}");
372                    success = TofReturnCode::GeneralFail;
373                  }
374                }
375              }
376              TofCommandCode::ShutdownRAT => {
377                let cmd_rb_list  = cmd.payload.clone();
378                info!("Received Shutdown RAT command for RBs {:?}", cmd_rb_list);
379                let cmd_args     = vec![String::from("sudo"),
380                                        String::from("shutdown"),
381                                        String::from("now")]; 
382                if !args.dry_run {
383                  match ssh_command_rbs(&cmd_rb_list, cmd_args) {
384                    Err(err) => {
385                      error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
386                      success = TofReturnCode::GeneralFail;
387                    }
388                    Ok(_)    => {
389                      success = TofReturnCode::Success;
390                    }
391                  }
392                }
393              }
394              TofCommandCode::ShutdownCPU => {
395                let cmd_args     = vec![String::from("shutdown"),
396                                        String::from("now")]; 
397                info!("Received Shutdown command for CPU");
398                if !args.dry_run {
399                  match Command::new("sudo")
400                    //.args([&rb_address, "sudo", "systemctl", "restart", "liftof"])
401                    .args(cmd_args)
402                    .spawn() {
403                    Err(err) => {
404                      error!("Unable to spawn shutdown process on TofCPU! {err}");
405                      success = TofReturnCode::GeneralFail;
406                    }
407                    // FIXME - timeout with try wait
408                    Ok(mut child) => {
409                      match child.wait() {
410                        Err(err) => error!("Waiting for the shutdown process failed! {err}"),
411                        Ok(_)    => ()
412                      }
413                    }
414                  }
415                }
416              }
417              TofCommandCode::RBCalibration => {
418                info!("Received RBCalibration command!");
419                if cmd.payload.len() < 3 {
420                  error!("Broken RBCalibration command!");
421                  continue;
422                }
423                let pre_run_cali   = cmd.payload[0] != 0;
424                let send_packets   = cmd.payload[1] != 0;  
425                let save_waveforms = cmd.payload[2] != 0;
426                match LiftofSettings::from_toml(&cfg_file) {
427                  Err(err) => {
428                    error!("CRITICAL! Unable to parse .toml settings file! {}", err);
429                    //panic!("Unable to parse config file!");
430                    success = TofReturnCode::GeneralFail;
431                  }
432                  Ok(mut config) => {
433                    config.data_publisher_settings.send_cali_packets = send_packets;
434                    config.save_cali_wf                              = save_waveforms;
435                    config.pre_run_calibration = pre_run_cali;
436                    config.to_toml(String::from(cfg_file.clone()));
437                    info!("We changed the data publisher settings to be this {}",config.data_publisher_settings);
438                    
439                    success = TofReturnCode::Success;
440                  }
441                }   
442              }
443              TofCommandCode::SetMTConfig => {
444                info!("Will change trigger config for next run!");
445                match TriggerConfig::from_bytestream(&cmd.payload, &mut 0) {
446                  Err(err) => error!("Unable to extract TriggerConfig from command! {err}"),
447                  Ok(tcf)  => {
448                    match LiftofSettings::from_toml(&cfg_file) {
449                      Err(err) => {
450                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
451                        success = TofReturnCode::GeneralFail;
452                      }
453                      Ok(mut config) => {
454                        println!("=> We received the following trigger config {}", tcf);
455                        config.mtb_settings.from_triggerconfig(&tcf);
456                        println!("=> We changed the mtb settings to be this {}",config.mtb_settings);
457                        config.to_toml(String::from(cfg_file.clone()));
458                        success = TofReturnCode::Success;
459                      }
460                    }   
461                  }
462                }
463              }
464              TofCommandCode::SetTOFEventBuilderConfig => {
465                info!("Will change tof event builder config for next run!");
466                match TOFEventBuilderConfig::from_bytestream(&cmd.payload, &mut 0) {
467                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
468                  Ok(tcf)  => {
469                    info!("Received config {}",tcf);
470                    match LiftofSettings::from_toml(&cfg_file) {
471                      Err(err) => {
472                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
473                        success = TofReturnCode::GeneralFail;
474                      }
475                      Ok(mut config) => {
476                        config.event_builder_settings.from_tofeventbuilderconfig(&tcf);
477                        info!("We changed the event builder settings to be this {}",config.event_builder_settings);
478                        config.to_toml(String::from(cfg_file.clone()));
479                        success = TofReturnCode::Success;
480                      }
481                    }   
482                  }
483                }
484              }
485              TofCommandCode::SetTofRunConfig => {
486                info!("Will change tof run config for next run!");
487                match TofRunConfig::from_bytestream(&cmd.payload, &mut 0) {
488                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
489                  Ok(tcf)  => {
490                    info!("Received config {}",tcf);
491                    match LiftofSettings::from_toml(&cfg_file) {
492                      Err(err) => {
493                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
494                        success = TofReturnCode::GeneralFail;
495                      }
496                      Ok(mut config) => {
497                        config.from_tofrunconfig(&tcf);
498                        info!("We changed the run config to be this {}",config);
499                        config.to_toml(String::from(cfg_file.clone()));
500                        success = TofReturnCode::Success;
501                      }
502                    }   
503                  }
504                }
505              }
506              TofCommandCode::SetTofRBConfig => {
507                info!("Will change tof rb config for next run!");
508                match TofRBConfig::from_bytestream(&cmd.payload, &mut 0) {
509                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
510                  Ok(tcf)  => {
511                    info!("Received config {}",tcf);
512                    match LiftofSettings::from_toml(&cfg_file) {
513                      Err(err) => {
514                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
515                        success = TofReturnCode::GeneralFail;
516                      }
517                      Ok(mut config) => {
518                        config.rb_settings.from_tofrbconfig(&tcf);
519                        info!("We changed the run config to be this {}",config);
520                        config.to_toml(String::from(cfg_file.clone()));
521                        success = TofReturnCode::Success;
522                      }
523                    }   
524                  }
525                }
526              }
527              TofCommandCode::SetDataPublisherConfig => {
528                info!("Will change data publisher config for next run!");
529                let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
530                match DataPublisherConfig::from_bytestream(&cmd.payload, &mut 0) {
531                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
532                  Ok(tcf)  => {
533                    info!("Received config {}",tcf);
534                    match LiftofSettings::from_toml(&cfg_file) {
535                      Err(err) => {
536                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
537                        success = TofReturnCode::GeneralFail;
538                      }
539                      Ok(mut config) => {
540                        config.data_publisher_settings.from_datapublisherconfig(&tcf);
541                        info!("We changed the event builder settings to be this {}",config.data_publisher_settings);
542                        config.to_toml(String::from(cfg_file));
543                        success = TofReturnCode::Success;
544                      }
545                    }   
546                  }
547                }
548              }
549              _ => {
550                error!("Dealing with command code {} has not been implemented yet!", cmd.command_code);
551                success = TofReturnCode::GeneralFail;
552              }
553            }
554            if !args.no_ack {
555              send_ack_packet(cmd.command_code, success, &cmd_sender);
556            }
557          },
558          _ => {
559            error!("Unable to deal with packet type {}!", cmd_packet.packet_type)
560          }
561        }
562      }
563    }
564  }
565}