1#[macro_use] extern crate log;
27
28use std::fs::{
29 OpenOptions,
30};
31
32use std::thread;
33use std::io::Write;
34use std::process::Command;
35use std::path::{
36 Path
37};
38
39use chrono::Utc;
40use clap::{
41 arg,
42 command,
43 Parser
44};
45
46use liftof_lib::{
47 init_env_logger,
48 LIFTOF_LOGO_SHOW,
49 LiftofSettings,
50};
51
52use std::time::{
53 Duration,
54};
55
56use tof_dataclasses::commands::{
57 TofCommandV2,
58 TofCommandCode,
59 TofReturnCode
60};
61use tof_dataclasses::serialization::{
62 Serialization,
63 Packable
64};
65use tof_dataclasses::packets::{
66 PacketType,
67 TofPacket
68};
69use tof_dataclasses::database::{
70 connect_to_db,
71 ReadoutBoard,
72};
73use tof_dataclasses::commands::config::{
74 TriggerConfig,
75 TOFEventBuilderConfig,
76 DataPublisherConfig,
77 TofRunConfig,
78 TofRBConfig
79};
80
81use telemetry_dataclasses::packets::AckBfsw;
82
83use liftof_cc::{
84 manage_liftof_cc_service,
85 ssh_command_rbs,
86 copy_file_rename_liftof,
87 LIFTOF_HOTWIRE,
88};
89
90
91
92#[derive(Parser, Debug)]
93#[command(author = "J.A.Stoessl", version, about, long_about = None)]
94#[command(propagate_version = true)]
95struct LiftofSchedArgs {
96 #[arg(short, long)]
97 config : Option<String>,
98 #[arg(long, default_value_t = false)]
101 dry_run : bool,
102 #[arg(long, default_value_t = false)]
104 no_ack : bool,
105}
106
107fn send_ack_packet(cc : TofCommandCode,
116 ret_code : TofReturnCode,
117 socket : &zmq::Socket) {
118 let mut ack = AckBfsw::new();
119 ack.ret_code1 = ret_code as u8;
120 ack.ret_code2 = cc as u8;
121 let tp = ack.pack();
122 match socket.send(tp.to_bytestream(), 0) {
123 Ok(_) => (),
124 Err(err) => error!("Unable to send ACK! {err}")
125 }
126}
127
128
129fn main() {
130 init_env_logger();
131
132 println!("{}", LIFTOF_LOGO_SHOW);
134 println!("-----------------------------------------------");
135 println!(" >> Welcome to liftof-scheduler \u{1F680} \u{1F388} ");
136 println!(" >> liftof is a software suite for the time-of-flight detector (TOF) ");
137 println!(" >> for the GAPS experiment \u{1F496}");
138 println!(" >> This is the run scheduler (liftof-scheduler)");
139 println!(" >> It starts/stops the liftof-cc service and manages run configurations");
140 println!("-----------------------------------------------\n\n");
141
142 let args = LiftofSchedArgs::parse();
143 let config : LiftofSettings;
144 let dry_run = args.dry_run;
145 let no_ack = args.no_ack;
146 match args.config {
147 None => panic!("No config file provided! Please provide a config file with --config or -c flag!"),
148 Some(cfg_file) => {
149 match LiftofSettings::from_toml(&cfg_file) {
151 Err(err) => {
152 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
153 panic!("Unable to parse config file!");
154 }
155 Ok(_cfg) => {
156 config = _cfg;
157 }
158 }
159 } } let staging_dir = config.staging_dir;
163 let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
165 let next_dir = format!("{}/next", staging_dir.clone());
166 let current_dir = format!("{}/current", staging_dir.clone());
167 let default_cfg_file = format!("{}/default/liftof-config-default.toml", staging_dir.clone());
168 let db_path = config.db_path.clone();
169 let mut conn = connect_to_db(db_path).expect("Unable to establish a connection to the DB! CHeck db_path in the liftof settings (.toml) file!");
170 let rb_list = ReadoutBoard::all(&mut conn).expect("Unable to retrieve RB information! Unable to continue, check db_path in the liftof settings (.toml) file and DB integrity!");
172 let mut all_rb_ids = Vec::<u8>::new();
173 for rb in rb_list {
174 all_rb_ids.push(rb.rb_id as u8);
175 }
176
177 let sleep_time = Duration::from_secs(config.cmd_dispatcher_settings.cmd_listener_interval_sec);
178 let fc_sub_addr = config.cmd_dispatcher_settings.fc_sub_address.clone();
181 let cc_pub_addr = config.cmd_dispatcher_settings.cc_server_address.clone();
182 let ctx = zmq::Context::new();
183
184 info!("Connecting to flight computer at {}", fc_sub_addr);
186 let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
187 cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
188 cmd_receiver.connect(&fc_sub_addr).expect("Unable to subscribe to flight computer PUB");
189 info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
190
191 info!("Binding socket for command dispatching to rb network to {}", cc_pub_addr);
193 let cmd_sender = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
194 if !dry_run || no_ack {
195 cmd_sender.bind(LIFTOF_HOTWIRE).expect("Unable to bind to (PUB) socket!");
196 }
197 let mut filename = config.cmd_dispatcher_settings.cmd_log_path.clone();
199 if !filename.ends_with("/") {
200 filename += "/";
201 }
202 filename += "received-commands.log";
203 let path = Path::new(&filename);
204 info!("Writing cmd log to file {filename}");
205 let mut log_file = OpenOptions::new().create(true).append(true).open("received-commands.log").expect("Unable to create file!");
206 match OpenOptions::new().create(true).append(true).open(path) {
207 Ok(_f) => {log_file = _f;},
208 Err(err) => {
209 error!("Unable to write to path {filename}! {err} Falling back to default file path");
210 }
211 }
212
213 loop {
216 thread::sleep(sleep_time);
217 let mut success = TofReturnCode::Unknown;
219 match cmd_receiver.connect(&fc_sub_addr) {
220 Ok(_) => (),
221 Err(err) => {
222 error!("Unable to connect to {}! {}", fc_sub_addr, err);
223 continue;
224 }
225 }
226
227 let cmd_packet : TofPacket;
228 match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
229 Err(err) => {
230 trace!("ZMQ socket receiving error! {err}");
231 continue;
232 }
233 Ok(buffer) => {
234 info!("Received bytes {:?}", buffer);
235 if buffer.len() < 4 {
237 error!("Can't deal with commands shorter than 4 bytes@");
238 continue
239 }
240 if buffer[0] == 0x90 && buffer[1] == 0xeb {
242 if buffer[4] != 0x46 { info!("We received something, but it does not seem to be address to us! We are only listening to address {} right now!", 0x46);
245 continue;
246 } else {
247 info!("Received command sent through (Cra-)BFSW system!");
248 if buffer.len() < 8 {
249 error!("Received command is too short! (Smaller than 8 bytes) {:?}", buffer);
250 success = TofReturnCode::GarbledCommand;
251 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
252 continue;
253 }
254 match TofPacket::from_bytestream(&buffer, &mut 8) {
255 Err(err) => {
256 error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
257 success = TofReturnCode::GarbledCommand;
258 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
259 continue;
260 },
261 Ok(packet) => {
262 cmd_packet = packet;
263 }
264 }
265 }
266 } else if buffer[0] == 170 && buffer[1] == 170 {
267 info!("Got a TofPacket!");
268 match TofPacket::from_bytestream(&buffer, &mut 0) {
269 Err(err) => {
270 error!("Unable to decode bytestream {:?} for command ! {:?}", buffer, err);
271 success = TofReturnCode::GarbledCommand;
272 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
273 continue;
274 },
275 Ok(packet) => {
276 cmd_packet = packet;
277 }
278 }
279 } else {
280 error!("Received bytestream, but don't know how to deal with it!");
281 continue;
282 }
283 debug!("Got packet {}!", cmd_packet);
284 match cmd_packet.packet_type {
285 PacketType::TofCommandV2 => {
286 let cmd : TofCommandV2;
287 match cmd_packet.unpack::<TofCommandV2>() {
288 Ok(_cmd) => {cmd = _cmd;},
289 Err(err) => {
290 error!("Unable to decode TofCommand! {err}");
291 success = TofReturnCode::GarbledCommand;
292 send_ack_packet(TofCommandCode::Unknown, success, &cmd_sender);
293 continue;
294 }
295 }
296 println!("= => Received command {}!", cmd);
297 let now = Utc::now().to_string();
298 let write_to_file = format!("{:?}: {}\n",now, cmd);
299 match log_file.write_all(&write_to_file.into_bytes()) {
300 Err(err) => {
301 error!("Writing to file to path {} failed! {}", filename, err)
302 }
303 Ok(_) => ()
304 }
305 match log_file.sync_all() {
306 Err(err) => {
307 error!("Unable to sync file to disc! {err}");
308 },
309 Ok(_) => ()
310 }
311
312 match cmd.command_code {
314 TofCommandCode::DataRunStop => {
315 println!("= => Received DataRunStop!");
316 if !dry_run {
317 success = manage_liftof_cc_service("stop");
318 }
319 },
320 TofCommandCode::DataRunStart => {
321 info!("Received DataRunStart!");
322 if !dry_run {
324 success = manage_liftof_cc_service("restart");
325 }
326 }
327 TofCommandCode::ShutdownRB => {
328 let mut cmd_rb_list = cmd.payload.clone();
329 if cmd_rb_list.is_empty() {
330 cmd_rb_list = all_rb_ids.clone();
331 }
332 info!("Received Shutdown RB command for RBs {:?}", cmd_rb_list);
333 let cmd_args = vec![String::from("sudo"),
334 String::from("shutdown"),
335 String::from("now")];
336 if !args.dry_run {
337 match ssh_command_rbs(&cmd_rb_list, cmd_args) {
338 Err(err) => {
339 error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
340 success = TofReturnCode::GeneralFail;
341 }
342 Ok(_) => {
343 success = TofReturnCode::Success;
344 }
345 }
346 }
347 }
348 TofCommandCode::ResetConfigWDefault => {
349 info!("Will reset {} with {}", cfg_file, default_cfg_file);
350 match copy_file_rename_liftof(&default_cfg_file, &next_dir) {
351 Ok(_) => {
352 info!("Copy successful!");
353 success = TofReturnCode::Success;
354 }
355 Err(err) => {
356 error!("Unable to copy! {err}");
357 success = TofReturnCode::GeneralFail;
358 }
359 }
360 }
361 TofCommandCode::SubmitConfig => {
362 info!("Submitting the worked on config!");
363 match copy_file_rename_liftof(&cfg_file, ¤t_dir) {
364 Ok(_) => {
365 info!("Copy successful!");
366 success = TofReturnCode::Success;
367 }
368 Err(err) => {
369 error!("Unable to copy! {err}");
370 success = TofReturnCode::GeneralFail;
371 }
372 }
373 }
374 TofCommandCode::ShutdownRAT => {
375 let cmd_rb_list = cmd.payload.clone();
376 info!("Received Shutdown RAT command for RBs {:?}", cmd_rb_list);
377 let cmd_args = vec![String::from("sudo"),
378 String::from("shutdown"),
379 String::from("now")];
380 if !args.dry_run {
381 match ssh_command_rbs(&cmd_rb_list, cmd_args) {
382 Err(err) => {
383 error!("SSh-ing into RBs {:?} failed! {err}", cmd_rb_list);
384 success = TofReturnCode::GeneralFail;
385 }
386 Ok(_) => {
387 success = TofReturnCode::Success;
388 }
389 }
390 }
391 }
392 TofCommandCode::ShutdownCPU => {
393 let cmd_args = vec![String::from("shutdown"),
394 String::from("now")];
395 info!("Received Shutdown command for CPU");
396 if !args.dry_run {
397 match Command::new("sudo")
398 .args(cmd_args)
400 .spawn() {
401 Err(err) => {
402 error!("Unable to spawn shutdown process on TofCPU! {err}");
403 success = TofReturnCode::GeneralFail;
404 }
405 Ok(mut child) => {
407 match child.wait() {
408 Err(err) => error!("Waiting for the shutdown process failed! {err}"),
409 Ok(_) => ()
410 }
411 }
412 }
413 }
414 }
415 TofCommandCode::RBCalibration => {
416 info!("Received RBCalibration command!");
417 if cmd.payload.len() < 3 {
418 error!("Broken RBCalibration command!");
419 continue;
420 }
421 let pre_run_cali = cmd.payload[0] != 0;
422 let send_packets = cmd.payload[1] != 0;
423 let save_waveforms = cmd.payload[2] != 0;
424 match LiftofSettings::from_toml(&cfg_file) {
425 Err(err) => {
426 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
427 success = TofReturnCode::GeneralFail;
429 }
430 Ok(mut config) => {
431 config.data_publisher_settings.send_cali_packets = send_packets;
432 config.save_cali_wf = save_waveforms;
433 config.pre_run_calibration = pre_run_cali;
434 config.to_toml(String::from(cfg_file.clone()));
435 info!("We changed the data publisher settings to be this {}",config.data_publisher_settings);
436
437 success = TofReturnCode::Success;
438 }
439 }
440 }
441 TofCommandCode::SetMTConfig => {
442 info!("Will change trigger config for next run!");
443 match TriggerConfig::from_bytestream(&cmd.payload, &mut 0) {
444 Err(err) => error!("Unable to extract TriggerConfig from command! {err}"),
445 Ok(tcf) => {
446 match LiftofSettings::from_toml(&cfg_file) {
447 Err(err) => {
448 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
449 success = TofReturnCode::GeneralFail;
450 }
451 Ok(mut config) => {
452 println!("=> We received the following trigger config {}", tcf);
453 config.mtb_settings.from_triggerconfig(&tcf);
454 println!("=> We changed the mtb settings to be this {}",config.mtb_settings);
455 config.to_toml(String::from(cfg_file.clone()));
456 success = TofReturnCode::Success;
457 }
458 }
459 }
460 }
461 }
462 TofCommandCode::SetTOFEventBuilderConfig => {
463 info!("Will change tof event builder config for next run!");
464 match TOFEventBuilderConfig::from_bytestream(&cmd.payload, &mut 0) {
465 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
466 Ok(tcf) => {
467 info!("Received config {}",tcf);
468 match LiftofSettings::from_toml(&cfg_file) {
469 Err(err) => {
470 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
471 success = TofReturnCode::GeneralFail;
472 }
473 Ok(mut config) => {
474 config.event_builder_settings.from_tofeventbuilderconfig(&tcf);
475 info!("We changed the event builder settings to be this {}",config.event_builder_settings);
476 config.to_toml(String::from(cfg_file.clone()));
477 success = TofReturnCode::Success;
478 }
479 }
480 }
481 }
482 }
483 TofCommandCode::SetTofRunConfig => {
484 info!("Will change tof run config for next run!");
485 match TofRunConfig::from_bytestream(&cmd.payload, &mut 0) {
486 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
487 Ok(tcf) => {
488 info!("Received config {}",tcf);
489 match LiftofSettings::from_toml(&cfg_file) {
490 Err(err) => {
491 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
492 success = TofReturnCode::GeneralFail;
493 }
494 Ok(mut config) => {
495 config.from_tofrunconfig(&tcf);
496 info!("We changed the run config to be this {}",config);
497 config.to_toml(String::from(cfg_file.clone()));
498 success = TofReturnCode::Success;
499 }
500 }
501 }
502 }
503 }
504 TofCommandCode::SetTofRBConfig => {
505 info!("Will change tof rb config for next run!");
506 match TofRBConfig::from_bytestream(&cmd.payload, &mut 0) {
507 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
508 Ok(tcf) => {
509 info!("Received config {}",tcf);
510 match LiftofSettings::from_toml(&cfg_file) {
511 Err(err) => {
512 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
513 success = TofReturnCode::GeneralFail;
514 }
515 Ok(mut config) => {
516 config.rb_settings.from_tofrbconfig(&tcf);
517 info!("We changed the run config to be this {}",config);
518 config.to_toml(String::from(cfg_file.clone()));
519 success = TofReturnCode::Success;
520 }
521 }
522 }
523 }
524 }
525 TofCommandCode::SetDataPublisherConfig => {
526 info!("Will change data publisher config for next run!");
527 let cfg_file = format!("{}/next/liftof-config.toml", staging_dir.clone());
528 match DataPublisherConfig::from_bytestream(&cmd.payload, &mut 0) {
529 Err(err) => error!("Unable to extract TofEventBuilderConfig from command! {err}"),
530 Ok(tcf) => {
531 info!("Received config {}",tcf);
532 match LiftofSettings::from_toml(&cfg_file) {
533 Err(err) => {
534 error!("CRITICAL! Unable to parse .toml settings file! {}", err);
535 success = TofReturnCode::GeneralFail;
536 }
537 Ok(mut config) => {
538 config.data_publisher_settings.from_datapublisherconfig(&tcf);
539 info!("We changed the event builder settings to be this {}",config.data_publisher_settings);
540 config.to_toml(String::from(cfg_file));
541 success = TofReturnCode::Success;
542 }
543 }
544 }
545 }
546 }
547 _ => {
548 error!("Dealing with command code {} has not been implemented yet!", cmd.command_code);
549 success = TofReturnCode::GeneralFail;
550 }
551 }
552 if !args.no_ack {
553 send_ack_packet(cmd.command_code, success, &cmd_sender);
554 }
555 },
556 _ => {
557 error!("Unable to deal with packet type {}!", cmd_packet.packet_type)
558 }
559 }
560 }
561 }
562 }
563}