1#[macro_use] extern crate log;
27
28use std::fs::{
29 OpenOptions,
30};
31
32use std::thread;
33use std::io::Write;
34use std::process::Command;
35use std::path::{
36 Path
37};
38
39use chrono::Utc;
40use clap::{
41 arg,
42 command,
43 Parser
44};
45
46use std::time::{
53 Duration,
54};
55
56use gondola_core::prelude::*;
83use gondola_core::init_env_logger;
84
85use liftof_cc::{
86 manage_liftof_cc_service,
87 ssh_command_rbs,
88 copy_file_rename_liftof,
89 LIFTOF_HOTWIRE,
90};
91
92
93
94#[derive(Parser, Debug)]
95#[command(author = "J.A.Stoessl", version, about, long_about = None)]
96#[command(propagate_version = true)]
97struct LiftofSchedArgs {
98 #[arg(short, long)]
99 config : Option<String>,
100 #[arg(long, default_value_t = false)]
103 dry_run : bool,
104 #[arg(long, default_value_t = false)]
106 no_ack : bool,
107}
108
109fn send_ack_packet(cc : TofCommandCode,
118 ret_code : TofReturnCode,
119 socket : &zmq::Socket) {
120 let mut ack = AckBfsw::new();
121 ack.ret_code1 = ret_code as u8;
122 ack.ret_code2 = cc as u8;
123 let tp = ack.pack();
124 match socket.send(tp.to_bytestream(), 0) {
125 Ok(_) => (),
126 Err(err) => error!("Unable to send ACK! {err}")
127 }
128}
129
130
131fn main() {
132 init_env_logger();
133
134 println!("{}", LIFTOF_LOGO_SHOW);
136 println!("-----------------------------------------------");
137 println!(" >> Welcome to liftof-scheduler \u{1F680} \u{1F388} ");
138 println!(" >> liftof is a software suite for the time-of-flight detector (TOF) ");
139 println!(" >> for the GAPS experiment \u{1F496}");
140 println!(" >> This is the run scheduler (liftof-scheduler)");
141 println!(" >> It starts/stops the liftof-cc service and manages run configurations");
142 println!("-----------------------------------------------\n\n");
143
144 let args = LiftofSchedArgs::parse();
145 let config : LiftofSettings;
146 let dry_run = args.dry_run;
147 let no_ack = args.no_ack;
148 match args.config {
149 None => panic!("No config file provided! Please provide a config file with --config or -c flag!"),
150 Some(cfg_file) => {
151 match LiftofSettings::from_toml(&cfg_file) {
153 Err(err) => {
154 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
155 panic!("Unable to parse config file!");
156 }
157 Ok(_cfg) => {
158 config = _cfg;
159 }
160 }
161 } } let staging_dir = config.staging_dir;
165 let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
167 let next_dir = format!("{}/next", staging_dir.clone());
168 let current_dir = format!("{}/current", staging_dir.clone());
169 let default_cfg_file = format!("{}/default/liftof-config-default.toml", staging_dir.clone());
170 let db_path = config.db_path.clone();
171 let mut conn_ = connect_to_db_path(&db_path).expect("Unable to establish a connection to the DB! CHeck db_path in the liftof settings (.toml) file!");
172 let rb_list = ReadoutBoard::all().expect("Unable to retrieve RB information! Unable to continue, check db_path in the liftof settings (.toml) file and DB integrity!");
174 let mut all_rb_ids = Vec::<u8>::new();
175 for rb in rb_list {
176 all_rb_ids.push(rb.rb_id as u8);
177 }
178
179 let sleep_time = Duration::from_secs(config.cmd_dispatcher_settings.cmd_listener_interval_sec);
180 let fc_sub_addr = config.cmd_dispatcher_settings.fc_sub_address.clone();
183 let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
184 let ctx = zmq::Context::new();
185
186 info!("Connecting to flight computer at {}", fc_sub_addr);
188 let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
189 cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
190 cmd_receiver.connect(&fc_sub_addr).expect("Unable to subscribe to flight computer PUB");
191 info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
192
193 info!("Binding socket for command dispatching to rb network to {}", cc_pub_addr);
195 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
196 if !dry_run || no_ack {
197 cmd_sender.bind(LIFTOF_HOTWIRE).expect("Unable to bind to (PUB) socket!");
198 }
199 let mut filename = config.cmd_dispatcher_settings.cmd_log_path.clone();
201 if !filename.ends_with("/") {
202 filename += "/";
203 }
204 filename += "received-commands.log";
205 let path = Path::new(&filename);
206 info!("Writing cmd log to file {filename}");
207 let mut log_file = OpenOptions::new().create(true).append(true).open("received-commands.log").expect("Unable to create file!");
208 match OpenOptions::new().create(true).append(true).open(path) {
209 Ok(_f) => {log_file = _f;},
210 Err(err) => {
211 error!("Unable to write to path {filename}! {err} Falling back to default file path");
212 }
213 }
214
215 loop {
218 thread::sleep(sleep_time);
219 let mut success = TofReturnCode::Unknown;
221 match cmd_receiver.connect(&fc_sub_addr) {
222 Ok(_) => (),
223 Err(err) => {
224 error!("Unable to connect to {}! {}", fc_sub_addr, err);
225 continue;
226 }
227 }
228
229 let cmd_packet : TofPacket;
230 match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
231 Err(err) => {
232 trace!("ZMQ socket receiving error! {err}");
233 continue;
234 }
235 Ok(buffer) => {
236 info!("Received bytes {:?}", buffer);
237 if buffer.len() < 4 {
239 error!("Can't deal with commands shorter than 4 bytes@");
240 continue
241 }
242 if buffer[0] == 0x90 && buffer[1] == 0xeb {
244 if buffer[4] != 0x46 { info!("We received something, but it does not seem to be address to us! We are only listening to address {} right now!", 0x46);
247 continue;
248 } else {
249 info!("Received command sent through (Cra-)BFSW system!");
250 if buffer.len() < 8 {
251 error!("Received command is too short! (Smaller than 8 bytes) {:?}", buffer);
252 success = TofReturnCode::GarbledCommand;
253 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
254 continue;
255 }
256 match TofPacket::from_bytestream(&buffer, &mut 8) {
257 Err(err) => {
258 error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
259 success = TofReturnCode::GarbledCommand;
260 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
261 continue;
262 },
263 Ok(packet) => {
264 cmd_packet = packet;
265 }
266 }
267 }
268 } else if buffer[0] == 170 && buffer[1] == 170 {
269 info!("Got a TofPacket!");
270 match TofPacket::from_bytestream(&buffer, &mut 0) {
271 Err(err) => {
272 error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
273 success = TofReturnCode::GarbledCommand;
274 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
275 continue;
276 },
277 Ok(packet) => {
278 cmd_packet = packet;
279 }
280 }
281 } else {
282 error!("Received bytestream, but don't know how to deal with it!");
283 continue;
284 }
285 debug!("Got packet {}!", cmd_packet);
286 match cmd_packet.packet_type {
287 TofPacketType::TofCommand => {
288 let cmd : TofCommand;
289 match cmd_packet.unpack::<TofCommand>() {
290 Ok(_cmd) => {cmd = _cmd;},
291 Err(err) => {
292 error!("Unable to decode TofCommand! {err}");
293 success = TofReturnCode::GarbledCommand;
294 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
295 continue;
296 }
297 }
298 println!("= => Received command {}!", cmd);
299 let now = Utc::now().to_string();
300 let write_to_file = format!("{:?}: {}\n",now, cmd);
301 match log_file.write_all(&write_to_file.into_bytes()) {
302 Err(err) => {
303 error!("Writing to file to path {} failed! {}", filename, err)
304 }
305 Ok(_) => ()
306 }
307 match log_file.sync_all() {
308 Err(err) => {
309 error!("Unable to sync file to disc! {err}");
310 },
311 Ok(_) => ()
312 }
313
314 match cmd.command_code {
316 TofCommandCode::DataRunStop => {
317 println!("= => Received DataRunStop!");
318 if !dry_run {
319 success = manage_liftof_cc_service("stop");
320 }
321 },
322 TofCommandCode::DataRunStart => {
323 info!("Received DataRunStart!");
324 if !dry_run {
326 success = manage_liftof_cc_service("restart");
327 }
328 }
329 TofCommandCode::ShutdownRB => {
330 let mut cmd_rb_list = cmd.payload.clone();
331 if cmd_rb_list.is_empty() {
332 cmd_rb_list = all_rb_ids.clone();
333 }
334 info!("Received Shutdown RB command for RBs {:?}", cmd_rb_list);
335 let cmd_args = vec![String::from("sudo"),
336 String::from("shutdown"),
337 String::from("now")];
338 if !args.dry_run {
339 match ssh_command_rbs(&cmd_rb_list, cmd_args) {
340 Err(err) => {
341 error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
342 success = TofReturnCode::GeneralFail;
343 }
344 Ok(_) => {
345 success = TofReturnCode::Success;
346 }
347 }
348 }
349 }
350 TofCommandCode::ResetConfigWDefault => {
351 info!("Will reset {} with {}", cfg_file, default_cfg_file);
352 match copy_file_rename_liftof(&default_cfg_file, &next_dir) {
353 Ok(_) => {
354 info!("Copy successful!");
355 success = TofReturnCode::Success;
356 }
357 Err(err) => {
358 error!("Unable to copy! {err}");
359 success = TofReturnCode::GeneralFail;
360 }
361 }
362 }
363 TofCommandCode::SubmitConfig => {
364 info!("Submitting the worked on config!");
365 match copy_file_rename_liftof(&cfg_file, ¤t_dir) {
366 Ok(_) => {
367 info!("Copy successful!");
368 success = TofReturnCode::Success;
369 }
370 Err(err) => {
371 error!("Unable to copy! {err}");
372 success = TofReturnCode::GeneralFail;
373 }
374 }
375 }
376 TofCommandCode::ShutdownRAT => {
377 let cmd_rb_list = cmd.payload.clone();
378 info!("Received Shutdown RAT command for RBs {:?}", cmd_rb_list);
379 let cmd_args = vec![String::from("sudo"),
380 String::from("shutdown"),
381 String::from("now")];
382 if !args.dry_run {
383 match ssh_command_rbs(&cmd_rb_list, cmd_args) {
384 Err(err) => {
385 error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
386 success = TofReturnCode::GeneralFail;
387 }
388 Ok(_) => {
389 success = TofReturnCode::Success;
390 }
391 }
392 }
393 }
394 TofCommandCode::ShutdownCPU => {
395 let cmd_args = vec![String::from("shutdown"),
396 String::from("now")];
397 info!("Received Shutdown command for CPU");
398 if !args.dry_run {
399 match Command::new("sudo")
400 .args(cmd_args)
402 .spawn() {
403 Err(err) => {
404 error!("Unable to spawn shutdown process on TofCPU! {err}");
405 success = TofReturnCode::GeneralFail;
406 }
407 Ok(mut child) => {
409 match child.wait() {
410 Err(err) => error!("Waiting for the shutdown process failed! {err}"),
411 Ok(_) => ()
412 }
413 }
414 }
415 }
416 }
417 TofCommandCode::RBCalibration => {
418 info!("Received RBCalibration command!");
419 if cmd.payload.len() < 3 {
420 error!("Broken RBCalibration command!");
421 continue;
422 }
423 let pre_run_cali = cmd.payload[0] != 0;
424 let send_packets = cmd.payload[1] != 0;
425 let save_waveforms = cmd.payload[2] != 0;
426 match LiftofSettings::from_toml(&cfg_file) {
427 Err(err) => {
428 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
429 success = TofReturnCode::GeneralFail;
431 }
432 Ok(mut config) => {
433 config.data_publisher_settings.send_cali_packets = send_packets;
434 config.save_cali_wf = save_waveforms;
435 config.pre_run_calibration = pre_run_cali;
436 config.to_toml(String::from(cfg_file.clone()));
437 info!("We changed the data publisher settings to be this {}",config.data_publisher_settings);
438
439 success = TofReturnCode::Success;
440 }
441 }
442 }
443 TofCommandCode::SetMTConfig => {
444 info!("Will change trigger config for next run!");
445 match TriggerConfig::from_bytestream(&cmd.payload, &mut 0) {
446 Err(err) => error!("Unable to extract TriggerConfig from command! {err}"),
447 Ok(tcf) => {
448 match LiftofSettings::from_toml(&cfg_file) {
449 Err(err) => {
450 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
451 success = TofReturnCode::GeneralFail;
452 }
453 Ok(mut config) => {
454 println!("=> We received the following trigger config {}", tcf);
455 config.mtb_settings.from_triggerconfig(&tcf);
456 println!("=> We changed the mtb settings to be this {}",config.mtb_settings);
457 config.to_toml(String::from(cfg_file.clone()));
458 success = TofReturnCode::Success;
459 }
460 }
461 }
462 }
463 }
464 TofCommandCode::SetTOFEventBuilderConfig => {
465 info!("Will change tof event builder config for next run!");
466 match TOFEventBuilderConfig::from_bytestream(&cmd.payload, &mut 0) {
467 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
468 Ok(tcf) => {
469 info!("Received config {}",tcf);
470 match LiftofSettings::from_toml(&cfg_file) {
471 Err(err) => {
472 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
473 success = TofReturnCode::GeneralFail;
474 }
475 Ok(mut config) => {
476 config.event_builder_settings.from_tofeventbuilderconfig(&tcf);
477 info!("We changed the event builder settings to be this {}",config.event_builder_settings);
478 config.to_toml(String::from(cfg_file.clone()));
479 success = TofReturnCode::Success;
480 }
481 }
482 }
483 }
484 }
485 TofCommandCode::SetTofRunConfig => {
486 info!("Will change tof run config for next run!");
487 match TofRunConfig::from_bytestream(&cmd.payload, &mut 0) {
488 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
489 Ok(tcf) => {
490 info!("Received config {}",tcf);
491 match LiftofSettings::from_toml(&cfg_file) {
492 Err(err) => {
493 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
494 success = TofReturnCode::GeneralFail;
495 }
496 Ok(mut config) => {
497 config.from_tofrunconfig(&tcf);
498 info!("We changed the run config to be this {}",config);
499 config.to_toml(String::from(cfg_file.clone()));
500 success = TofReturnCode::Success;
501 }
502 }
503 }
504 }
505 }
506 TofCommandCode::SetTofRBConfig => {
507 info!("Will change tof rb config for next run!");
508 match TofRBConfig::from_bytestream(&cmd.payload, &mut 0) {
509 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
510 Ok(tcf) => {
511 info!("Received config {}",tcf);
512 match LiftofSettings::from_toml(&cfg_file) {
513 Err(err) => {
514 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
515 success = TofReturnCode::GeneralFail;
516 }
517 Ok(mut config) => {
518 config.rb_settings.from_tofrbconfig(&tcf);
519 info!("We changed the run config to be this {}",config);
520 config.to_toml(String::from(cfg_file.clone()));
521 success = TofReturnCode::Success;
522 }
523 }
524 }
525 }
526 }
527 TofCommandCode::SetDataPublisherConfig => {
528 info!("Will change data publisher config for next run!");
529 let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
530 match DataPublisherConfig::from_bytestream(&cmd.payload, &mut 0) {
531 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
532 Ok(tcf) => {
533 info!("Received config {}",tcf);
534 match LiftofSettings::from_toml(&cfg_file) {
535 Err(err) => {
536 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
537 success = TofReturnCode::GeneralFail;
538 }
539 Ok(mut config) => {
540 config.data_publisher_settings.from_datapublisherconfig(&tcf);
541 info!("We changed the event builder settings to be this {}",config.data_publisher_settings);
542 config.to_toml(String::from(cfg_file));
543 success = TofReturnCode::Success;
544 }
545 }
546 }
547 }
548 }
549 _ => {
550 error!("Dealing with command code {} has not been implemented yet!", cmd.command_code);
551 success = TofReturnCode::GeneralFail;
552 }
553 }
554 if !args.no_ack {
555 send_ack_packet(cmd.command_code, success, &cmd_sender);
556 }
557 },
558 _ => {
559 error!("Unable to deal with packet type {}!", cmd_packet.packet_type)
560 }
561 }
562 }
563 }
564 }
565}