liftof_rb/threads/cmd_responder.rs
1use std::time::{
2 //Instant,
3 Duration,
4};
5use std::thread;
6use std::sync::{
7 Arc,
8 Mutex,
9};
10
11use crossbeam_channel::Sender;
12
13use tof_control::helper::pa_type::PASetBias;
14
15use tof_dataclasses::commands::{
16 TofCommand,
17 TofCommandV2,
18 TofCommandCode,
19 TofResponse,
20 TofResponseCode,
21};
22
23use tof_dataclasses::commands::config::{
24 PreampBiasConfig,
25 RunConfig,
26};
27
28use tof_dataclasses::errors::CmdError;
29use tof_dataclasses::packets::{TofPacket,
30 PacketType};
31use tof_dataclasses::heartbeats::RBPing;
32use tof_dataclasses::serialization::Serialization;
33use tof_dataclasses::serialization::Packable;
34
35use liftof_lib::{
36 //build_tcp_from_ip,
37 TofComponent,
38 LTBThresholdName
39};
40
41use tof_dataclasses::constants::{MASK_CMD_8BIT,
42 MASK_CMD_16BIT};
43
44use crate::api::{rb_calibration,
45 //set_preamp_biases,
46 //send_preamp_bias_set,
47 send_ltb_threshold_set,
48};
49use crate::threads::monitoring::{
50 get_ltb_moni,
51 get_pb_moni,
52 get_rb_moni
53};
54
55use liftof_lib::constants::DEFAULT_RB_ID;
56use liftof_lib::thread_control::ThreadControl;
57
58//use tof_dataclasses::threading::ThreadControl;
59
60use crate::control::{get_board_id_string,
61 get_board_id};
62
63/// Centrailized command management
64///
65/// Maintain 0MQ command connection and faciliate
66/// forwarding of commands and responses
67///
68/// # Arguments
69///
70/// * cmd_server_address : The full address string e.g. tcp://1.1.1.1:12345
71/// where the command server is publishing commands..
72/// * run_config : The default runconfig. Defined by reading in the
73/// config file when the code boots up.
74/// When we receive a simple DataRunStartCommand,
75/// we will run this configuration
76/// * run_config_sender : A sender to send the dedicated run config to the
77/// runner
78/// * tp_to_pub : Send TofPackets to the data pub.
79/// Some TOF commands might trigger
80/// additional information to get
81/// send.
82/// * address_for_cali : The local (self) PUB address, so that the rb_calibratoin,
83/// can subscribe to it to loop itself the event packets
84/// * thread_control : Manage thread control signals, e.g. stop
85pub fn cmd_responder(cmd_server_address : String,
86 run_config : &RunConfig,
87 run_config_sender : &Sender<RunConfig>,
88 tp_to_pub : &Sender<TofPacket>,
89 address_for_cali : String,
90 thread_control : Arc<Mutex<ThreadControl>>) {
91 // create 0MQ sockedts
92 //let one_milli = time::Duration::from_millis(1);
93 //let port = DATAPORT.to_string();
94 //let cmd_address = build_tcp_from_ip(cmd_server_ip,port);
95 //// we will subscribe to two types of messages, BRCT and RB + 2 digits
96 //// of board id
97 let topic_board = get_board_id_string().expect("Can not get board id!");
98 let topic_broadcast = String::from("BRCT");
99 let ctx = zmq::Context::new();
100 // I guess expect is fine here, see above
101 let cmd_socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
102 info!("Will set up 0MQ SUB socket to listen for commands at address {cmd_server_address}");
103 let mut is_connected = false;
104 match cmd_socket.connect(&cmd_server_address) {
105 Err(err) => warn!("Not able to connect to {}, Error {err}", cmd_server_address),
106 Ok(_) => {
107 info!("Connected to CnC server at {}", cmd_server_address);
108 match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
109 Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
110 Ok(_) => ()
111 }
112 match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
113 Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
114 Ok(_) => ()
115 }
116 is_connected = true;
117 }
118 }
119
120 //let mut heartbeat = Instant::now();
121
122 // I don't know if we need this, maybe the whole block can go away.
123 // Originally I thought the RBs get pinged every x seconds and if we
124 // don't see the ping, we reconnect to the socket. But I don't know
125 // if that scenario actually occurs.
126 // Paolo: instead of leaving the connection always open we might
127 // want to reopen it if its not reachable anymore (so like command-oriented)...
128 //warn!("TODO: Heartbeat feature not yet implemented on C&C side");
129 //let heartbeat_received = false;
130 let mut save_cali_wf = false;
131 loop {
132 match thread_control.lock() {
133 Ok(tc) => {
134 save_cali_wf = tc.liftof_settings.save_cali_wf;
135 if tc.stop_flag {
136 info!("Received stop signal. Will stop thread!");
137 break;
138 }
139 },
140 Err(err) => {
141 trace!("Can't acquire lock! {err}");
142 },
143 }
144 // we need to make sure to connect to our server
145 // The startup is a bit tricky... FIXME
146 if !is_connected {
147 match cmd_socket.connect(&cmd_server_address) {
148 Err(err) => {
149 debug!("Not able to connect to {}! {err}", cmd_server_address);
150 thread::sleep(Duration::from_millis(200));
151 continue;
152 }
153 Ok(_) => {
154 info!("Connected to CnC server at {}", cmd_server_address);
155 match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
156 Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
157 Ok(_) => ()
158 }
159 match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
160 Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
161 Ok(_) => ()
162 }
163 is_connected = true;
164 }
165 }
166 }
167 match cmd_socket.recv_bytes(0) {
168 //match cmd_socket.recv_bytes(zmq::DONTWAIT) {
169 Err(err) => trace!("Problem receiving command over 0MQ ! Err {err}"),
170 Ok(cmd_bytes) => {
171 debug!("Received bytes {}", cmd_bytes.len());
172 // it will always be a tof packet
173 match TofPacket::from_bytestream(&cmd_bytes, &mut 4) {
174 Err(err) => {
175 error!("Can not decode TofPacket! bytes {:?}, error {err}", cmd_bytes);
176 },
177 Ok(tp) => {
178 match tp.packet_type {
179 // Not sure about that yet
180 //PacketType::BfswAckPacket => {
181 // // just forward this for now
182 // match tp_to_pub.send(tp) {
183 // Err(err) => error!("Unable to forward Bfsw Ack packet! {err}"),
184 // Ok(_) => ()
185 // }
186 // //return_val = Ok(TofCommandCode::Ping);
187 //}
188 PacketType::TofCommandV2 => {
189 let cmd : TofCommandV2;
190 match tp.unpack::<TofCommandV2>() {
191 Ok(_cmd) => {cmd = _cmd;}
192 Err(err) => {
193 error!("Unable to unpack TofCommand! {err}");
194 continue;
195 }
196 }
197 match cmd.command_code {
198 TofCommandCode::DataRunStart => {
199 let rc = run_config.clone();
200 match run_config_sender.send(rc) {
201 Ok(_) => {
202 info!("Run started successfully!");
203 //return_val = Ok(TofCommandCode::DataRunStart);
204 },
205 Err(err) => {
206 error!("Error starting run! {err}");
207 //return_val = Err(CmdError::RunStartError);
208 }
209 }
210 }
211 TofCommandCode::DataRunStop => {
212 let rc = RunConfig::new();
213 match run_config_sender.send(rc) {
214 Ok(_) => {
215 info!("Run stopped successfully!");
216 //return_val = Ok(TofCommandCode::DataRunStop);
217 },
218 Err(err) => {
219 error!("Error stopping run! {err}");
220 //return_val = Err(CmdError::RunStopError);
221 }
222 }
223 }
224 TofCommandCode::RBCalibration => {
225 match rb_calibration(&run_config_sender,
226 &tp_to_pub,
227 save_cali_wf,
228 address_for_cali.clone()) {
229 Ok(_) => {
230 println!("== ==> [cmd-responder] Calibration successful!");
231 info!("Default calibration data taking successful!");
232 //return_val = Ok(TofCommandCode::RBCalibration);
233 },
234 Err(err) => {
235 error!("Default calibration data taking failed! Error {err}!");
236 //return_val = Err(CmdError::CalibrationError);
237 }
238 }
239 }
240 _ => error!("Not able to execute command for command code {}", cmd.command_code)
241 }
242 }
243 PacketType::TofCommand => {
244 // we have to strip off the topic
245 match TofCommand::from_bytestream(&tp.payload, &mut 0) {
246 Err(err) => {
247 error!("Problem decoding command {err}");
248 }
249 Ok(cmd) => {
250 // we got a valid tof command, forward it and wait for the
251 // response
252 //let tof_resp = TofResponse::GeneralFail(TofResponseCode::RespErrNotImplemented as u32);
253 //let resp_not_implemented = prefix_board_id(&mut tof_resp.to_bytestream());
254 //let resp_not_implemented = TofResponse::GeneralFail(RESP_ERR_NOTIMPLEMENTED);
255 let return_val: Result<TofCommandCode, CmdError>;
256 match cmd {
257 TofCommand::Unknown (_) => {
258 info!("Received unknown command");
259 error!("Cannot interpret unknown command");
260 return_val = Err(CmdError::UnknownError);
261 },
262 TofCommand::Ping (_) => {
263 info!("Received ping command");
264 let mut ping = RBPing::new();
265 ping.rb_id = get_board_id().unwrap_or(0) as u8;
266 let tp = ping.pack();
267 match tp_to_pub.send(tp) {
268 Err(err) => error!("Unable to send ping response! {err}"),
269 Ok(_) => ()
270 }
271 return_val = Ok(TofCommandCode::Ping);
272 },
273 TofCommand::Moni (value) => {
274 // MSB third 8 bits are
275 let tof_component: TofComponent = TofComponent::from(((value | MASK_CMD_8BIT << 8) >> 8) as u8);
276 // MSB fourth 8 bits are
277 let id: u8 = (value | MASK_CMD_8BIT) as u8;
278 // Function that just replies to a ping command send to tofcpu
279 // get_board_id PANICS!! TODO
280 let rb_id = get_board_id().unwrap() as u8;
281
282 if tof_component != TofComponent::MT &&
283 tof_component != TofComponent::TofCpu &&
284 tof_component != TofComponent::Unknown &&
285 rb_id != id {
286 // The packet was not for this RB so bye
287 continue;
288 } else {
289 match tof_component {
290 TofComponent::RB => {
291 info!("Received RB moni command");
292 let moni = get_rb_moni(id).unwrap();
293 let tp = moni.pack();
294
295 match tp_to_pub.send(tp) {
296 Err(err) => {
297 error!("RB moni sending failed! Err {err}");
298 return_val = Err(CmdError::MoniError);
299 }
300 Ok(_) => {
301 info!("RB moni sent");
302 return_val = Ok(TofCommandCode::Moni);
303 }
304 };
305 },
306 TofComponent::PB => {
307 info!("Received PB moni command");
308 let moni = get_pb_moni(id).unwrap();
309 let tp = moni.pack();
310 match tp_to_pub.send(tp) {
311 Err(err) => {
312 error!("PB moni sending failed! Err {err}");
313 return_val = Err(CmdError::MoniError);
314 }
315 Ok(_) => {
316 info!("PB moni sent");
317 return_val = Ok(TofCommandCode::Moni);
318 }
319 };
320 },
321 TofComponent::LTB => {
322 info!("Received LTB moni command");
323 let moni = get_ltb_moni(id).unwrap();
324 let tp = moni.pack();
325 match tp_to_pub.send(tp) {
326 Err(err) => {
327 error!("LTB moni sending failed! Err {err}");
328 return_val = Err(CmdError::MoniError);
329 }
330 Ok(_) => {
331 info!("LTB moni sent");
332 return_val = Ok(TofCommandCode::Moni);
333 }
334 };
335 },
336 _ => {
337 return_val = Err(CmdError::MoniError);
338 error!("An RB can control just PBs and LTBs.")
339 }
340 }
341 }
342 },
343 TofCommand::DataRunStop(value) => {
344 // MSB fourth 8 bits are RB ID
345 let rb_id: u8 = (value | MASK_CMD_8BIT) as u8;
346 println!("=> Received command to end run for board ids {rb_id}!");
347
348 let my_rb_id = get_board_id().unwrap() as u8;
349 // if this RB is the one then do stuff
350 if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
351 println!("=> Received command to end run!");
352 // default is not active for run config
353
354 let rc = RunConfig::new();
355 match run_config_sender.send(rc) {
356 Ok(_) => {
357 info!("Run stopped successfully!");
358 return_val = Ok(TofCommandCode::DataRunStop);
359 },
360 Err(err) => {
361 error!("Error stopping run! {err}");
362 return_val = Err(CmdError::RunStopError);
363 }
364 }
365 } else {
366 // The packet was not for this RB so bye
367 continue;
368 }
369 },
370 TofCommand::DataRunStart (value) => {
371 // MSB second 8 bits are run_type
372 //let run_type: u8 = ((value | (MASK_CMD_8BIT << 16)) >> 16) as u8;
373 // MSB third 8 bits are RB ID
374 let rb_id: u8 = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
375 // MSB fourth 8 bits are event number
376 //let event_no: u8 = (value | MASK_CMD_8BIT) as u8;
377 // let's start a run. The value of the TofCommnad shall be
378 // nevents
379
380 let my_rb_id = get_board_id().unwrap() as u8;
381 // if this RB is the one then do stuff
382 if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
383 println!("==> Will initialize new run!");
384 //let rc = get_runconfig(&run_config_file);
385 let rc = run_config.clone();
386 match run_config_sender.send(rc) {
387 Ok(_) => {
388 info!("Run started successfully!");
389 return_val = Ok(TofCommandCode::DataRunStart);
390 },
391 Err(err) => {
392 error!("Error starting run! {err}");
393 return_val = Err(CmdError::RunStartError);
394 }
395 };
396 } else {
397 // The packet was not for this RB so bye
398 continue;
399 }
400 },
401 TofCommand::DefaultCalibration (value) => {
402 // MSB first 16 bits are voltage level
403 //let voltage_val: u16 = ((value | (MASK_CMD_16BIT << 16)) >> 16) as u16;
404 // MSB third 8 bits are RB ID
405 let rb_id: u8 = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
406 // MSB fourth 8 bits are extra (not used)
407 //let extra: u8 = (value | MASK_CMD_8BIT) as u8;
408
409 let my_rb_id = get_board_id().unwrap() as u8;
410 // if this RB is the one then do stuff
411 if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
412 // FIXME - time delay? When we start all calibrations at the
413 // same time, then the nw might get too busy?
414 match rb_calibration(&run_config_sender,
415 &tp_to_pub,
416 save_cali_wf,
417 address_for_cali.clone()) {
418 Ok(_) => {
419 println!("== ==> [cmd-responder] Calibration successful!");
420 info!("Default calibration data taking successful!");
421 return_val = Ok(TofCommandCode::RBCalibration);
422 },
423 Err(err) => {
424 error!("Default calibration data taking failed! Error {err}!");
425 return_val = Err(CmdError::CalibrationError);
426 }
427 }
428 } else {
429 // The packet was not for this RB so bye
430 continue;
431 }
432 }
433 _ => {
434 error!("{} is not implemented!", cmd);
435 return_val = Err(CmdError::NotImplementedError);
436 }
437 }
438 // deal with return values
439 match return_val {
440 Err(cmd_error) => {
441 let r = TofResponse::GeneralFail(TofResponseCode::RespErrUnexecutable as u32);
442 match cmd_socket.send(r.to_bytestream(),0) {
443 Err(err) => warn!("Can not send response!, Err {err}"),
444 Ok(_) => info!("Responded to {cmd_error}!")
445 }
446 },
447 Ok(tof_command) => {
448 let r = TofResponse::Success(TofResponseCode::RespSuccFingersCrossed as u32);
449 match cmd_socket.send(r.to_bytestream(),0) {
450 Err(err) => warn!("Can not send response!! {err}"),
451 Ok(_) => info!("Responded to {tof_command}!")
452 }
453 }
454 }
455 }
456 }
457 },
458 _ => {
459 error!("Can not respond to {}", tp);
460 }
461 }
462 }
463 }
464 }
465 }
466 }
467}