liftof_cc/threads/
command_dispatcher.rs1use std::thread;
6use std::sync::{
7 Arc,
8 Mutex,
9};
10
11use std::time::{
12 Duration,
13};
14
15use crossbeam_channel::{
16 Sender
17};
18
19use gondola_core::prelude::*;
28
29use crate::LIFTOF_HOTWIRE;
30
31pub fn command_dispatcher(thread_ctrl : Arc<Mutex<ThreadControl>>,
43 tp_to_sink : &Sender<TofPacket>) {
44
45
46 let ctx = zmq::Context::new();
51 let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
52 cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
53 cmd_receiver.connect(LIFTOF_HOTWIRE).expect("Unable to subscribe to flight computer PUB");
56 info!("Listening on {LIFTOF_HOTWIRE}!");
58
59 let mut sleep_time = Duration::from_secs(1);
61 match thread_ctrl.lock() {
62 Ok(mut tc) => {
63 tc.thread_cmd_dispatch_active = true;
64 sleep_time = Duration::from_secs(tc.liftof_settings.cmd_dispatcher_settings.cmd_listener_interval_sec);
65 }
66 Err(err) => {
67 trace!("Can't acquire lock! {err}");
68 }
69 }
70
71 loop {
72 thread::sleep(sleep_time);
75 match cmd_receiver.connect(LIFTOF_HOTWIRE) {
76 Ok(_) => (),
77 Err(err) => {
78 error!("Unable to connect to {}! {}", LIFTOF_HOTWIRE, err);
79 }
80 }
81 match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
82 Err(err) => {
83 trace!("ZMQ socket receiving error! {err}");
84 continue;
85 }
86 Ok(buffer) => {
87 match TofPacket::from_bytestream(&buffer, &mut 0) {
88 Err(err) => {
89 error!("Unable to decode bytestream for command ! {:?}", err);
90 continue;
91 }
92 Ok(packet) => {
93 debug!("Got packet {}!", packet);
94 match tp_to_sink.send(packet) {
95 Err(err) => {
96 error!("Unable to send ACK packet! {err}");
97 }
98 Ok(_) => ()
99 }
100 }
101 }
102 }
103 }
104 }
105}