liftof_cc/threads/
readoutboard_comm.rs

1//! Readoutboard communication. Get events and 
2//! monitoring data
3
4use std::collections::HashMap;
5
6use std::sync::{
7  Arc,
8  Mutex,
9};
10
11use std::time::{
12  Instant,
13  //Duration,
14};
15
16use crossbeam_channel::Sender;
17
18use tof_dataclasses::database::ReadoutBoard;
19use tof_dataclasses::events::RBEvent;
20use tof_dataclasses::packets::{
21  TofPacket,
22  PacketType,
23};
24
25use tof_dataclasses::serialization::{
26  Serialization,
27  Packable
28};
29use tof_dataclasses::calibrations::{
30  RBCalibrations,
31  RBCalibrationsFlightT,
32  RBCalibrationsFlightV,
33  };
34
35use liftof_lib::{
36  waveform_analysis,
37};
38
39use liftof_lib::thread_control::ThreadControl;
40use liftof_lib::settings::AnalysisEngineSettings;
41
42/*************************************/
43
44/// Receive data from a readoutboard
45///
46/// In case of binary event data ("blob") this can be analyzed here
47/// It is also possible to save the data directly.
48///
49/// In case of monitoring/other tof packets, those will be forwarded
50///
51/// # Arguments:
52///
53/// * ev_to_builder       : This thread will receive RBEvent data from the assigned RB, 
54///                         if desired (see run_analysis_engine) run analysis and extract
55///                         TofHits and then pass the result on to the event builder.
56/// * tp_to_sink          : Channel which should be connect to a (global) data sink.
57///                         Packets which are of not event type (e.g. header/full binary data)
58///                         will be forwarded to the sink.
59/// * rb                  : ReadoutBoard instance, as loaded from the database. This will be used
60///                         for readoutboard id as well as paddle assignment.
61/// * ae_settings         : Settings to configure peakfinding algorithms etc. 
62///                         These can be configured with an external .toml file
63pub fn readoutboard_communicator(ev_to_builder       : Sender<RBEvent>,
64                                 tp_to_sink          : Sender<TofPacket>,
65                                 mut rb              : ReadoutBoard,
66                                 thread_control      : Arc<Mutex<ThreadControl>>) {
67
68  let mut this_status = HashMap::<u16, bool>::new();
69  for k in 1..321 {
70    this_status.insert(k,false);
71  }
72  match rb.load_latest_calibration() {
73    Err(err) => error!("Unable to load calibration for RB {}! {}", rb.rb_id, err),
74    Ok(_)    => {
75      info!("Loaded calibration for board {} successfully!", rb.rb_id);
76    }
77  }
78
79  let zmq_ctx = zmq::Context::new();
80  let board_id = rb.rb_id; //rb.id.unwrap();
81  info!("initializing RB thread for board {}!", board_id);
82  let mut n_errors        = 0usize;
83  // how many chunks ("buffers") we dealt with
84  let mut n_chunk  = 0usize;
85  // in case we want to do calibratoins
86  let address = rb.guess_address();
87
88  // FIXME - this panics, however, if we can't set up the socket, what's 
89  // the point of this thread?
90  let socket = zmq_ctx.socket(zmq::SUB).expect("Unable to create socket!");
91  match socket.connect(&address) {
92    Err(err) => error!("Can not connect to socket {}, {}", address, err),
93    Ok(_)    => info!("Connected to {address}")
94  }
95  // no need to subscribe to a topic, since there 
96  // is one port for each rb
97  let topic = format!("RB{:02}", board_id);
98  match socket.set_subscribe(&topic.as_bytes()) {
99   Err(err) => error!("Unable to subscribe to topic! {err}"),
100   Ok(_)    => info!("Subscribed to {:?}!", topic),
101  }
102  let mut tc_timer = Instant::now();
103  let mut verification_active = false;
104  
105  let ae_settings         : AnalysisEngineSettings; 
106  let run_analysis_engine : bool;
107  match thread_control.lock() {
108    Ok(tc) => {
109      ae_settings         = tc.liftof_settings.analysis_engine_settings.clone();
110      run_analysis_engine = tc.liftof_settings.run_analysis_engine;
111    }
112    Err(err) => {
113      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
114      error!("Ending thread, unable to acquire settings!");
115      return;
116    }
117  }
118  if run_analysis_engine {
119    info!("Will run analysis engine!");
120    //println!("Will use the following settings! {}", ae_settings);
121  } else {
122    warn!("Will not run analysis engine!");
123  }
124
125  // start continuous thread activity, read data from RB sockets,
126  // do analysis and pass them on.
127  loop {
128    if tc_timer.elapsed().as_secs_f32() > 2.1 {
129      match thread_control.try_lock() {
130        Ok(mut tc) => {
131          //println!("== ==> [rbcomm] tc lock acquired!");
132          if tc.end_all_rb_threads {
133            //println!("= => [rbcomm] initiate ending thread for RB {}!", board_id);
134            tc.thread_rbcomm_active.insert(rb.rb_id,false);
135            // check if all threads have ended
136            let mut all_done = true;
137            for (_,value) in &tc.thread_rbcomm_active {
138              if *value {
139                all_done = false;
140              }
141            }
142            if all_done {
143              tc.thread_event_bldr_active = false;
144            }
145            break;
146          }
147          verification_active = tc.verification_active;
148          if verification_active {
149            tc.detector_status.update_from_map(this_status.clone());
150          }
151        },
152        Err(err) => {
153          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
154        },
155      }
156      tc_timer = Instant::now();
157    }
158    // check if we got new data
159    // this is blocking the thread
160    match socket.recv_bytes(0) {
161      Err(err) => {
162        n_errors += 1;
163        error!("Receiving from socket raised error {}", err);
164      }
165      Ok(buffer) => {
166        // strip the first 4 bytes, since they contain the 
167        // board id
168        match TofPacket::from_bytestream(&buffer, &mut 4) { 
169          Err(err) => {
170            error!("Unknown bytestream...{:?}", err);
171            continue;  
172          },
173          Ok(tp) => {
174            //n_received += 1;
175            match tp.packet_type {
176              PacketType::RBEvent | PacketType::RBEventMemoryView => {
177                let mut event = RBEvent::from(&tp);
178                // don't create the hits if the trigger is lost (the 
179                // waveform field will be empty)
180                if event.hits.len() == 0 
181                && !event.header.drs_lost_trigger() 
182                && run_analysis_engine {
183                  match waveform_analysis(&mut event, 
184                                          &rb,
185                                          ae_settings) {
186                    Ok(_) => (),
187                    Err(err) => {
188                      warn!("Unable to analyze waveforms for this event! {err}");
189                    }
190                  }
191                }
192                if verification_active {
193                  for h in &event.hits {
194                    // average charge/peak hit
195                    let verification_charge_threshhold = 10.0f32;
196                    if h.get_charge_a() >= verification_charge_threshhold {
197                      let status_key = h.paddle_id as u16;
198                      match this_status.insert(status_key, true) {
199                        Some(_) => (),
200                        None => error!("Unknown paddle id! {}", h.paddle_id)
201                      }
202                    }
203                    if h.get_charge_b() >= verification_charge_threshhold {
204                      let status_key = (h.paddle_id as u16) + 160;
205                      match this_status.insert(status_key, true) {
206                        Some(_) => (),
207                        None => error!("Unknown paddle id! {}", h.paddle_id)
208                      }
209                    }
210                  }
211                }
212                if !verification_active {
213                  match ev_to_builder.send(event) {
214                    Ok(_) => (),
215                    Err(err) => {
216                      error!("Unable to send event! Err {err}");
217                    }
218                  }
219                }
220                //n_events += 1;
221                n_chunk += 1;
222              } 
223              PacketType::RBCalibration => {
224                let mut flight_v = RBCalibrationsFlightV::new();
225                let mut flight_t = RBCalibrationsFlightT::new(); 
226
227                match tp.unpack::<RBCalibrations>() {
228                  Ok(cali) => {
229                    flight_v = cali.emit_flightvcal();
230                    flight_t = cali.emit_flighttcal(); 
231                    //println!("= => [rb_comm] Received RBCalibration!");
232                    match thread_control.lock() {
233                      Ok(mut tc) => {
234                        tc.calibrations.insert(board_id, cali.clone()); 
235                        *tc.finished_calibrations.get_mut(&board_id).unwrap() = true; 
236                        rb.calibration = cali;
237                      }
238                      Err(err) => {
239                        error!("Can't acquire lock for ThreadControl!! {err}");
240                      },
241                    }
242                  }
243                  Err(err) => {
244                    error!("Received calibration package, but got error when unpacking! {err}");
245                  }
246                }
247                let flight_v_tp = flight_v.pack();
248                let flight_t_tp = flight_t.pack();
249                match tp_to_sink.send(flight_v_tp) {
250                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
251                  Ok(_)    => debug!("Packet sent"),
252                }
253                match tp_to_sink.send(flight_t_tp) {
254                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
255                  Ok(_)    => debug!("Packet sent"),
256                }
257                match tp_to_sink.send(tp) {
258                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
259                  Ok(_)    => debug!("Packet sent"),
260                }
261              }
262              _ => {
263                // Currently, we will just forward all other packets
264                // directly to the data sink
265                match tp_to_sink.send(tp) {
266                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
267                  Ok(_)    => debug!("Packet sent"),
268                }
269              }
270            } // end match packet type
271          } // end OK
272        } // end match from_bytestream
273      } // end ok buffer 
274    } // end match 
275    debug!("Digested {n_chunk} chunks!");
276    debug!("Noticed {n_errors} errors!");
277  } // end loop
278  println!("= => [rbcomm] thread for RB {} finished! (not recoverable)", board_id);
279} // end fun
280