liftof_cc/threads/command_dispatcher.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
//! Command dispatcher - handle incoming requests
//!
//! Most requests will be satisfied by liftof-scheduler
use std::thread;
use std::sync::{
Arc,
Mutex,
};
use std::time::{
Duration,
};
use crossbeam_channel::{
Sender
};
use tof_dataclasses::packets::{
TofPacket
};
use tof_dataclasses::serialization::{
Serialization,
};
use liftof_lib::thread_control::ThreadControl;
use crate::LIFTOF_HOTWIRE;
/// Deal with incoming requests. Most will be statisfied
/// by the external liftof-scheduler. It will communicate
/// with the liftof-scheduler over a dedicated,
/// non-configurable port
///
/// # Arguments:
///
/// * thread_control : inter-thread communications,
/// start/stop signals.
/// Keeps global settings.
/// * tp_to_sink : send packets to global data sink
pub fn command_dispatcher(thread_ctrl : Arc<Mutex<ThreadControl>>,
tp_to_sink : &Sender<TofPacket>) {
// socket to receive commands
// NEW: since we have the liftof-scheduler now, this basically just
// listens in on the cc_pub_addr so that it can send ack packets.
// The actual commanding is done by the liftof-scheduler
let ctx = zmq::Context::new();
let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
// the "HOTWIRE" is the direct connection to the PUB socekt of the
// liftof-scheduler
cmd_receiver.connect(LIFTOF_HOTWIRE).expect("Unable to subscribe to flight computer PUB");
//info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
info!("Listening on {LIFTOF_HOTWIRE}!");
// ok to block here since we haven't started yet
let mut sleep_time = Duration::from_secs(1);
match thread_ctrl.lock() {
Ok(mut tc) => {
tc.thread_cmd_dispatch_active = true;
sleep_time = Duration::from_secs(tc.liftof_settings.cmd_dispatcher_settings.cmd_listener_interval_sec);
}
Err(err) => {
trace!("Can't acquire lock! {err}");
}
}
loop {
// the frequency of incoming request should be
// small, so we take the heat out and nap a bit
thread::sleep(sleep_time);
match cmd_receiver.connect(LIFTOF_HOTWIRE) {
Ok(_) => (),
Err(err) => {
error!("Unable to connect to {}! {}", LIFTOF_HOTWIRE, err);
}
}
match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
Err(err) => {
trace!("ZMQ socket receiving error! {err}");
continue;
}
Ok(buffer) => {
match TofPacket::from_bytestream(&buffer, &mut 0) {
Err(err) => {
error!("Unable to decode bytestream for command ! {:?}", err);
continue;
}
Ok(packet) => {
debug!("Got packet {}!", packet);
match tp_to_sink.send(packet) {
Err(err) => {
error!("Unable to send ACK packet! {err}");
}
Ok(_) => ()
}
}
}
}
}
}
}