liftof_rb/threads/
cmd_responder.rs

1use std::time::{
2  //Instant,
3  Duration,
4};
5use std::thread;
6use std::sync::{
7  Arc,
8  Mutex,
9};
10
11use crossbeam_channel::Sender;
12
13use tof_control::helper::pa_type::PASetBias;
14
15use tof_dataclasses::commands::{
16  TofCommand,
17  TofCommandV2,
18  TofCommandCode,
19  TofResponse,
20  TofResponseCode,
21};
22
23use tof_dataclasses::commands::config::{
24  PreampBiasConfig,
25  RunConfig,  
26};
27
28use tof_dataclasses::errors::CmdError;
29use tof_dataclasses::packets::{TofPacket,
30                               PacketType};
31use tof_dataclasses::heartbeats::RBPing;
32use tof_dataclasses::serialization::Serialization;
33use tof_dataclasses::serialization::Packable;
34
35use liftof_lib::{
36    //build_tcp_from_ip,
37    TofComponent,
38    LTBThresholdName
39};
40
41use tof_dataclasses::constants::{MASK_CMD_8BIT,
42                                  MASK_CMD_16BIT};
43
44use crate::api::{rb_calibration,
45                 //set_preamp_biases,
46                 //send_preamp_bias_set,
47                 send_ltb_threshold_set,
48};
49use crate::threads::monitoring::{
50  get_ltb_moni,
51  get_pb_moni,
52  get_rb_moni
53};
54
55use liftof_lib::constants::DEFAULT_RB_ID;
56use liftof_lib::thread_control::ThreadControl;
57
58//use tof_dataclasses::threading::ThreadControl;
59
60use crate::control::{get_board_id_string,
61                     get_board_id};
62
63/// Centrailized command management
64/// 
65/// Maintain 0MQ command connection and faciliate 
66/// forwarding of commands and responses
67///
68/// # Arguments
69///
70/// * cmd_server_address        : The full address string e.g. tcp://1.1.1.1:12345 
71///                               where the command server is publishing commands..
72/// * run_config                : The default runconfig. Defined by reading in the 
73///                               config file when the code boots up.
74///                               When we receive a simple DataRunStartCommand,
75///                               we will run this configuration
76/// * run_config_sender         : A sender to send the dedicated run config to the 
77///                               runner
78/// * tp_to_pub                 : Send TofPackets to the data pub.
79///                               Some TOF commands might trigger
80///                               additional information to get 
81///                               send.
82/// * address_for_cali          : The local (self) PUB address, so that the rb_calibratoin,
83///                               can subscribe to it to loop itself the event packets
84/// * thread_control            : Manage thread control signals, e.g. stop
85pub fn cmd_responder(cmd_server_address        : String,
86                     run_config                : &RunConfig,
87                     run_config_sender         : &Sender<RunConfig>,
88                     tp_to_pub                 : &Sender<TofPacket>,
89                     address_for_cali          : String,
90                     thread_control            : Arc<Mutex<ThreadControl>>) {
91  // create 0MQ sockedts
92  //let one_milli       = time::Duration::from_millis(1);
93  //let port            = DATAPORT.to_string();
94  //let cmd_address     = build_tcp_from_ip(cmd_server_ip,port);
95  //// we will subscribe to two types of messages, BRCT and RB + 2 digits 
96  //// of board id
97  let topic_board     = get_board_id_string().expect("Can not get board id!");
98  let topic_broadcast = String::from("BRCT");
99  let ctx = zmq::Context::new();
100  // I guess expect is fine here, see above
101  let cmd_socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
102  info!("Will set up 0MQ SUB socket to listen for commands at address {cmd_server_address}");
103  let mut is_connected = false;
104  match cmd_socket.connect(&cmd_server_address) {
105    Err(err) => warn!("Not able to connect to {}, Error {err}", cmd_server_address),
106    Ok(_)    => {
107      info!("Connected to CnC server at {}", cmd_server_address);
108      match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
109        Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
110        Ok(_)    => ()
111      }
112      match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
113        Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
114        Ok(_)    => ()
115      }
116      is_connected = true;
117    }
118  }
119  
120  //let mut heartbeat     = Instant::now();
121
122  // I don't know if we need this, maybe the whole block can go away.
123  // Originally I thought the RBs get pinged every x seconds and if we
124  // don't see the ping, we reconnect to the socket. But I don't know
125  // if that scenario actually occurs.
126  // Paolo: instead of leaving the connection always open we might
127  //  want to reopen it if its not reachable anymore (so like command-oriented)...
128  //warn!("TODO: Heartbeat feature not yet implemented on C&C side");
129  //let heartbeat_received = false;
130  let mut save_cali_wf = false;
131  loop {
132    match thread_control.lock() {
133      Ok(tc) => {
134        save_cali_wf = tc.liftof_settings.save_cali_wf;
135        if tc.stop_flag {
136          info!("Received stop signal. Will stop thread!");
137          break;
138        }
139      },
140      Err(err) => {
141        trace!("Can't acquire lock! {err}");
142      },
143    }
144    // we need to make sure to connect to our server
145    // The startup is a bit tricky... FIXME
146    if !is_connected {
147      match cmd_socket.connect(&cmd_server_address) {
148        Err(err) => {
149          debug!("Not able to connect to {}! {err}", cmd_server_address);
150          thread::sleep(Duration::from_millis(200));
151          continue;
152        }
153        Ok(_)    => {
154          info!("Connected to CnC server at {}", cmd_server_address);
155          match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
156            Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
157            Ok(_)    => ()
158          }
159          match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
160            Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
161            Ok(_)    => ()
162          }
163          is_connected = true;
164        }
165      }
166    }
167    match cmd_socket.recv_bytes(0) {
168    //match cmd_socket.recv_bytes(zmq::DONTWAIT) {
169      Err(err) => trace!("Problem receiving command over 0MQ ! Err {err}"),
170      Ok(cmd_bytes)  => {
171        debug!("Received bytes {}", cmd_bytes.len());
172        // it will always be a tof packet
173        match TofPacket::from_bytestream(&cmd_bytes, &mut 4) {
174          Err(err) => {
175            error!("Can not decode TofPacket! bytes {:?}, error {err}", cmd_bytes);
176          },
177          Ok(tp) => {
178            match tp.packet_type {
179              // Not sure about that yet
180              //PacketType::BfswAckPacket => {
181              //  // just forward this for now
182              //  match tp_to_pub.send(tp) {
183              //    Err(err) => error!("Unable to forward Bfsw Ack packet! {err}"),
184              //    Ok(_) => ()
185              //  }
186              //  //return_val   = Ok(TofCommandCode::Ping);
187              //}
188              PacketType::TofCommandV2 => {
189                let cmd : TofCommandV2;
190                match tp.unpack::<TofCommandV2>() {
191                  Ok(_cmd) => {cmd = _cmd;}
192                  Err(err) => {
193                    error!("Unable to unpack TofCommand! {err}");
194                    continue;
195                  }
196                }
197                match cmd.command_code {
198                  TofCommandCode::DataRunStart => {
199                    let rc = run_config.clone();
200                    match run_config_sender.send(rc) {
201                      Ok(_)    => {
202                        info!("Run started successfully!");
203                        //return_val = Ok(TofCommandCode::DataRunStart);
204                      },
205                      Err(err) => {
206                        error!("Error starting run! {err}");
207                        //return_val = Err(CmdError::RunStartError);
208                      }
209                    }
210                  }
211                  TofCommandCode::DataRunStop => {
212                    let rc = RunConfig::new();
213                    match run_config_sender.send(rc) {
214                      Ok(_)    => {
215                        info!("Run stopped successfully!");
216                        //return_val = Ok(TofCommandCode::DataRunStop);
217                      },
218                      Err(err) => {
219                        error!("Error stopping run! {err}");
220                        //return_val = Err(CmdError::RunStopError);
221                      }
222                    }
223                  }
224                  TofCommandCode::RBCalibration => {
225                    match rb_calibration(&run_config_sender,
226                                         &tp_to_pub,
227                                         save_cali_wf,
228                                         address_for_cali.clone()) {
229                      Ok(_) => {
230                        println!("== ==> [cmd-responder] Calibration successful!");
231                        info!("Default calibration data taking successful!");
232                        //return_val = Ok(TofCommandCode::RBCalibration);
233                      },
234                      Err(err) => {
235                        error!("Default calibration data taking failed! Error {err}!");
236                        //return_val = Err(CmdError::CalibrationError);
237                      }
238                    }
239                  }
240                  _ => error!("Not able to execute command for command code {}", cmd.command_code)
241                }
242              }
243              PacketType::TofCommand => {
244                // we have to strip off the topic
245                match TofCommand::from_bytestream(&tp.payload, &mut 0) {
246                  Err(err) => {
247                    error!("Problem decoding command {err}");
248                  }
249                  Ok(cmd)  => {
250                    // we got a valid tof command, forward it and wait for the 
251                    // response
252                    //let tof_resp  = TofResponse::GeneralFail(TofResponseCode::RespErrNotImplemented as u32);
253                    //let resp_not_implemented = prefix_board_id(&mut tof_resp.to_bytestream());
254                    //let resp_not_implemented = TofResponse::GeneralFail(RESP_ERR_NOTIMPLEMENTED);
255                    let return_val: Result<TofCommandCode, CmdError>;
256                    match cmd {
257                      TofCommand::Unknown (_) => {
258                        info!("Received unknown command");
259                        error!("Cannot interpret unknown command");
260                        return_val = Err(CmdError::UnknownError);
261                      },
262                      TofCommand::Ping (_) => {
263                        info!("Received ping command");
264                        let mut ping = RBPing::new();
265                        ping.rb_id = get_board_id().unwrap_or(0) as u8;
266                        let tp = ping.pack();
267                        match tp_to_pub.send(tp) {
268                          Err(err) => error!("Unable to send ping response! {err}"),
269                          Ok(_) => ()
270                        }
271                        return_val   = Ok(TofCommandCode::Ping);
272                      },
273                      TofCommand::Moni (value) => {
274                        // MSB third 8 bits are 
275                        let tof_component: TofComponent = TofComponent::from(((value | MASK_CMD_8BIT << 8) >> 8) as u8);
276                        // MSB fourth 8 bits are 
277                        let id: u8 = (value | MASK_CMD_8BIT) as u8;
278                        // Function that just replies to a ping command send to tofcpu
279                        // get_board_id PANICS!! TODO
280                        let rb_id = get_board_id().unwrap() as u8;
281
282                        if tof_component != TofComponent::MT &&
283                           tof_component != TofComponent::TofCpu &&
284                           tof_component != TofComponent::Unknown &&
285                           rb_id != id {
286                          // The packet was not for this RB so bye
287                          continue;
288                        } else {
289                          match tof_component {
290                            TofComponent::RB => {
291                              info!("Received RB moni command");
292                              let moni = get_rb_moni(id).unwrap();
293                              let tp = moni.pack();
294
295                              match tp_to_pub.send(tp) {
296                                Err(err) => {
297                                  error!("RB moni sending failed! Err {err}");
298                                  return_val = Err(CmdError::MoniError);
299                                }
300                                Ok(_)    => {
301                                  info!("RB moni sent");
302                                  return_val = Ok(TofCommandCode::Moni);
303                                }
304                              };
305                            },
306                            TofComponent::PB  => {
307                              info!("Received PB moni command");
308                              let moni = get_pb_moni(id).unwrap();
309                              let tp = moni.pack();
310                              match tp_to_pub.send(tp) {
311                                Err(err) => {
312                                  error!("PB moni sending failed! Err {err}");
313                                  return_val = Err(CmdError::MoniError);
314                                }
315                                Ok(_)    => {
316                                  info!("PB moni sent");
317                                  return_val = Ok(TofCommandCode::Moni);
318                                }
319                              };
320                            },
321                            TofComponent::LTB => {
322                              info!("Received LTB moni command");
323                              let moni = get_ltb_moni(id).unwrap();
324                              let tp = moni.pack();
325                              match tp_to_pub.send(tp) {
326                                Err(err) => {
327                                  error!("LTB moni sending failed! Err {err}");
328                                  return_val = Err(CmdError::MoniError);
329                                }
330                                Ok(_)    => {
331                                  info!("LTB moni sent");
332                                  return_val = Ok(TofCommandCode::Moni);
333                                }
334                              };
335                            },
336                            _                 => {
337                              return_val = Err(CmdError::MoniError);
338                              error!("An RB can control just PBs and LTBs.")
339                            }
340                          }
341                        }
342                      },
343                      TofCommand::DataRunStop(value)   => {
344                        // MSB fourth 8 bits are RB ID
345                        let rb_id: u8 = (value | MASK_CMD_8BIT) as u8;
346                        println!("=> Received command to end run for board ids {rb_id}!");
347
348                        let my_rb_id = get_board_id().unwrap() as u8;
349                        // if this RB is the one then do stuff
350                        if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
351                          println!("=> Received command to end run!");
352                          // default is not active for run config
353
354                          let rc = RunConfig::new();
355                          match run_config_sender.send(rc) {
356                            Ok(_)    => {
357                              info!("Run stopped successfully!");
358                              return_val = Ok(TofCommandCode::DataRunStop);
359                            },
360                            Err(err) => {
361                              error!("Error stopping run! {err}");
362                              return_val = Err(CmdError::RunStopError);
363                            }
364                          }
365                        } else {
366                          // The packet was not for this RB so bye
367                          continue;
368                        }
369                      },
370                      TofCommand::DataRunStart (value) => {
371                        // MSB second 8 bits are run_type
372                        //let run_type: u8 = ((value | (MASK_CMD_8BIT << 16)) >> 16) as u8;
373                        // MSB third 8 bits are RB ID
374                        let rb_id: u8    = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
375                        // MSB fourth 8 bits are event number
376                        //let event_no: u8 = (value | MASK_CMD_8BIT) as u8;
377                        // let's start a run. The value of the TofCommnad shall be 
378                        // nevents
379
380                        let my_rb_id = get_board_id().unwrap() as u8;
381                        // if this RB is the one then do stuff
382                        if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
383                          println!("==> Will initialize new run!");
384                          //let rc    = get_runconfig(&run_config_file);
385                          let rc = run_config.clone();
386                          match run_config_sender.send(rc) {
387                            Ok(_)    => {
388                              info!("Run started successfully!");
389                              return_val = Ok(TofCommandCode::DataRunStart);
390                            },
391                            Err(err) => {
392                              error!("Error starting run! {err}");
393                              return_val = Err(CmdError::RunStartError);
394                            }
395                          };
396                        } else {
397                          // The packet was not for this RB so bye
398                          continue;
399                        }
400                      },
401                      TofCommand::DefaultCalibration  (value) => {
402                        // MSB first 16 bits are voltage level
403                        //let voltage_val: u16 = ((value | (MASK_CMD_16BIT << 16)) >> 16) as u16;
404                        // MSB third 8 bits are RB ID
405                        let rb_id: u8 = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
406                        // MSB fourth 8 bits are extra (not used)
407                        //let extra: u8 = (value | MASK_CMD_8BIT) as u8;
408
409                        let my_rb_id = get_board_id().unwrap() as u8;
410                        // if this RB is the one then do stuff
411                        if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
412                          // FIXME - time delay? When we start all calibrations at the 
413                          // same time, then the nw might get too busy? 
414                          match rb_calibration(&run_config_sender,
415                                               &tp_to_pub,
416                                               save_cali_wf,
417                                               address_for_cali.clone()) {
418                            Ok(_) => {
419                              println!("== ==> [cmd-responder] Calibration successful!");
420                              info!("Default calibration data taking successful!");
421                              return_val = Ok(TofCommandCode::RBCalibration);
422                            },
423                            Err(err) => {
424                              error!("Default calibration data taking failed! Error {err}!");
425                              return_val = Err(CmdError::CalibrationError);
426                            }
427                          }
428                        } else {
429                          // The packet was not for this RB so bye
430                          continue;
431                        }
432                      }
433                      _ => {
434                        error!("{} is not implemented!", cmd);
435                        return_val = Err(CmdError::NotImplementedError);
436                      }
437                    }
438                    // deal with return values
439                    match return_val {
440                      Err(cmd_error) => {
441                        let r = TofResponse::GeneralFail(TofResponseCode::RespErrUnexecutable as u32);
442                        match cmd_socket.send(r.to_bytestream(),0) {
443                          Err(err) => warn!("Can not send response!, Err {err}"),
444                          Ok(_)    => info!("Responded to {cmd_error}!")
445                        }
446                      },
447                      Ok(tof_command)  => {
448                        let r = TofResponse::Success(TofResponseCode::RespSuccFingersCrossed as u32);
449                        match cmd_socket.send(r.to_bytestream(),0) {
450                          Err(err) => warn!("Can not send response!! {err}"),
451                          Ok(_)    => info!("Responded to {tof_command}!")
452                        }
453                      }
454                    }
455                  }
456                }  
457              },
458              _ => {
459                error!("Can not respond to {}", tp);
460              }
461            }
462          }
463        }
464      }
465    }
466  }
467}