1#[macro_use] extern crate log;
27
28use std::fs::{
29 OpenOptions,
30};
31
32use std::thread;
33use std::io::Write;
34use std::process::Command;
35use std::path::{
36 Path,
37 PathBuf
38};
39
40use chrono::Utc;
41use clap::{
42 Parser
43};
44
45use std::time::{
46 Duration,
47};
48use gondola_core::prelude::*;
50use gondola_core::init_env_logger;
51
52use liftof_cc::{
53 manage_liftof_cc_service,
54 ssh_command_rbs,
55 copy_file_rename_liftof,
56 LIFTOF_HOTWIRE,
57};
58
59
60
61#[derive(Parser, Debug)]
62#[command(author = "J.A.Stoessl", version, about, long_about = None)]
63#[command(propagate_version = true)]
64struct LiftofSchedArgs {
65 #[arg(short, long)]
66 config : Option<String>,
67 #[arg(long, default_value_t = false)]
70 dry_run : bool,
71 #[arg(long, default_value_t = false)]
73 no_ack : bool,
74}
75
76fn send_ack_packet(cc : TofCommandCode,
85 ret_code : TofReturnCode,
86 socket : &zmq::Socket) {
87 let mut ack = AckBfsw::new();
88 ack.ret_code1 = ret_code as u8;
89 ack.ret_code2 = cc as u8;
90 let tp = ack.pack();
91 match socket.send(tp.to_bytestream(), 0) {
92 Ok(_) => (),
93 Err(err) => error!("Unable to send ACK! {err}")
94 }
95}
96
97
98fn main() {
99 init_env_logger();
100
101 println!("{}", LIFTOF_LOGO_SHOW);
103 println!("-----------------------------------------------");
104 println!(" >> Welcome to liftof-scheduler \u{1F680} \u{1F388} ");
105 println!(" >> liftof is a software suite for the time-of-flight detector (TOF) ");
106 println!(" >> for the GAPS experiment \u{1F496}");
107 println!(" >> This is the run scheduler (liftof-scheduler)");
108 println!(" >> It starts/stops the liftof-cc service and manages run configurations");
109 println!("-----------------------------------------------\n\n");
110
111 let args = LiftofSchedArgs::parse();
112 let config : LiftofSettings;
113 let dry_run = args.dry_run;
114 let no_ack = args.no_ack;
115 match args.config {
116 None => panic!("No config file provided! Please provide a config file with --config or -c flag!"),
117 Some(cfg_file) => {
118 match LiftofSettings::from_toml(&cfg_file) {
120 Err(err) => {
121 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
122 panic!("Unable to parse config file!");
123 }
124 Ok(_cfg) => {
125 config = _cfg;
126 }
127 }
128 } } let staging_dir = config.staging_dir;
132 let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
134 let current_dir = format!("{}/current", staging_dir.clone());
136 let default_cfg_file = format!("{}/default/liftof-config-default.toml", staging_dir.clone());
137 let db_path = config.db_path.clone();
138 let mut _conn = connect_to_db_path(&db_path).expect("Unable to establish a connection to the DB! CHeck db_path in the liftof settings (.toml) file!");
140 let rb_list = ReadoutBoard::all().expect("Unable to retrieve RB information! Unable to continue, check db_path in the liftof settings (.toml) file and DB integrity!");
142 let mut all_rb_ids = Vec::<u8>::new();
143 for rb in rb_list {
144 all_rb_ids.push(rb.rb_id as u8);
145 }
146
147 let sleep_time = Duration::from_secs(config.cmd_dispatcher_settings.cmd_listener_interval_sec);
148 let fc_sub_addr = config.cmd_dispatcher_settings.fc_sub_address.clone();
151 let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
152 let ctx = zmq::Context::new();
153
154 info!("Connecting to flight computer at {}", fc_sub_addr);
156 let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
157 cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
158 cmd_receiver.connect(&fc_sub_addr).expect("Unable to subscribe to flight computer PUB");
159 info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
160
161 info!("Binding socket for command dispatching to rb network to {}", cc_pub_addr);
163 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
164 if !dry_run || no_ack {
165 cmd_sender.bind(LIFTOF_HOTWIRE).expect("Unable to bind to (PUB) socket!");
166 }
167 let mut filename = config.cmd_dispatcher_settings.cmd_log_path.clone();
169 if !filename.ends_with("/") {
170 filename += "/";
171 }
172 filename += "received-commands.log";
173 let path = Path::new(&filename);
174 info!("Writing cmd log to file {filename}");
175 let mut log_file = OpenOptions::new().create(true).append(true).open("received-commands.log").expect("Unable to create file!");
176 match OpenOptions::new().create(true).append(true).open(path) {
177 Ok(_f) => {log_file = _f;},
178 Err(err) => {
179 error!("Unable to write to path {filename}! {err} Falling back to default file path");
180 }
181 }
182
183 loop {
186 thread::sleep(sleep_time);
187 let mut success = TofReturnCode::Unknown;
189 match cmd_receiver.connect(&fc_sub_addr) {
190 Ok(_) => (),
191 Err(err) => {
192 error!("Unable to connect to {}! {}", fc_sub_addr, err);
193 continue;
194 }
195 }
196
197 let cmd_packet : TofPacket;
198 match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
199 Err(err) => {
200 trace!("ZMQ socket receiving error! {err}");
201 continue;
202 }
203 Ok(buffer) => {
204 info!("Received bytes {:?}", buffer);
205 if buffer.len() < 4 {
207 error!("Can't deal with commands shorter than 4 bytes@");
208 continue
209 }
210 if buffer[0] == 0x90 && buffer[1] == 0xeb {
212 if buffer[4] != 0x46 { info!("We received something, but it does not seem to be address to us! We are only listening to address {} right now!", 0x46);
215 continue;
216 } else {
217 info!("Received command sent through (Cra-)BFSW system!");
218 if buffer.len() < 8 {
219 error!("Received command is too short! (Smaller than 8 bytes) {:?}", buffer);
220 success = TofReturnCode::GarbledCommand;
221 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
222 continue;
223 }
224 match TofPacket::from_bytestream(&buffer, &mut 8) {
225 Err(err) => {
226 error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
227 success = TofReturnCode::GarbledCommand;
228 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
229 continue;
230 },
231 Ok(packet) => {
232 cmd_packet = packet;
233 }
234 }
235 }
236 } else if buffer[0] == 170 && buffer[1] == 170 {
237 info!("Got a TofPacket!");
238 match TofPacket::from_bytestream(&buffer, &mut 0) {
239 Err(err) => {
240 error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
241 success = TofReturnCode::GarbledCommand;
242 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
243 continue;
244 },
245 Ok(packet) => {
246 cmd_packet = packet;
247 }
248 }
249 } else {
250 error!("Received bytestream, but don't know how to deal with it!");
251 continue;
252 }
253 debug!("Got packet {}!", cmd_packet);
254 match cmd_packet.packet_type {
255 TofPacketType::TofCommand => {
256 let cmd : TofCommand;
257 match cmd_packet.unpack::<TofCommand>() {
258 Ok(_cmd) => {cmd = _cmd;},
259 Err(err) => {
260 error!("Unable to decode TofCommand! {err}");
261 success = TofReturnCode::GarbledCommand;
262 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
263 continue;
264 }
265 }
266 println!("= => Received command {}!", cmd);
267 let now = Utc::now().to_string();
268 let write_to_file = format!("{:?}: {}\n",now, cmd);
269 match log_file.write_all(&write_to_file.into_bytes()) {
270 Err(err) => {
271 error!("Writing to file to path {} failed! {}", filename, err)
272 }
273 Ok(_) => ()
274 }
275 match log_file.sync_all() {
276 Err(err) => {
277 error!("Unable to sync file to disc! {err}");
278 },
279 Ok(_) => ()
280 }
281
282 match cmd.command_code {
284 TofCommandCode::DataRunStop => {
285 println!("= => Received DataRunStop!");
286 if !dry_run {
287 success = manage_liftof_cc_service("stop");
288 }
289 },
290 TofCommandCode::DataRunStart => {
291 info!("Received DataRunStart!");
292 if !dry_run {
294 success = manage_liftof_cc_service("restart");
295 }
296 }
297 TofCommandCode::ShutdownRB => {
298 let mut cmd_rb_list = cmd.payload.clone();
299 if cmd_rb_list.is_empty() {
300 cmd_rb_list = all_rb_ids.clone();
301 }
302 info!("Received Shutdown RB command for RBs {:?}", cmd_rb_list);
303 let cmd_args = vec![String::from("sudo"),
304 String::from("shutdown"),
305 String::from("now")];
306 if !args.dry_run {
307 match ssh_command_rbs(&cmd_rb_list, cmd_args, false) {
308 Err(err) => {
309 error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
310 success = TofReturnCode::GeneralFail;
311 }
312 Ok(_) => {
313 success = TofReturnCode::Success;
314 }
315 }
316 }
317 }
318 TofCommandCode::ResetConfigWDefault => {
321 let this_cfg_file = "/home/gaps/staging/current/liftof-config.toml";
322 info!("Will reset {} with {}", this_cfg_file, default_cfg_file);
323 match copy_file_rename_liftof(&default_cfg_file, ¤t_dir) {
324 Ok(_) => {
325 info!("Copy successful!");
326 success = TofReturnCode::Success;
327 }
328 Err(err) => {
329 error!("Unable to copy! {err}");
330 success = TofReturnCode::GeneralFail;
331 }
332 }
333 }
334 TofCommandCode::SubmitConfig => {
335 info!("Submitting the worked on config!");
336 match copy_file_rename_liftof(&cfg_file, ¤t_dir) {
337 Ok(_) => {
338 info!("Copy successful!");
339 success = TofReturnCode::Success;
340 }
341 Err(err) => {
342 error!("Unable to copy! {err}");
343 success = TofReturnCode::GeneralFail;
344 }
345 }
346 }
347 TofCommandCode::ShutdownRAT => {
348 let cmd_rb_list = cmd.payload.clone();
349 info!("Received Shutdown RAT command for RBs {:?}", cmd_rb_list);
350 let cmd_args = vec![String::from("sudo"),
351 String::from("shutdown"),
352 String::from("now")];
353 if !args.dry_run {
354 match ssh_command_rbs(&cmd_rb_list, cmd_args, false) {
355 Err(err) => {
356 error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
357 success = TofReturnCode::GeneralFail;
358 }
359 Ok(_) => {
360 success = TofReturnCode::Success;
361 }
362 }
363 }
364 }
365 TofCommandCode::ShutdownCPU => {
366 let cmd_args = vec![String::from("shutdown"),
367 String::from("now")];
368 info!("Received Shutdown command for CPU");
369 if !args.dry_run {
370 match Command::new("sudo")
371 .args(cmd_args)
373 .spawn() {
374 Err(err) => {
375 error!("Unable to spawn shutdown process on TofCPU! {err}");
376 success = TofReturnCode::GeneralFail;
377 }
378 Ok(mut child) => {
380 match child.wait() {
381 Err(err) => error!("Waiting for the shutdown process failed! {err}"),
382 Ok(_) => ()
383 }
384 }
385 }
386 }
387 }
388 TofCommandCode::RBCalibration => {
389 info!("Received RBCalibration command!");
390 if cmd.payload.len() < 3 {
391 error!("Broken RBCalibration command!");
392 continue;
393 }
394 let pre_run_cali = cmd.payload[0] != 0;
395 let send_packets = cmd.payload[1] != 0;
396 let save_waveforms = cmd.payload[2] != 0;
397 match LiftofSettings::from_toml(&cfg_file) {
398 Err(err) => {
399 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
400 success = TofReturnCode::GeneralFail;
402 }
403 Ok(mut config) => {
404 config.data_publisher_settings.send_cali_packets = send_packets;
405 config.save_cali_wf = save_waveforms;
406 config.pre_run_calibration = pre_run_cali;
407 config.to_toml(String::from(cfg_file.clone()));
408 info!("We changed the data publisher settings to be this {}",config.data_publisher_settings);
409
410 success = TofReturnCode::Success;
411 }
412 }
413 }
414 TofCommandCode::SetMTConfig => {
415 info!("Will change trigger config for next run!");
416 match TriggerConfig::from_bytestream(&cmd.payload, &mut 0) {
417 Err(err) => error!("Unable to extract TriggerConfig from command! {err}"),
418 Ok(tcf) => {
419 match LiftofSettings::from_toml(&cfg_file) {
420 Err(err) => {
421 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
422 success = TofReturnCode::GeneralFail;
423 }
424 Ok(mut config) => {
425 println!("=> We received the following trigger config {}", tcf);
426 config.mtb_settings.from_triggerconfig(&tcf);
427 println!("=> We changed the mtb settings to be this {}",config.mtb_settings);
428 config.to_toml(String::from(cfg_file.clone()));
429 success = TofReturnCode::Success;
430 }
431 }
432 }
433 }
434 }
435 TofCommandCode::SetTofRunConfig => {
457 info!("Will change tof run config for next run!");
458 match TofRunConfig::from_bytestream(&cmd.payload, &mut 0) {
459 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
460 Ok(tcf) => {
461 info!("Received config {}",tcf);
462 match LiftofSettings::from_toml(&cfg_file) {
463 Err(err) => {
464 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
465 success = TofReturnCode::GeneralFail;
466 }
467 Ok(mut config) => {
468 config.from_tofrunconfig(&tcf);
469 info!("We changed the run config to be this {}",config);
470 config.to_toml(String::from(cfg_file.clone()));
471 success = TofReturnCode::Success;
472 }
473 }
474 }
475 }
476 }
477 TofCommandCode::SetTofRBConfig => {
478 info!("Will change tof rb config for next run!");
479 match TofRBConfig::from_bytestream(&cmd.payload, &mut 0) {
480 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
481 Ok(tcf) => {
482 info!("Received config {}",tcf);
483 match LiftofSettings::from_toml(&cfg_file) {
484 Err(err) => {
485 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
486 success = TofReturnCode::GeneralFail;
487 }
488 Ok(mut config) => {
489 config.rb_settings.from_tofrbconfig(&tcf);
490 info!("We changed the run config to be this {}",config);
491 config.to_toml(String::from(cfg_file.clone()));
492 success = TofReturnCode::Success;
493 }
494 }
495 }
496 }
497 }
498 TofCommandCode::SetDataPublisherConfig => {
499 info!("Will change data publisher config for next run!");
500 let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
501 match DataPublisherConfig::from_bytestream(&cmd.payload, &mut 0) {
502 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
503 Ok(tcf) => {
504 info!("Received config {}",tcf);
505 match LiftofSettings::from_toml(&cfg_file) {
506 Err(err) => {
507 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
508 success = TofReturnCode::GeneralFail;
509 }
510 Ok(mut config) => {
511 config.data_publisher_settings.from_datapublisherconfig(&tcf);
512 info!("We changed the event builder settings to be this {}",config.data_publisher_settings);
513 config.to_toml(String::from(cfg_file));
514 success = TofReturnCode::Success;
515 }
516 }
517 }
518 }
519 }
520 TofCommandCode::RunScriptAlfa => {
522 info!("Will execute run script action Alfa!");
523 let script_path = "/home/gaps/bin/scripts/run-script-alfa.sh";
524 let mut command = Command::new("bash");
525 command.arg(script_path);
526 let output = command.output().expect("Failed to execute script");
527 info!("Script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
528 info!("Script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
529 if output.status.success() {
530 info!("Script executed successfully!");
531 success = TofReturnCode::GeneralFail;
532 } else {
533 error!("Script execution failed with status: {:?}", output.status.code());
534 success = TofReturnCode::Success;
535 }
536 }
537 TofCommandCode::RunScriptBravo => {
538 info!("Will execute run script action Bravo!");
539 let script_path = "/home/gaps/bin/scripts/run-script-bravo.sh";
540 let mut command = Command::new("bash");
541 command.arg(script_path);
542 let output = command.output().expect("Failed to execute script");
543 info!("Script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
544 info!("Script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
545 if output.status.success() {
546 info!("Script executed successfully!");
547 success = TofReturnCode::GeneralFail;
548 } else {
549 error!("Script execution failed with status: {:?}", output.status.code());
550 success = TofReturnCode::Success;
551 }
552 }
553 TofCommandCode::RunScriptCharlie => {
554 info!("Will execute run script action Charlie!");
555 let script_path = "/home/gaps/bin/scripts/run-script-charlie.sh";
556 let mut command = Command::new("bash");
557 command.arg(script_path);
558 let output = command.output().expect("Failed to execute script");
559 info!("Script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
560 info!("Script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
561 if output.status.success() {
562 info!("Script executed successfully!");
563 success = TofReturnCode::GeneralFail;
564 } else {
565 error!("Script execution failed with status: {:?}", output.status.code());
566 success = TofReturnCode::Success;
567 }
568 }
569 TofCommandCode::RunScriptWhiskey => {
570 info!("Will execute run script action Whiskey!");
571 let script_path = "/home/gaps/bin/scripts/run-script-whiskey.sh";
572 let mut command = Command::new("bash");
573 command.arg(script_path);
574 let output = command.output().expect("Failed to execute script");
575 info!("Script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
576 info!("Script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
577 if output.status.success() {
578 info!("Script executed successfully!");
579 success = TofReturnCode::GeneralFail;
580 } else {
581 error!("Script execution failed with status: {:?}", output.status.code());
582 success = TofReturnCode::Success;
583 }
584 }
585 TofCommandCode::RunScriptTango => {
586 info!("Will execute run script action Tango!");
587 let script_path = "/home/gaps/bin/scripts/run-script-tango.sh";
588 let mut command = Command::new("bash");
589 command.arg(script_path);
590 let output = command.output().expect("Failed to execute script");
591 info!("Script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
592 info!("Script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
593 if output.status.success() {
594 info!("Script executed successfully!");
595 success = TofReturnCode::GeneralFail;
596 } else {
597 error!("Script execution failed with status: {:?}", output.status.code());
598 success = TofReturnCode::Success;
599 }
600 }
601 TofCommandCode::RunScriptFoxtrott => {
602 info!("Will execute run script action Foxtrott!");
603 let script_path = "/home/gaps/bin/scripts/run-script-foxtrott.sh";
604 let mut command = Command::new("bash");
605 command.arg(script_path);
606 let output = command.output().expect("Failed to execute script");
607 info!("Script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
608 info!("Script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
609 if output.status.success() {
610 info!("Script executed successfully!");
611 success = TofReturnCode::GeneralFail;
612 } else {
613 error!("Script execution failed with status: {:?}", output.status.code());
614 success = TofReturnCode::Success;
615 }
616 }
617 TofCommandCode::UploadConfigDiff => {
618 info!("Applying received patch to config file");
619 let compressed_diff = cmd.payload.clone();
620 info!("Received compressed diff {:?}", compressed_diff);
621 info!("Using config {}", "/home/gaps/staging/current/liftof-config.toml");
622 match apply_diff_to_file(compressed_diff, "/home/gaps/staging/current/liftof-config.toml") {
623 Ok(_) => {
624 success = TofReturnCode::Success;
625 }
626 Err(err) => {
627 error!("Applying the patch failed! {err}");
628 success = TofReturnCode::GeneralFail;
629 }
630 }
631 }
632 TofCommandCode::RequestLiftofSettings => {
633 info!("Will stop/restart run to be able to send config file over socket");
634 if !dry_run {
635 let mut pack = TofPacket::new();
636 pack.packet_type = TofPacketType::LiftofSettings;
637 let current_config = PathBuf::from("/home/gaps/staging/current/liftof-config.toml");
638 match compress_toml(¤t_config) {
639 Ok(payload) => {
640 pack.payload = payload;
641 success = TofReturnCode::Success;
642 }
643 Err(err) => {
644 error!("Unable to compress liftof-settings! {err}");
645 success = TofReturnCode::GeneralFail;
646 }
647 }
648 match cmd_sender.send(pack.to_bytestream(), 0) {
649 Ok(_) => (),
650 Err(err) => error!("Unable to send Liftof settings! {err}")
651 }
652 }
653 }
654 _ => {
655 error!("Dealing with command code {} has not been implemented yet!", cmd.command_code);
656 success = TofReturnCode::GeneralFail;
657 }
658 }
659 if !args.no_ack {
660 send_ack_packet(cmd.command_code, success, &cmd_sender);
661 }
662 },
663 _ => {
664 error!("Unable to deal with packet type {}!", cmd_packet.packet_type)
665 }
666 }
667 }
668 }
669 }
670}