liftof_cc/threads/
command_dispatcher.rs

1//! Command dispatcher - handle incoming requests
2//!
3//! Most requests will be satisfied by liftof-scheduler
4
5use std::thread;
6use std::sync::{
7  Arc,
8  Mutex,
9};
10
11use std::time::{
12  Duration,
13};
14
15use crossbeam_channel::{
16  Sender
17};
18
19//use tof_dataclasses::packets::{
20//  TofPacket
21//};
22//use tof_dataclasses::serialization::{
23//  Serialization,
24//};
25//
26//use liftof_lib::thread_control::ThreadControl;
27use gondola_core::prelude::*;
28
29use crate::LIFTOF_HOTWIRE;
30
31/// Deal with incoming requests. Most will be statisfied
32/// by the external liftof-scheduler. It will communicate
33/// with the liftof-scheduler over a dedicated, 
34/// non-configurable port
35///
36/// # Arguments:
37///
38///   * thread_control : inter-thread communications,
39///                      start/stop signals.
40///                      Keeps global settings.
41///   * tp_to_sink     : send packets to global data sink 
42pub fn command_dispatcher(thread_ctrl  : Arc<Mutex<ThreadControl>>,
43                          tp_to_sink   : &Sender<TofPacket>) { 
44  
45  
46  // socket to receive commands
47  // NEW: since we have the liftof-scheduler now, this basically just 
48  // listens in on the cc_pub_addr so that it can send ack packets.
49  // The actual commanding is done by the liftof-scheduler
50  let ctx = zmq::Context::new();
51  let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
52  cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
53  // the "HOTWIRE" is the direct connection to the PUB socekt of the 
54  // liftof-scheduler
55  cmd_receiver.connect(LIFTOF_HOTWIRE).expect("Unable to subscribe to flight computer PUB");
56  //info!("ZMQ SUB Socket for flight cpu listener bound to {fc_sub_addr}");
57  info!("Listening on {LIFTOF_HOTWIRE}!");
58
59  // ok to block here since we haven't started yet
60  let mut sleep_time = Duration::from_secs(1);
61  match thread_ctrl.lock() {
62    Ok(mut tc) => {
63        tc.thread_cmd_dispatch_active = true;
64        sleep_time   = Duration::from_secs(tc.liftof_settings.cmd_dispatcher_settings.cmd_listener_interval_sec);
65    }
66    Err(err) => {
67        trace!("Can't acquire lock! {err}");
68    }
69  }
70
71  loop {
72    // the frequency of incoming request should be
73    // small, so we take the heat out and nap a bit
74    thread::sleep(sleep_time);
75    match cmd_receiver.connect(LIFTOF_HOTWIRE) {
76      Ok(_)    => (),
77      Err(err) => {
78        error!("Unable to connect to {}! {}", LIFTOF_HOTWIRE, err);
79      }
80    }
81    match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
82      Err(err)   => {
83        trace!("ZMQ socket receiving error! {err}");
84        continue;
85      }
86      Ok(buffer) => {
87        match TofPacket::from_bytestream(&buffer, &mut 0) {
88          Err(err) => {
89            error!("Unable to decode bytestream for command ! {:?}", err);
90            continue;  
91          }
92          Ok(packet) => {
93            debug!("Got packet {}!", packet);
94            match tp_to_sink.send(packet) {
95              Err(err) => {
96                error!("Unable to send ACK packet! {err}");
97              }
98              Ok(_)    => ()
99            }
100          }
101        }
102      }
103    }
104  }
105}