liftof_cc/threads/
command_dispatcher.rs

1//! Command dispatcher - handle incoming requests
2//!
3//! Most requests will be satisfied by liftof-scheduler
4
5use std::thread;
6use std::sync::{
7  Arc,
8  Mutex,
9};
10
11use std::time::{
12  Duration,
13};
14
15use crossbeam_channel::{
16  Sender
17};
18
19use tof_dataclasses::packets::{
20  TofPacket
21};
22use tof_dataclasses::serialization::{
23  Serialization,
24};
25
26use liftof_lib::thread_control::ThreadControl;
27
28use crate::LIFTOF_HOTWIRE;
29
30/// Deal with incoming requests. Most will be statisfied
31/// by the external liftof-scheduler. It will communicate
32/// with the liftof-scheduler over a dedicated, 
33/// non-configurable port
34///
35/// # Arguments:
36///
37///   * thread_control : inter-thread communications,
38///                      start/stop signals.
39///                      Keeps global settings.
40///   * tp_to_sink     : send packets to global data sink 
41pub fn command_dispatcher(thread_ctrl  : Arc<Mutex<ThreadControl>>,
42                          tp_to_sink   : &Sender<TofPacket>) { 
43  
44  
45  // socket to receive commands
46  // NEW: since we have the liftof-scheduler now, this basically just 
47  // listens in on the cc_pub_addr so that it can send ack packets.
48  // The actual commanding is done by the liftof-scheduler
49  let ctx = zmq::Context::new();
50  let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
51  cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
52  // the "HOTWIRE" is the direct connection to the PUB socekt of the 
53  // liftof-scheduler
54  cmd_receiver.connect(LIFTOF_HOTWIRE).expect("Unable to subscribe to flight computer PUB");
55  //info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
56  info!("Listening on {LIFTOF_HOTWIRE}!");
57
58  // ok to block here since we haven't started yet
59  let mut sleep_time = Duration::from_secs(1);
60  match thread_ctrl.lock() {
61    Ok(mut tc) => {
62        tc.thread_cmd_dispatch_active = true;
63        sleep_time   = Duration::from_secs(tc.liftof_settings.cmd_dispatcher_settings.cmd_listener_interval_sec);
64    }
65    Err(err) => {
66        trace!("Can't acquire lock! {err}");
67    }
68  }
69
70  loop {
71    // the frequency of incoming request should be
72    // small, so we take the heat out and nap a bit
73    thread::sleep(sleep_time);
74    match cmd_receiver.connect(LIFTOF_HOTWIRE) {
75      Ok(_)    => (),
76      Err(err) => {
77        error!("Unable to connect to {}! {}", LIFTOF_HOTWIRE, err);
78      }
79    }
80    match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
81      Err(err)   => {
82        trace!("ZMQ socket receiving error! {err}");
83        continue;
84      }
85      Ok(buffer) => {
86        match TofPacket::from_bytestream(&buffer, &mut 0) {
87          Err(err) => {
88            error!("Unable to decode bytestream for command ! {:?}", err);
89            continue;  
90          }
91          Ok(packet) => {
92            debug!("Got packet {}!", packet);
93            match tp_to_sink.send(packet) {
94              Err(err) => {
95                error!("Unable to send ACK packet! {err}");
96              }
97              Ok(_)    => ()
98            }
99          }
100        }
101      }
102    }
103  }
104}