liftof_rb/threads/
cmd_responder.rs

1//! React to commands while running. This feature has been stripped down, due to 
2//! a change in philosophy - whenever a setting is changed, a new run (and thus 
3//! a new config file) should be created.
4//!
5//! However, there are some things which still need to be done during runtime, e.g.
6//! taking calibration data
7// This file is part of gaps-online-software and published 
8// under the GPLv3 license
9
10use std::time::{
11  //Instant,
12  Duration,
13};
14use std::thread;
15use std::sync::{
16  Arc,
17  Mutex,
18};
19
20use crossbeam_channel::Sender;
21//use tof_control::helper::pa_type::PASetBias;
22use gondola_core::prelude::*;
23
24use crate::api::{rb_calibration};
25use crate::control::get_board_id_string;
26
27/// Centrailized command management
28/// 
29/// Maintain 0MQ command connection and faciliate 
30/// forwarding of commands and responses
31///
32/// # Arguments
33///
34/// * cmd_server_address        : The full address string e.g. tcp://1.1.1.1:12345 
35///                               where the command server is publishing commands..
36/// * run_config                : The default runconfig. Defined by reading in the 
37///                               config file when the code boots up.
38///                               When we receive a simple DataRunStartCommand,
39///                               we will run this configuration
40/// * run_config_sender         : A sender to send the dedicated run config to the 
41///                               runner
42/// * tp_to_pub                 : Send TofPackets to the data pub.
43///                               Some TOF commands might trigger
44///                               additional information to get 
45///                               send.
46/// * address_for_cali          : The local (self) PUB address, so that the rb_calibratoin,
47///                               can subscribe to it to loop itself the event packets
48/// * thread_control            : Manage thread control signals, e.g. stop
49pub fn cmd_responder(cmd_server_address        : String,
50                     run_config                : &RunConfig,
51                     run_config_sender         : &Sender<RunConfig>,
52                     tp_to_pub                 : &Sender<TofPacket>,
53                     address_for_cali          : String,
54                     thread_control            : Arc<Mutex<ThreadControl>>) {
55  // create 0MQ sockedts
56  //let one_milli       = time::Duration::from_millis(1);
57  //let port            = DATAPORT.to_string();
58  //let cmd_address     = build_tcp_from_ip(cmd_server_ip,port);
59  //// we will subscribe to two types of messages, BRCT and RB + 2 digits 
60  //// of board id
61  let topic_board     = get_board_id_string().expect("Can not get board id!");
62  let topic_broadcast = String::from("BRCT");
63  let ctx = zmq::Context::new();
64  // I guess expect is fine here, see above
65  let cmd_socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
66  info!("Will set up 0MQ SUB socket to listen for commands at address {cmd_server_address}");
67  let mut is_connected = false;
68  match cmd_socket.connect(&cmd_server_address) {
69    Err(err) => warn!("Not able to connect to {}, Error {err}", cmd_server_address),
70    Ok(_)    => {
71      info!("Connected to CnC server at {}", cmd_server_address);
72      match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
73        Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
74        Ok(_)    => ()
75      }
76      match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
77        Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
78        Ok(_)    => ()
79      }
80      is_connected = true;
81    }
82  }
83  
84  //let mut heartbeat     = Instant::now();
85
86  // I don't know if we need this, maybe the whole block can go away.
87  // Originally I thought the RBs get pinged every x seconds and if we
88  // don't see the ping, we reconnect to the socket. But I don't know
89  // if that scenario actually occurs.
90  // Paolo: instead of leaving the connection always open we might
91  //  want to reopen it if its not reachable anymore (so like command-oriented)...
92  //warn!("TODO: Heartbeat feature not yet implemented on C&C side");
93  //let heartbeat_received = false;
94  let mut save_cali_wf = false;
95  loop {
96    match thread_control.lock() {
97      Ok(tc) => {
98        save_cali_wf = tc.liftof_settings.save_cali_wf;
99        if tc.stop_flag {
100          info!("Received stop signal. Will stop thread!");
101          break;
102        }
103      },
104      Err(err) => {
105        trace!("Can't acquire lock! {err}");
106      },
107    }
108    // we need to make sure to connect to our server
109    // The startup is a bit tricky... FIXME
110    if !is_connected {
111      match cmd_socket.connect(&cmd_server_address) {
112        Err(err) => {
113          debug!("Not able to connect to {}! {err}", cmd_server_address);
114          thread::sleep(Duration::from_millis(200));
115          continue;
116        }
117        Ok(_)    => {
118          info!("Connected to CnC server at {}", cmd_server_address);
119          match cmd_socket.set_subscribe(&topic_broadcast.as_bytes()) {
120            Err(err) => error!("Can not subscribe to {topic_broadcast}, err {err}"),
121            Ok(_)    => ()
122          }
123          match cmd_socket.set_subscribe(&topic_board.as_bytes()) {
124            Err(err) => error!("Can not subscribe to {topic_board}, err {err}"),
125            Ok(_)    => ()
126          }
127          is_connected = true;
128        }
129      }
130    }
131    match cmd_socket.recv_bytes(0) {
132    //match cmd_socket.recv_bytes(zmq::DONTWAIT) {
133      Err(err) => trace!("Problem receiving command over 0MQ ! Err {err}"),
134      Ok(cmd_bytes)  => {
135        debug!("Received bytes {}", cmd_bytes.len());
136        // it will always be a tof packet
137        match TofPacket::from_bytestream(&cmd_bytes, &mut 4) {
138          Err(err) => {
139            error!("Can not decode TofPacket! bytes {:?}, error {err}", cmd_bytes);
140          },
141          Ok(tp) => {
142            match tp.packet_type {
143              // Not sure about that yet
144              //PacketType::BfswAckPacket => {
145              //  // just forward this for now
146              //  match tp_to_pub.send(tp) {
147              //    Err(err) => error!("Unable to forward Bfsw Ack packet! {err}"),
148              //    Ok(_) => ()
149              //  }
150              //  //return_val   = Ok(TofCommandCode::Ping);
151              //}
152              TofPacketType::TofCommand => {
153                let cmd : TofCommand;
154                match tp.unpack::<TofCommand>() {
155                  Ok(_cmd) => {cmd = _cmd;}
156                  Err(err) => {
157                    error!("Unable to unpack TofCommand! {err}");
158                    continue;
159                  }
160                }
161                match cmd.command_code {
162                  TofCommandCode::DataRunStart => {
163                    let rc = run_config.clone();
164                    match run_config_sender.send(rc) {
165                      Ok(_)    => {
166                        info!("Run started successfully!");
167                        //return_val = Ok(TofCommandCode::DataRunStart);
168                      },
169                      Err(err) => {
170                        error!("Error starting run! {err}");
171                        //return_val = Err(CmdError::RunStartError);
172                      }
173                    }
174                  }
175                  TofCommandCode::DataRunStop => {
176                    let rc = RunConfig::new();
177                    match run_config_sender.send(rc) {
178                      Ok(_)    => {
179                        info!("Run stopped successfully!");
180                        //return_val = Ok(TofCommandCode::DataRunStop);
181                      },
182                      Err(err) => {
183                        error!("Error stopping run! {err}");
184                        //return_val = Err(CmdError::RunStopError);
185                      }
186                    }
187                  }
188                  TofCommandCode::RBCalibration => {
189                    match rb_calibration(&run_config_sender,
190                                         &tp_to_pub,
191                                         save_cali_wf,
192                                         address_for_cali.clone()) {
193                      Ok(_) => {
194                        println!("== ==> [cmd-responder] Calibration successful!");
195                        info!("Default calibration data taking successful!");
196                        //return_val = Ok(TofCommandCode::RBCalibration);
197                      },
198                      Err(err) => {
199                        error!("Default calibration data taking failed! Error {err}!");
200                        //return_val = Err(CmdError::CalibrationError);
201                      }
202                    }
203                  }
204                  _ => error!("Not able to execute command for command code {}", cmd.command_code)
205                }
206              }
207              //TofPacketType::TofCommand => {
208              //  // we have to strip off the topic
209              //  match TofCommand::from_bytestream(&tp.payload, &mut 0) {
210              //    Err(err) => {
211              //      error!("Problem decoding command {err}");
212              //    }
213              //    Ok(cmd)  => {
214              //      // we got a valid tof command, forward it and wait for the 
215              //      // response
216              //      //let tof_resp  = TofResponse::GeneralFail(TofResponseCode::RespErrNotImplemented as u32);
217              //      //let resp_not_implemented = prefix_board_id(&mut tof_resp.to_bytestream());
218              //      //let resp_not_implemented = TofResponse::GeneralFail(RESP_ERR_NOTIMPLEMENTED);
219              //      let return_val: Result<TofCommandCode, CmdError>;
220              //      match cmd {
221              //        TofCommand::Unknown (_) => {
222              //          info!("Received unknown command");
223              //          error!("Cannot interpret unknown command");
224              //          return_val = Err(CmdError::UnknownError);
225              //        },
226              //        TofCommand::Ping (_) => {
227              //          info!("Received ping command");
228              //          let mut ping = RBPing::new();
229              //          ping.rb_id = get_board_id().unwrap_or(0) as u8;
230              //          let tp = ping.pack();
231              //          match tp_to_pub.send(tp) {
232              //            Err(err) => error!("Unable to send ping response! {err}"),
233              //            Ok(_) => ()
234              //          }
235              //          return_val   = Ok(TofCommandCode::Ping);
236              //        },
237              //        TofCommand::Moni (value) => {
238              //          // MSB third 8 bits are 
239              //          let tof_component: TofComponent = TofComponent::from(((value | MASK_CMD_8BIT << 8) >> 8) as u8);
240              //          // MSB fourth 8 bits are 
241              //          let id: u8 = (value | MASK_CMD_8BIT) as u8;
242              //          // Function that just replies to a ping command send to tofcpu
243              //          // get_board_id PANICS!! TODO
244              //          let rb_id = get_board_id().unwrap() as u8;
245
246              //          if tof_component != TofComponent::MT &&
247              //             tof_component != TofComponent::TofCpu &&
248              //             tof_component != TofComponent::Unknown &&
249              //             rb_id != id {
250              //            // The packet was not for this RB so bye
251              //            continue;
252              //          } else {
253              //            match tof_component {
254              //              TofComponent::RB => {
255              //                info!("Received RB moni command");
256              //                let moni = get_rb_moni(id).unwrap();
257              //                let tp = moni.pack();
258
259              //                match tp_to_pub.send(tp) {
260              //                  Err(err) => {
261              //                    error!("RB moni sending failed! Err {err}");
262              //                    return_val = Err(CmdError::MoniError);
263              //                  }
264              //                  Ok(_)    => {
265              //                    info!("RB moni sent");
266              //                    return_val = Ok(TofCommandCode::Moni);
267              //                  }
268              //                };
269              //              },
270              //              TofComponent::PB  => {
271              //                info!("Received PB moni command");
272              //                let moni = get_pb_moni(id).unwrap();
273              //                let tp = moni.pack();
274              //                match tp_to_pub.send(tp) {
275              //                  Err(err) => {
276              //                    error!("PB moni sending failed! Err {err}");
277              //                    return_val = Err(CmdError::MoniError);
278              //                  }
279              //                  Ok(_)    => {
280              //                    info!("PB moni sent");
281              //                    return_val = Ok(TofCommandCode::Moni);
282              //                  }
283              //                };
284              //              },
285              //              TofComponent::LTB => {
286              //                info!("Received LTB moni command");
287              //                let moni = get_ltb_moni(id).unwrap();
288              //                let tp = moni.pack();
289              //                match tp_to_pub.send(tp) {
290              //                  Err(err) => {
291              //                    error!("LTB moni sending failed! Err {err}");
292              //                    return_val = Err(CmdError::MoniError);
293              //                  }
294              //                  Ok(_)    => {
295              //                    info!("LTB moni sent");
296              //                    return_val = Ok(TofCommandCode::Moni);
297              //                  }
298              //                };
299              //              },
300              //              _                 => {
301              //                return_val = Err(CmdError::MoniError);
302              //                error!("An RB can control just PBs and LTBs.")
303              //              }
304              //            }
305              //          }
306              //        },
307              //        TofCommand::DataRunStop(value)   => {
308              //          // MSB fourth 8 bits are RB ID
309              //          let rb_id: u8 = (value | MASK_CMD_8BIT) as u8;
310              //          println!("=> Received command to end run for board ids {rb_id}!");
311
312              //          let my_rb_id = get_board_id().unwrap() as u8;
313              //          // if this RB is the one then do stuff
314              //          if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
315              //            println!("=> Received command to end run!");
316              //            // default is not active for run config
317
318              //            let rc = RunConfig::new();
319              //            match run_config_sender.send(rc) {
320              //              Ok(_)    => {
321              //                info!("Run stopped successfully!");
322              //                return_val = Ok(TofCommandCode::DataRunStop);
323              //              },
324              //              Err(err) => {
325              //                error!("Error stopping run! {err}");
326              //                return_val = Err(CmdError::RunStopError);
327              //              }
328              //            }
329              //          } else {
330              //            // The packet was not for this RB so bye
331              //            continue;
332              //          }
333              //        },
334              //        TofCommand::DataRunStart (value) => {
335              //          // MSB second 8 bits are run_type
336              //          //let run_type: u8 = ((value | (MASK_CMD_8BIT << 16)) >> 16) as u8;
337              //          // MSB third 8 bits are RB ID
338              //          let rb_id: u8    = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
339              //          // MSB fourth 8 bits are event number
340              //          //let event_no: u8 = (value | MASK_CMD_8BIT) as u8;
341              //          // let's start a run. The value of the TofCommnad shall be 
342              //          // nevents
343
344              //          let my_rb_id = get_board_id().unwrap() as u8;
345              //          // if this RB is the one then do stuff
346              //          if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
347              //            println!("==> Will initialize new run!");
348              //            //let rc    = get_runconfig(&run_config_file);
349              //            let rc = run_config.clone();
350              //            match run_config_sender.send(rc) {
351              //              Ok(_)    => {
352              //                info!("Run started successfully!");
353              //                return_val = Ok(TofCommandCode::DataRunStart);
354              //              },
355              //              Err(err) => {
356              //                error!("Error starting run! {err}");
357              //                return_val = Err(CmdError::RunStartError);
358              //              }
359              //            };
360              //          } else {
361              //            // The packet was not for this RB so bye
362              //            continue;
363              //          }
364              //        },
365              //        TofCommand::DefaultCalibration  (value) => {
366              //          // MSB first 16 bits are voltage level
367              //          //let voltage_val: u16 = ((value | (MASK_CMD_16BIT << 16)) >> 16) as u16;
368              //          // MSB third 8 bits are RB ID
369              //          let rb_id: u8 = ((value | (MASK_CMD_8BIT << 8)) >> 8) as u8;
370              //          // MSB fourth 8 bits are extra (not used)
371              //          //let extra: u8 = (value | MASK_CMD_8BIT) as u8;
372
373              //          let my_rb_id = get_board_id().unwrap() as u8;
374              //          // if this RB is the one then do stuff
375              //          if rb_id == DEFAULT_RB_ID || rb_id == my_rb_id {
376              //            // FIXME - time delay? When we start all calibrations at the 
377              //            // same time, then the nw might get too busy? 
378              //            match rb_calibration(&run_config_sender,
379              //                                 &tp_to_pub,
380              //                                 save_cali_wf,
381              //                                 address_for_cali.clone()) {
382              //              Ok(_) => {
383              //                println!("== ==> [cmd-responder] Calibration successful!");
384              //                info!("Default calibration data taking successful!");
385              //                return_val = Ok(TofCommandCode::RBCalibration);
386              //              },
387              //              Err(err) => {
388              //                error!("Default calibration data taking failed! Error {err}!");
389              //                return_val = Err(CmdError::CalibrationError);
390              //              }
391              //            }
392              //          } else {
393              //            // The packet was not for this RB so bye
394              //            continue;
395              //          }
396              //        }
397              //        _ => {
398              //          error!("{} is not implemented!", cmd);
399              //          return_val = Err(CmdError::NotImplementedError);
400              //        }
401              //      }
402              //      // deal with return values
403              //      match return_val {
404              //        Err(cmd_error) => {
405              //          let r = TofResponse::GeneralFail(TofResponseCode::RespErrUnexecutable as u32);
406              //          match cmd_socket.send(r.to_bytestream(),0) {
407              //            Err(err) => warn!("Can not send response!, Err {err}"),
408              //            Ok(_)    => info!("Responded to {cmd_error}!")
409              //          }
410              //        },
411              //        Ok(tof_command)  => {
412              //          let r = TofResponse::Success(TofResponseCode::RespSuccFingersCrossed as u32);
413              //          match cmd_socket.send(r.to_bytestream(),0) {
414              //            Err(err) => warn!("Can not send response!! {err}"),
415              //            Ok(_)    => info!("Responded to {tof_command}!")
416              //          }
417              //        }
418              //      }
419              //    }
420              //  }  
421              //},
422              _ => {
423                error!("Can not respond to {}", tp);
424              }
425            }
426          }
427        }
428      }
429    }
430  }
431}