liftof_scheduler/
liftof_scheduler.rs

1//! Liftof scheduler - this will run as an additional process
2//! on the TOF main computer to schedule run start/stop
3//! through the liftof-cc main program
4//!
5//! Features
6//!
7//! * receive TofCommandV2 from telemetry packets (flight computer)
8//! * modify liftof-config file, recreate links to config files
9//! * start/stop liftof process
10//! * schedule run queue
11//!
12//! Run flow/staging:
13//!
14//! There are 3 directories in the staging directory:
15//! - current - the configuration which is run currently
16//! - next    - the configuration which shall be run next. 
17//!             This configuration can be edited until the 
18//!             next run start.
19//! - queue   - config files in here will get assessed and 
20//!             sorted every new run cycle and the one with 
21//!             the highest priority (number) will get 
22//!             executed first.
23//!
24//!
25
26#[macro_use] extern crate log;
27
28use std::fs::{
29  OpenOptions,
30};
31
32use std::thread;
33use std::io::Write;
34use std::process::Command;
35use std::path::{
36  Path
37};
38
39use chrono::Utc;
40use clap::{
41  arg,
42  command,
43  Parser
44};
45  
46use liftof_lib::{
47  init_env_logger,
48  LIFTOF_LOGO_SHOW,
49  LiftofSettings,
50};
51
52use std::time::{
53  Duration,
54};
55
56use tof_dataclasses::commands::{
57  TofCommandV2,
58  TofCommandCode,
59  TofReturnCode
60};
61use tof_dataclasses::serialization::{
62  Serialization,
63  Packable
64};
65use tof_dataclasses::packets::{
66  PacketType,
67  TofPacket
68};
69use tof_dataclasses::database::{
70  connect_to_db,
71  ReadoutBoard,
72};
73use tof_dataclasses::commands::config::{
74  TriggerConfig,
75  TOFEventBuilderConfig,
76  DataPublisherConfig,
77  TofRunConfig,
78  TofRBConfig
79};
80
81use telemetry_dataclasses::packets::AckBfsw;
82
83use liftof_cc::{
84  manage_liftof_cc_service,
85  ssh_command_rbs,
86  copy_file_rename_liftof,
87  LIFTOF_HOTWIRE,
88};
89
90
91
92#[derive(Parser, Debug)]
93#[command(author = "J.A.Stoessl", version, about, long_about = None)]
94#[command(propagate_version = true)]
95struct LiftofSchedArgs {
96  #[arg(short, long)]
97  config      : Option<String>,
98  /// Don't do anything, just tell us what 
99  /// would happen
100  #[arg(long, default_value_t = false)]
101  dry_run : bool,
102  /// Don't send ACK packets
103  #[arg(long, default_value_t = false)]
104  no_ack  : bool,
105}
106
107/// Send an ack packet to liftof-cc
108///
109/// Matroshka! Literally Ack in Pack, recursive packaging
110/// - I love this!
111///
112/// The purpose of this is to sneak an Bfsw ack packet 
113/// through the bfsw system. Well, that's how broken 
114/// we all are
115fn send_ack_packet(cc       : TofCommandCode,
116                   ret_code : TofReturnCode,
117                   socket   : &zmq::Socket) {
118  let mut ack = AckBfsw::new(); 
119  ack.ret_code1 = ret_code as u8;
120  ack.ret_code2 = cc as u8;
121  let tp = ack.pack();
122  match socket.send(tp.to_bytestream(), 0) {
123    Ok(_)    => (),
124    Err(err) => error!("Unable to send ACK! {err}")
125  }
126}
127
128
129fn main() {
130  init_env_logger();
131
132  // welcome banner!
133  println!("{}", LIFTOF_LOGO_SHOW);
134  println!("-----------------------------------------------");
135  println!(" >> Welcome to liftof-scheduler \u{1F680} \u{1F388} ");
136  println!(" >> liftof is a software suite for the time-of-flight detector (TOF) ");
137  println!(" >> for the GAPS experiment \u{1F496}");
138  println!(" >> This is the run scheduler (liftof-scheduler)");
139  println!(" >> It starts/stops the liftof-cc service and manages run configurations");
140  println!("-----------------------------------------------\n\n");
141  
142  let args            = LiftofSchedArgs::parse();
143  let config          : LiftofSettings;
144  let dry_run         = args.dry_run;
145  let no_ack          = args.no_ack;
146  match args.config {
147    None => panic!("No config file provided! Please provide a config file with --config or -c flag!"),
148    Some(cfg_file) => {
149      //cfg_file_str = cfg_file.clone();
150      match LiftofSettings::from_toml(&cfg_file) {
151        Err(err) => {
152          error!("CRITICAL! Unable to parse .toml settings file! {}", err);
153          panic!("Unable to parse config file!");
154        }
155        Ok(_cfg) => {
156          config = _cfg;
157        }
158      }
159    } // end Some
160  } // end match
161
162  let staging_dir = config.staging_dir; 
163  // This is the file we will edit 
164  let cfg_file         = format!("{}/next/liftof-config.toml", staging_dir.clone());
165  let next_dir         = format!("{}/next", staging_dir.clone());
166  let current_dir      = format!("{}/current", staging_dir.clone());
167  let default_cfg_file = format!("{}/default/liftof-config-default.toml", staging_dir.clone());
168  let db_path     = config.db_path.clone();
169  let mut conn    = connect_to_db(db_path).expect("Unable to establish a connection to the DB! CHeck db_path in the liftof settings (.toml) file!");
170  // if this call does not go through, we might as well fail early.
171  let rb_list     = ReadoutBoard::all(&mut conn).expect("Unable to retrieve RB information! Unable to continue, check db_path in the liftof settings (.toml) file and DB integrity!");
172  let mut all_rb_ids  = Vec::<u8>::new();
173  for rb in rb_list {
174    all_rb_ids.push(rb.rb_id as u8);
175  }
176
177  let sleep_time  = Duration::from_secs(config.cmd_dispatcher_settings.cmd_listener_interval_sec);
178  //let locked      = config.cmd_dispatcher_settings.deny_all_requests; // do not allow the reception of commands if true
179  
180  let fc_sub_addr = config.cmd_dispatcher_settings.fc_sub_address.clone();
181  let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
182  let ctx = zmq::Context::new();
183  
184  // socket to receive commands
185  info!("Connecting to flight computer at {}", fc_sub_addr);
186  let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
187  cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
188  cmd_receiver.connect(&fc_sub_addr).expect("Unable to subscribe to flight computer PUB");
189  info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
190
191  // socket to send commands on the RB network
192  info!("Binding socket for command dispatching to rb network to {}", cc_pub_addr);
193  let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
194  if !dry_run || no_ack {
195    cmd_sender.bind(LIFTOF_HOTWIRE).expect("Unable to bind to (PUB) socket!");
196  }
197  // open the logfile for commands
198  let mut filename = config.cmd_dispatcher_settings.cmd_log_path.clone();
199  if !filename.ends_with("/") {
200    filename += "/";
201  }
202  filename        += "received-commands.log";
203  let path         = Path::new(&filename);
204  info!("Writing cmd log to file {filename}");
205  let mut log_file = OpenOptions::new().create(true).append(true).open("received-commands.log").expect("Unable to create file!");
206  match OpenOptions::new().create(true).append(true).open(path) {
207    Ok(_f) => {log_file = _f;},
208    Err(err) => { 
209      error!("Unable to write to path {filename}! {err} Falling back to default file path");
210    }
211  }
212
213  // All RBs
214
215  loop {
216    thread::sleep(sleep_time);
217    //println!("=> Cmd responder loop iteration!");
218    let mut success = TofReturnCode::Unknown;
219    match cmd_receiver.connect(&fc_sub_addr) {
220      Ok(_)    => (),
221      Err(err) => {
222        error!("Unable to connect to {}! {}", fc_sub_addr, err);
223        continue;
224      }
225    }
226    
227    let cmd_packet : TofPacket;
228    match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
229      Err(err)   => {
230        trace!("ZMQ socket receiving error! {err}");
231        continue;
232      }
233      Ok(buffer) => {
234        info!("Received bytes {:?}", buffer);
235        // identfiy if we have a GAPS packet
236        if buffer.len() < 4 {
237          error!("Can't deal with commands shorter than 4 bytes@");
238          continue
239        }
240        // check on the buffer
241        if buffer[0] == 0x90 && buffer[1] == 0xeb {
242          if buffer[4] != 0x46 { //0x5a?
243            // We have a GAPS packet -> FIXME:
244            info!("We received something, but it does not seem to be address to us! We are only listening to address {} right now!", 0x46);
245            continue;
246          } else {
247            info!("Received command sent through (Cra-)BFSW system!");
248            if buffer.len() < 8 {
249              error!("Received command is too short! (Smaller than 8 bytes) {:?}", buffer);
250              success = TofReturnCode::GarbledCommand;
251              send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
252              continue;
253            }
254            match TofPacket::from_bytestream(&buffer, &mut 8) {
255              Err(err) => {
256                error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
257                success = TofReturnCode::GarbledCommand;
258                send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
259                continue;  
260              },
261              Ok(packet) => {
262                cmd_packet = packet;
263              }
264            }
265          }
266        } else if  buffer[0] == 170 && buffer[1] == 170 {
267          info!("Got a TofPacket!");
268          match TofPacket::from_bytestream(&buffer, &mut 0) {
269            Err(err) => {
270              error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
271              success = TofReturnCode::GarbledCommand;
272              send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
273              continue;  
274            },
275            Ok(packet) => {
276              cmd_packet = packet;
277            }
278          }
279        } else {
280          error!("Received bytestream, but don't know how to deal with it!");
281          continue;
282        }
283        debug!("Got packet {}!", cmd_packet);
284        match cmd_packet.packet_type {
285          PacketType::TofCommandV2 => {
286            let cmd : TofCommandV2;
287            match cmd_packet.unpack::<TofCommandV2>() {
288              Ok(_cmd) => {cmd = _cmd;},
289              Err(err) => {
290                error!("Unable to decode TofCommand! {err}");
291                success = TofReturnCode::GarbledCommand;
292                send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
293                continue;
294              }
295            }
296            println!("= => Received command {}!", cmd);
297            let now = Utc::now().to_string();
298            let write_to_file = format!("{:?}: {}\n",now, cmd);
299            match log_file.write_all(&write_to_file.into_bytes()) {
300              Err(err) => {
301                error!("Writing to file to path {} failed! {}", filename, err)
302              }
303              Ok(_)    => ()
304            }
305            match log_file.sync_all() {
306              Err(err) => {
307                error!("Unable to sync file to disc! {err}");
308              },
309              Ok(_) => ()
310            }
311          
312            // actual command tree
313            match cmd.command_code {
314              TofCommandCode::DataRunStop  => {
315                println!("= => Received DataRunStop!");
316                if !dry_run { 
317                  success =  manage_liftof_cc_service("stop"); 
318                }
319              },
320              TofCommandCode::DataRunStart  => {
321                info!("Received DataRunStart!");
322                // FIXME - factor out manage_liftof_cc_service here, otherwise
323                if !dry_run { 
324                  success =  manage_liftof_cc_service("restart");
325                }
326              }
327              TofCommandCode::ShutdownRB => {
328                let mut cmd_rb_list  = cmd.payload.clone();
329                if cmd_rb_list.is_empty() {
330                  cmd_rb_list = all_rb_ids.clone();
331                }
332                info!("Received Shutdown RB command for RBs {:?}", cmd_rb_list);
333                let cmd_args     = vec![String::from("sudo"),
334                                        String::from("shutdown"),
335                                        String::from("now")]; 
336                if !args.dry_run {
337                  match ssh_command_rbs(&cmd_rb_list, cmd_args) {
338                    Err(err) => {
339                      error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
340                      success = TofReturnCode::GeneralFail;
341                    }
342                    Ok(_)    => {
343                      success = TofReturnCode::Success;
344                    }
345                  }
346                }
347              }
348              TofCommandCode::ResetConfigWDefault => {
349                info!("Will reset {} with {}", cfg_file, default_cfg_file);
350                match copy_file_rename_liftof(&default_cfg_file, &next_dir) {
351                  Ok(_)    => {
352                    info!("Copy successful!");
353                    success = TofReturnCode::Success;
354                  }
355                  Err(err) => {
356                    error!("Unable to copy! {err}");
357                    success = TofReturnCode::GeneralFail;
358                  }
359                }
360              }
361              TofCommandCode::SubmitConfig => {
362                info!("Submitting the worked on config!");
363                match copy_file_rename_liftof(&cfg_file, &current_dir) {
364                  Ok(_)    => {
365                    info!("Copy successful!");
366                    success = TofReturnCode::Success;
367                  }
368                  Err(err) => { 
369                    error!("Unable to copy! {err}");
370                    success = TofReturnCode::GeneralFail;
371                  }
372                }
373              }
374              TofCommandCode::ShutdownRAT => {
375                let cmd_rb_list  = cmd.payload.clone();
376                info!("Received Shutdown RAT command for RBs {:?}", cmd_rb_list);
377                let cmd_args     = vec![String::from("sudo"),
378                                        String::from("shutdown"),
379                                        String::from("now")]; 
380                if !args.dry_run {
381                  match ssh_command_rbs(&cmd_rb_list, cmd_args) {
382                    Err(err) => {
383                      error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
384                      success = TofReturnCode::GeneralFail;
385                    }
386                    Ok(_)    => {
387                      success = TofReturnCode::Success;
388                    }
389                  }
390                }
391              }
392              TofCommandCode::ShutdownCPU => {
393                let cmd_args     = vec![String::from("shutdown"),
394                                        String::from("now")]; 
395                info!("Received Shutdown command for CPU");
396                if !args.dry_run {
397                  match Command::new("sudo")
398                    //.args([&rb_address, "sudo", "systemctl", "restart", "liftof"])
399                    .args(cmd_args)
400                    .spawn() {
401                    Err(err) => {
402                      error!("Unable to spawn shutdown process on TofCPU! {err}");
403                      success = TofReturnCode::GeneralFail;
404                    }
405                    // FIXME - timeout with try wait
406                    Ok(mut child) => {
407                      match child.wait() {
408                        Err(err) => error!("Waiting for the shutdown process failed! {err}"),
409                        Ok(_)    => ()
410                      }
411                    }
412                  }
413                }
414              }
415              TofCommandCode::RBCalibration => {
416                info!("Received RBCalibration command!");
417                if cmd.payload.len() < 3 {
418                  error!("Broken RBCalibration command!");
419                  continue;
420                }
421                let pre_run_cali   = cmd.payload[0] != 0;
422                let send_packets   = cmd.payload[1] != 0;  
423                let save_waveforms = cmd.payload[2] != 0;
424                match LiftofSettings::from_toml(&cfg_file) {
425                  Err(err) => {
426                    error!("CRITICAL! Unable to parse .toml settings file! {}", err);
427                    //panic!("Unable to parse config file!");
428                    success = TofReturnCode::GeneralFail;
429                  }
430                  Ok(mut config) => {
431                    config.data_publisher_settings.send_cali_packets = send_packets;
432                    config.save_cali_wf                              = save_waveforms;
433                    config.pre_run_calibration = pre_run_cali;
434                    config.to_toml(String::from(cfg_file.clone()));
435                    info!("We changed the data publisher settings to be this {}",config.data_publisher_settings);
436                    
437                    success = TofReturnCode::Success;
438                  }
439                }   
440              }
441              TofCommandCode::SetMTConfig => {
442                info!("Will change trigger config for next run!");
443                match TriggerConfig::from_bytestream(&cmd.payload, &mut 0) {
444                  Err(err) => error!("Unable to extract TriggerConfig from command! {err}"),
445                  Ok(tcf)  => {
446                    match LiftofSettings::from_toml(&cfg_file) {
447                      Err(err) => {
448                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
449                        success = TofReturnCode::GeneralFail;
450                      }
451                      Ok(mut config) => {
452                        println!("=> We received the following trigger config {}", tcf);
453                        config.mtb_settings.from_triggerconfig(&tcf);
454                        println!("=> We changed the mtb settings to be this {}",config.mtb_settings);
455                        config.to_toml(String::from(cfg_file.clone()));
456                        success = TofReturnCode::Success;
457                      }
458                    }   
459                  }
460                }
461              }
462              TofCommandCode::SetTOFEventBuilderConfig => {
463                info!("Will change tof event builder config for next run!");
464                match TOFEventBuilderConfig::from_bytestream(&cmd.payload, &mut 0) {
465                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
466                  Ok(tcf)  => {
467                    info!("Received config {}",tcf);
468                    match LiftofSettings::from_toml(&cfg_file) {
469                      Err(err) => {
470                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
471                        success = TofReturnCode::GeneralFail;
472                      }
473                      Ok(mut config) => {
474                        config.event_builder_settings.from_tofeventbuilderconfig(&tcf);
475                        info!("We changed the event builder settings to be this {}",config.event_builder_settings);
476                        config.to_toml(String::from(cfg_file.clone()));
477                        success = TofReturnCode::Success;
478                      }
479                    }   
480                  }
481                }
482              }
483              TofCommandCode::SetTofRunConfig => {
484                info!("Will change tof run config for next run!");
485                match TofRunConfig::from_bytestream(&cmd.payload, &mut 0) {
486                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
487                  Ok(tcf)  => {
488                    info!("Received config {}",tcf);
489                    match LiftofSettings::from_toml(&cfg_file) {
490                      Err(err) => {
491                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
492                        success = TofReturnCode::GeneralFail;
493                      }
494                      Ok(mut config) => {
495                        config.from_tofrunconfig(&tcf);
496                        info!("We changed the run config to be this {}",config);
497                        config.to_toml(String::from(cfg_file.clone()));
498                        success = TofReturnCode::Success;
499                      }
500                    }   
501                  }
502                }
503              }
504              TofCommandCode::SetTofRBConfig => {
505                info!("Will change tof rb config for next run!");
506                match TofRBConfig::from_bytestream(&cmd.payload, &mut 0) {
507                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
508                  Ok(tcf)  => {
509                    info!("Received config {}",tcf);
510                    match LiftofSettings::from_toml(&cfg_file) {
511                      Err(err) => {
512                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
513                        success = TofReturnCode::GeneralFail;
514                      }
515                      Ok(mut config) => {
516                        config.rb_settings.from_tofrbconfig(&tcf);
517                        info!("We changed the run config to be this {}",config);
518                        config.to_toml(String::from(cfg_file.clone()));
519                        success = TofReturnCode::Success;
520                      }
521                    }   
522                  }
523                }
524              }
525              TofCommandCode::SetDataPublisherConfig => {
526                info!("Will change data publisher config for next run!");
527                let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
528                match DataPublisherConfig::from_bytestream(&cmd.payload, &mut 0) {
529                  Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
530                  Ok(tcf)  => {
531                    info!("Received config {}",tcf);
532                    match LiftofSettings::from_toml(&cfg_file) {
533                      Err(err) => {
534                        error!("CRITICAL! Unable to parse .toml settings file! {}", err);
535                        success = TofReturnCode::GeneralFail;
536                      }
537                      Ok(mut config) => {
538                        config.data_publisher_settings.from_datapublisherconfig(&tcf);
539                        info!("We changed the event builder settings to be this {}",config.data_publisher_settings);
540                        config.to_toml(String::from(cfg_file));
541                        success = TofReturnCode::Success;
542                      }
543                    }   
544                  }
545                }
546              }
547              _ => {
548                error!("Dealing with command code {} has not been implemented yet!", cmd.command_code);
549                success = TofReturnCode::GeneralFail;
550              }
551            }
552            if !args.no_ack {
553              send_ack_packet(cmd.command_code, success, &cmd_sender);
554            }
555          },
556          _ => {
557            error!("Unable to deal with packet type {}!", cmd_packet.packet_type)
558          }
559        }
560      }
561    }
562  }
563}