liftof_rb/threads/cmd_responder.rs
1//! React to commands while running. This feature has been stripped down, due to
2//! a change in philosophy - whenever a setting is changed, a new run (and thus
3//! a new config file) should be created.
4//!
5//! However, there are some things which still need to be done during runtime, e.g.
6//! taking calibration data
7// This file is part of gaps-online-software and published
8// under the GPLv3 license
9
10use std::time::{
11 //Instant,
12 Duration,
13};
14use std::thread;
15use std::sync::{
16 Arc,
17 Mutex,
18};
19
20use crossbeam_channel::Sender;
21//use tof_control::helper::pa_type::PASetBias;
22use gondola_core::prelude::*;
23
24use crate::api::{rb_calibration};
25use crate::control::get_board_id_string;
26
27/// Centrailized command management
28///
29/// Maintain 0MQ command connection and faciliate
30/// forwarding of commands and responses
31///
32/// # Arguments
33///
34/// * cmd_server_address : The full address string e.g. tcp://1.1.1.1:12345
35/// where the command server is publishing commands..
36/// * run_config : The default runconfig. Defined by reading in the
37/// config file when the code boots up.
38/// When we receive a simple DataRunStartCommand,
39/// we will run this configuration
40/// * run_config_sender : A sender to send the dedicated run config to the
41/// runner
42/// * tp_to_pub : Send TofPackets to the data pub.
43/// Some TOF commands might trigger
44/// additional information to get
45/// send.
46/// * address_for_cali : The local (self) PUB address, so that the rb_calibratoin,
47/// can subscribe to it to loop itself the event packets
48/// * thread_control : Manage thread control signals, e.g. stop
49pub fn cmd_responder(cmd_server_address : String,
50 run_config : &RunConfig,
51 run_config_sender : &Sender<RunConfig>,
52 tp_to_pub : &Sender<TofPacket>,
53 address_for_cali : String,
54 thread_control : Arc<Mutex<ThreadControl>>) {
55 // create 0MQ sockedts
56 //let one_milli = time::Duration::from_millis(1);
57 //let port = DATAPORT.to_string();
58 //let cmd_address = build_tcp_from_ip(cmd_server_ip,port);
59 //// we will subscribe to two types of messages, BRCT and RB + 2 digits
60 //// of board id
61 let topic_board = get_board_id_string().expect("Can not get board id!");
62 let topic_broadcast = String::from("BRCT");
63 let ctx = zmq::Context::new();
64 // I guess expect is fine here, see above
65 let cmd_socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
66 info!("Will set up 0MQ SUB socket to listen for commands at address {cmd_server_address}");
67 let mut is_connected = false;
68 match cmd_socket.connect(&cmd_server_address) {
69 Err(err) => warn!("Not able to connect to {}, Error {err}", cmd_server_address),
70 Ok(_) => {
71 info!("Connected to CnC server at {}", cmd_server_address);
72 match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
73 Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
74 Ok(_) => ()
75 }
76 match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
77 Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
78 Ok(_) => ()
79 }
80 is_connected = true;
81 }
82 }
83
84 //let mut heartbeat = Instant::now();
85
86 // I don't know if we need this, maybe the whole block can go away.
87 // Originally I thought the RBs get pinged every x seconds and if we
88 // don't see the ping, we reconnect to the socket. But I don't know
89 // if that scenario actually occurs.
90 // Paolo: instead of leaving the connection always open we might
91 // want to reopen it if its not reachable anymore (so like command-oriented)...
92 //warn!("TODO: Heartbeat feature not yet implemented on C&C side");
93 //let heartbeat_received = false;
94 let mut save_cali_wf = false;
95 loop {
96 match thread_control.lock() {
97 Ok(tc) => {
98 save_cali_wf = tc.liftof_settings.save_cali_wf;
99 if tc.stop_flag {
100 info!("Received stop signal. Will stop thread!");
101 break;
102 }
103 },
104 Err(err) => {
105 trace!("Can't acquire lock! {err}");
106 },
107 }
108 // we need to make sure to connect to our server
109 // The startup is a bit tricky... FIXME
110 if !is_connected {
111 match cmd_socket.connect(&cmd_server_address) {
112 Err(err) => {
113 debug!("Not able to connect to {}! {err}", cmd_server_address);
114 thread::sleep(Duration::from_millis(200));
115 continue;
116 }
117 Ok(_) => {
118 info!("Connected to CnC server at {}", cmd_server_address);
119 match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
120 Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
121 Ok(_) => ()
122 }
123 match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
124 Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
125 Ok(_) => ()
126 }
127 is_connected = true;
128 }
129 }
130 }
131 match cmd_socket.recv_bytes(0) {
132 //match cmd_socket.recv_bytes(zmq::DONTWAIT) {
133 Err(err) => trace!("Problem receiving command over 0MQ ! Err {err}"),
134 Ok(cmd_bytes) => {
135 debug!("Received bytes {}", cmd_bytes.len());
136 // it will always be a tof packet
137 match TofPacket::from_bytestream(&cmd_bytes, &mut 4) {
138 Err(err) => {
139 error!("Can not decode TofPacket! bytes {:?}, error {err}", cmd_bytes);
140 },
141 Ok(tp) => {
142 match tp.packet_type {
143 // Not sure about that yet
144 //PacketType::BfswAckPacket => {
145 // // just forward this for now
146 // match tp_to_pub.send(tp) {
147 // Err(err) => error!("Unable to forward Bfsw Ack packet! {err}"),
148 // Ok(_) => ()
149 // }
150 // //return_val = Ok(TofCommandCode::Ping);
151 //}
152 TofPacketType::TofCommand => {
153 let cmd : TofCommand;
154 match tp.unpack::<TofCommand>() {
155 Ok(_cmd) => {cmd = _cmd;}
156 Err(err) => {
157 error!("Unable to unpack TofCommand! {err}");
158 continue;
159 }
160 }
161 match cmd.command_code {
162 TofCommandCode::DataRunStart => {
163 let rc = run_config.clone();
164 match run_config_sender.send(rc) {
165 Ok(_) => {
166 info!("Run started successfully!");
167 //return_val = Ok(TofCommandCode::DataRunStart);
168 },
169 Err(err) => {
170 error!("Error starting run! {err}");
171 //return_val = Err(CmdError::RunStartError);
172 }
173 }
174 }
175 TofCommandCode::DataRunStop => {
176 let rc = RunConfig::new();
177 match run_config_sender.send(rc) {
178 Ok(_) => {
179 info!("Run stopped successfully!");
180 //return_val = Ok(TofCommandCode::DataRunStop);
181 },
182 Err(err) => {
183 error!("Error stopping run! {err}");
184 //return_val = Err(CmdError::RunStopError);
185 }
186 }
187 }
188 TofCommandCode::RBCalibration => {
189 match rb_calibration(&run_config_sender,
190 &tp_to_pub,
191 save_cali_wf,
192 address_for_cali.clone()) {
193 Ok(_) => {
194 println!("== ==> [cmd-responder] Calibration successful!");
195 info!("Default calibration data taking successful!");
196 //return_val = Ok(TofCommandCode::RBCalibration);
197 },
198 Err(err) => {
199 error!("Default calibration data taking failed! Error {err}!");
200 //return_val = Err(CmdError::CalibrationError);
201 }
202 }
203 }
204 _ => error!("Not able to execute command for command code {}", cmd.command_code)
205 }
206 }
207 //TofPacketType::TofCommand => {
208 // // we have to strip off the topic
209 // match TofCommand::from_bytestream(&tp.payload, &mut 0) {
210 // Err(err) => {
211 // error!("Problem decoding command {err}");
212 // }
213 // Ok(cmd) => {
214 // // we got a valid tof command, forward it and wait for the
215 // // response
216 // //let tof_resp = TofResponse::GeneralFail(TofResponseCode::RespErrNotImplemented as u32);
217 // //let resp_not_implemented = prefix_board_id(&mut tof_resp.to_bytestream());
218 // //let resp_not_implemented = TofResponse::GeneralFail(RESP_ERR_NOTIMPLEMENTED);
219 // let return_val: Result<TofCommandCode, CmdError>;
220 // match cmd {
221 // TofCommand::Unknown (_) => {
222 // info!("Received unknown command");
223 // error!("Cannot interpret unknown command");
224 // return_val = Err(CmdError::UnknownError);
225 // },
226 // TofCommand::Ping (_) => {
227 // info!("Received ping command");
228 // let mut ping = RBPing::new();
229 // ping.rb_id = get_board_id().unwrap_or(0) as u8;
230 // let tp = ping.pack();
231 // match tp_to_pub.send(tp) {
232 // Err(err) => error!("Unable to send ping response! {err}"),
233 // Ok(_) => ()
234 // }
235 // return_val = Ok(TofCommandCode::Ping);
236 // },
237 // TofCommand::Moni (value) => {
238 // // MSB third 8 bits are
239 // let tof_component: TofComponent = TofComponent::from(((value | MASK_CMD_8BIT << 8) >> 8) as u8);
240 // // MSB fourth 8 bits are
241 // let id: u8 = (value | MASK_CMD_8BIT) as u8;
242 // // Function that just replies to a ping command send to tofcpu
243 // // get_board_id PANICS!! TODO
244 // let rb_id = get_board_id().unwrap() as u8;
245
246 // if tof_component != TofComponent::MT &&
247 // tof_component != TofComponent::TofCpu &&
248 // tof_component != TofComponent::Unknown &&
249 // rb_id != id {
250 // // The packet was not for this RB so bye
251 // continue;
252 // } else {
253 // match tof_component {
254 // TofComponent::RB => {
255 // info!("Received RB moni command");
256 // let moni = get_rb_moni(id).unwrap();
257 // let tp = moni.pack();
258
259 // match tp_to_pub.send(tp) {
260 // Err(err) => {
261 // error!("RB moni sending failed! Err {err}");
262 // return_val = Err(CmdError::MoniError);
263 // }
264 // Ok(_) => {
265 // info!("RB moni sent");
266 // return_val = Ok(TofCommandCode::Moni);
267 // }
268 // };
269 // },
270 // TofComponent::PB => {
271 // info!("Received PB moni command");
272 // let moni = get_pb_moni(id).unwrap();
273 // let tp = moni.pack();
274 // match tp_to_pub.send(tp) {
275 // Err(err) => {
276 // error!("PB moni sending failed! Err {err}");
277 // return_val = Err(CmdError::MoniError);
278 // }
279 // Ok(_) => {
280 // info!("PB moni sent");
281 // return_val = Ok(TofCommandCode::Moni);
282 // }
283 // };
284 // },
285 // TofComponent::LTB => {
286 // info!("Received LTB moni command");
287 // let moni = get_ltb_moni(id).unwrap();
288 // let tp = moni.pack();
289 // match tp_to_pub.send(tp) {
290 // Err(err) => {
291 // error!("LTB moni sending failed! Err {err}");
292 // return_val = Err(CmdError::MoniError);
293 // }
294 // Ok(_) => {
295 // info!("LTB moni sent");
296 // return_val = Ok(TofCommandCode::Moni);
297 // }
298 // };
299 // },
300 // _ => {
301 // return_val = Err(CmdError::MoniError);
302 // error!("An RB can control just PBs and LTBs.")
303 // }
304 // }
305 // }
306 // },
307 // TofCommand::DataRunStop(value) => {
308 // // MSB fourth 8 bits are RB ID
309 // let rb_id: u8 = (value | MASK_CMD_8BIT) as u8;
310 // println!("=> Received command to end run for board ids {rb_id}!");
311
312 // let my_rb_id = get_board_id().unwrap() as u8;
313 // // if this RB is the one then do stuff
314 // if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
315 // println!("=> Received command to end run!");
316 // // default is not active for run config
317
318 // let rc = RunConfig::new();
319 // match run_config_sender.send(rc) {
320 // Ok(_) => {
321 // info!("Run stopped successfully!");
322 // return_val = Ok(TofCommandCode::DataRunStop);
323 // },
324 // Err(err) => {
325 // error!("Error stopping run! {err}");
326 // return_val = Err(CmdError::RunStopError);
327 // }
328 // }
329 // } else {
330 // // The packet was not for this RB so bye
331 // continue;
332 // }
333 // },
334 // TofCommand::DataRunStart (value) => {
335 // // MSB second 8 bits are run_type
336 // //let run_type: u8 = ((value | (MASK_CMD_8BIT << 16)) >> 16) as u8;
337 // // MSB third 8 bits are RB ID
338 // let rb_id: u8 = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
339 // // MSB fourth 8 bits are event number
340 // //let event_no: u8 = (value | MASK_CMD_8BIT) as u8;
341 // // let's start a run. The value of the TofCommnad shall be
342 // // nevents
343
344 // let my_rb_id = get_board_id().unwrap() as u8;
345 // // if this RB is the one then do stuff
346 // if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
347 // println!("==> Will initialize new run!");
348 // //let rc = get_runconfig(&run_config_file);
349 // let rc = run_config.clone();
350 // match run_config_sender.send(rc) {
351 // Ok(_) => {
352 // info!("Run started successfully!");
353 // return_val = Ok(TofCommandCode::DataRunStart);
354 // },
355 // Err(err) => {
356 // error!("Error starting run! {err}");
357 // return_val = Err(CmdError::RunStartError);
358 // }
359 // };
360 // } else {
361 // // The packet was not for this RB so bye
362 // continue;
363 // }
364 // },
365 // TofCommand::DefaultCalibration (value) => {
366 // // MSB first 16 bits are voltage level
367 // //let voltage_val: u16 = ((value | (MASK_CMD_16BIT << 16)) >> 16) as u16;
368 // // MSB third 8 bits are RB ID
369 // let rb_id: u8 = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
370 // // MSB fourth 8 bits are extra (not used)
371 // //let extra: u8 = (value | MASK_CMD_8BIT) as u8;
372
373 // let my_rb_id = get_board_id().unwrap() as u8;
374 // // if this RB is the one then do stuff
375 // if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
376 // // FIXME - time delay? When we start all calibrations at the
377 // // same time, then the nw might get too busy?
378 // match rb_calibration(&run_config_sender,
379 // &tp_to_pub,
380 // save_cali_wf,
381 // address_for_cali.clone()) {
382 // Ok(_) => {
383 // println!("== ==> [cmd-responder] Calibration successful!");
384 // info!("Default calibration data taking successful!");
385 // return_val = Ok(TofCommandCode::RBCalibration);
386 // },
387 // Err(err) => {
388 // error!("Default calibration data taking failed! Error {err}!");
389 // return_val = Err(CmdError::CalibrationError);
390 // }
391 // }
392 // } else {
393 // // The packet was not for this RB so bye
394 // continue;
395 // }
396 // }
397 // _ => {
398 // error!("{} is not implemented!", cmd);
399 // return_val = Err(CmdError::NotImplementedError);
400 // }
401 // }
402 // // deal with return values
403 // match return_val {
404 // Err(cmd_error) => {
405 // let r = TofResponse::GeneralFail(TofResponseCode::RespErrUnexecutable as u32);
406 // match cmd_socket.send(r.to_bytestream(),0) {
407 // Err(err) => warn!("Can not send response!, Err {err}"),
408 // Ok(_) => info!("Responded to {cmd_error}!")
409 // }
410 // },
411 // Ok(tof_command) => {
412 // let r = TofResponse::Success(TofResponseCode::RespSuccFingersCrossed as u32);
413 // match cmd_socket.send(r.to_bytestream(),0) {
414 // Err(err) => warn!("Can not send response!! {err}"),
415 // Ok(_) => info!("Responded to {tof_command}!")
416 // }
417 // }
418 // }
419 // }
420 // }
421 //},
422 _ => {
423 error!("Can not respond to {}", tp);
424 }
425 }
426 }
427 }
428 }
429 }
430 }
431}