liftof_cc/threads/
command_dispatcher.rs1use std::thread;
6use std::sync::{
7 Arc,
8 Mutex,
9};
10
11use std::time::{
12 Duration,
13};
14
15use crossbeam_channel::{
16 Sender
17};
18
19use tof_dataclasses::packets::{
20 TofPacket
21};
22use tof_dataclasses::serialization::{
23 Serialization,
24};
25
26use liftof_lib::thread_control::ThreadControl;
27
28use crate::LIFTOF_HOTWIRE;
29
30pub fn command_dispatcher(thread_ctrl : Arc<Mutex<ThreadControl>>,
42 tp_to_sink : &Sender<TofPacket>) {
43
44
45 let ctx = zmq::Context::new();
50 let cmd_receiver = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
51 cmd_receiver.set_subscribe(b"").expect("Unable to subscribe to empty topic!");
52 cmd_receiver.connect(LIFTOF_HOTWIRE).expect("Unable to subscribe to flight computer PUB");
55 info!("Listening on {LIFTOF_HOTWIRE}!");
57
58 let mut sleep_time = Duration::from_secs(1);
60 match thread_ctrl.lock() {
61 Ok(mut tc) => {
62 tc.thread_cmd_dispatch_active = true;
63 sleep_time = Duration::from_secs(tc.liftof_settings.cmd_dispatcher_settings.cmd_listener_interval_sec);
64 }
65 Err(err) => {
66 trace!("Can't acquire lock! {err}");
67 }
68 }
69
70 loop {
71 thread::sleep(sleep_time);
74 match cmd_receiver.connect(LIFTOF_HOTWIRE) {
75 Ok(_) => (),
76 Err(err) => {
77 error!("Unable to connect to {}! {}", LIFTOF_HOTWIRE, err);
78 }
79 }
80 match cmd_receiver.recv_bytes(zmq::DONTWAIT) {
81 Err(err) => {
82 trace!("ZMQ socket receiving error! {err}");
83 continue;
84 }
85 Ok(buffer) => {
86 match TofPacket::from_bytestream(&buffer, &mut 0) {
87 Err(err) => {
88 error!("Unable to decode bytestream for command ! {:?}", err);
89 continue;
90 }
91 Ok(packet) => {
92 debug!("Got packet {}!", packet);
93 match tp_to_sink.send(packet) {
94 Err(err) => {
95 error!("Unable to send ACK packet! {err}");
96 }
97 Ok(_) => ()
98 }
99 }
100 }
101 }
102 }
103 }
104}