liftof_cc/threads/
readoutboard_comm.rs

1//! Readoutboard communication. Get events and 
2//! monitoring data
3
4use std::collections::HashMap;
5
6use std::sync::{
7  Arc,
8  Mutex,
9};
10
11use std::time::{
12  Instant,
13  //Duration,
14};
15
16use crossbeam_channel::Sender;
17
18use gondola_core::prelude::*;
19
20/*************************************/
21
22/// Receive data from a readoutboard
23///
24/// In case of binary event data ("blob") this can be analyzed here
25/// It is also possible to save the data directly.
26///
27/// In case of monitoring/other tof packets, those will be forwarded
28///
29/// # Arguments:
30///
31/// * ev_to_builder       : This thread will receive RBEvent data from the assigned RB, 
32///                         if desired (see run_analysis_engine) run analysis and extract
33///                         TofHits and then pass the result on to the event builder.
34/// * tp_to_sink          : Channel which should be connect to a (global) data sink.
35///                         Packets which are of not event type (e.g. header/full binary data)
36///                         will be forwarded to the sink.
37/// * rb                  : ReadoutBoard instance, as loaded from the database. This will be used
38///                         for readoutboard id as well as paddle assignment.
39/// * ae_settings         : Settings to configure peakfinding algorithms etc. 
40///                         These can be configured with an external .toml file
41pub fn readoutboard_communicator(ev_to_builder       : Sender<RBEvent>,
42                                 tp_to_sink          : Sender<TofPacket>,
43                                 mut rb              : ReadoutBoard,
44                                 thread_control      : Arc<Mutex<ThreadControl>>) {
45
46  let mut this_status = HashMap::<u16, bool>::new();
47  for k in 1..321 {
48    this_status.insert(k,false);
49  }
50  match rb.load_latest_calibration() {
51    Err(err) => error!("Unable to load calibration for RB {}! {}", rb.rb_id, err),
52    Ok(_)    => {
53      info!("Loaded calibration for board {} successfully!", rb.rb_id);
54    }
55  }
56
57  let zmq_ctx = zmq::Context::new();
58  let board_id = rb.rb_id; //rb.id.unwrap();
59  info!("initializing RB thread for board {}!", board_id);
60  let mut n_errors        = 0usize;
61  // how many chunks ("buffers") we dealt with
62  let mut n_chunk  = 0usize;
63  // in case we want to do calibratoins
64  let address = rb.guess_address();
65
66  // FIXME - this panics, however, if we can't set up the socket, what's 
67  // the point of this thread?
68  let socket = zmq_ctx.socket(zmq::SUB).expect("Unable to create socket!");
69  match socket.connect(&address) {
70    Err(err) => error!("Can not connect to socket {}, {}", address, err),
71    Ok(_)    => info!("Connected to {address}")
72  }
73  // no need to subscribe to a topic, since there 
74  // is one port for each rb
75  let topic = format!("RB{:02}", board_id);
76  match socket.set_subscribe(&topic.as_bytes()) {
77   Err(err) => error!("Unable to subscribe to topic! {err}"),
78   Ok(_)    => info!("Subscribed to {:?}!", topic),
79  }
80  let mut tc_timer = Instant::now();
81  let mut verification_active = false;
82  
83  let ae_settings         : AnalysisEngineSettings; 
84  let run_analysis_engine : bool;
85  match thread_control.lock() {
86    Ok(tc) => {
87      ae_settings         = tc.liftof_settings.analysis_engine_settings.clone();
88      run_analysis_engine = tc.liftof_settings.run_analysis_engine;
89    }
90    Err(err) => {
91      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
92      error!("Ending thread, unable to acquire settings!");
93      return;
94    }
95  }
96  if run_analysis_engine {
97    info!("Will run analysis engine!");
98    //println!("Will use the following settings! {}", ae_settings);
99  } else {
100    warn!("Will not run analysis engine!");
101  }
102
103  // start continuous thread activity, read data from RB sockets,
104  // do analysis and pass them on.
105  loop {
106    //println!("tc_timer {}", tc_timer.elapsed().as_secs_f32());
107    if tc_timer.elapsed().as_secs_f32() > 2.1 {
108      match thread_control.try_lock() {
109        Ok(mut tc) => {
110          debug!(" lock on thread control acquired!");
111          //println!("== ==> [rbcomm] tc lock acquired!");
112          if tc.end_all_rb_threads {
113            //println!("= => [rbcomm] initiate ending thread for RB {}!", board_id);
114            tc.thread_rbcomm_active.insert(rb.rb_id,false);
115            // check if all threads have ended
116            let mut all_done = true;
117            for (_,value) in &tc.thread_rbcomm_active {
118              if *value {
119                all_done = false;
120              }
121            }
122            if all_done {
123              tc.thread_event_bldr_active = false;
124            }
125            break;
126          }
127          verification_active = tc.verification_active;
128          if verification_active {
129            debug!("Found verification flag active!");
130            tc.detector_status.update_from_map(this_status.clone());
131          }
132        },
133        Err(err) => {
134          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
135        },
136      }
137      tc_timer = Instant::now();
138    }
139    // check if we got new data
140    // this is blocking the thread
141    match socket.recv_bytes(0) {
142      Err(err) => {
143        n_errors += 1;
144        error!("Receiving from socket raised error {}", err);
145      }
146      Ok(buffer) => {
147        // strip the first 4 bytes, since they contain the 
148        // board id
149        match TofPacket::from_bytestream(&buffer, &mut 4) { 
150          Err(err) => {
151            error!("Unknown bytestream...{:?}", err);
152            continue;  
153          },
154          Ok(tp) => {
155            //n_received += 1;
156            match tp.packet_type {
157              TofPacketType::RBEvent | TofPacketType::RBEventMemoryView => {
158                //let mut event = RBEvent::from(&tp);
159                let mut event : RBEvent;
160                match tp.unpack::<RBEvent>() {
161                  Ok(rbev_) => {
162                    event = rbev_;
163                  } 
164                  Err(err) => {
165                    error!("Can't unpack RBEvent! {err}");
166                    continue;
167                  }
168                }
169                // don't create the hits if the trigger is lost (the 
170                // waveform field will be empty)
171                if event.hits.len() == 0 
172                && !event.header.drs_lost_trigger() 
173                && run_analysis_engine {
174                  match waveform_analysis(&mut event, 
175                                          &rb,
176                                          ae_settings) {
177                    Ok(_) => (),
178                    Err(err) => {
179                      warn!("Unable to analyze waveforms for this event! {err}");
180                    }
181                  }
182                }
183                if verification_active {
184                  debug!("Found active verification run, will update hit map!");
185                  debug!("{}", event);
186                  for h in &event.hits {
187                    // average charge/peak hit
188                    let verification_charge_threshhold = 10.0f32;
189                    if h.get_charge_a() >= verification_charge_threshhold {
190                      let status_key = h.paddle_id as u16;
191                      match this_status.insert(status_key, true) {
192                        Some(_) => (),
193                        None => error!("Unknown paddle id! {}", h.paddle_id)
194                      }
195                    }
196                    if h.get_charge_b() >= verification_charge_threshhold {
197                      let status_key = (h.paddle_id as u16) + 160;
198                      match this_status.insert(status_key, true) {
199                        Some(_) => (),
200                        None => error!("Unknown paddle id! {}", h.paddle_id)
201                      }
202                    }
203                  }
204                } else {
205                  match ev_to_builder.send(event) {
206                    Ok(_) => (),
207                    Err(err) => {
208                      error!("Unable to send event! Err {err}");
209                    }
210                  }
211                }
212                //n_events += 1;
213                n_chunk += 1;
214              } 
215              TofPacketType::RBCalibration => {
216                let mut flight_v = RBCalibrationFlightV::new();
217                let mut flight_t = RBCalibrationFlightT::new(); 
218
219                match tp.unpack::<RBCalibrations>() {
220                  Ok(cali) => {
221                    flight_v = cali.emit_flightvcal();
222                    flight_t = cali.emit_flighttcal(); 
223                    //println!("= => [rb_comm] Received RBCalibration!");
224                    match thread_control.lock() {
225                      Ok(mut tc) => {
226                        tc.calibrations.insert(board_id, cali.clone()); 
227                        *tc.finished_calibrations.get_mut(&board_id).unwrap() = true; 
228                        rb.calibration = cali;
229                      }
230                      Err(err) => {
231                        error!("Can't acquire lock for ThreadControl!! {err}");
232                      },
233                    }
234                  }
235                  Err(err) => {
236                    error!("Received calibration package, but got error when unpacking! {err}");
237                  }
238                }
239                let flight_v_tp = flight_v.pack();
240                let flight_t_tp = flight_t.pack();
241                match tp_to_sink.send(flight_v_tp) {
242                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
243                  Ok(_)    => debug!("Packet sent"),
244                }
245                match tp_to_sink.send(flight_t_tp) {
246                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
247                  Ok(_)    => debug!("Packet sent"),
248                }
249                match tp_to_sink.send(tp) {
250                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
251                  Ok(_)    => debug!("Packet sent"),
252                }
253              }
254              _ => {
255                // Currently, we will just forward all other packets
256                // directly to the data sink
257                match tp_to_sink.send(tp) {
258                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
259                  Ok(_)    => debug!("Packet sent"),
260                }
261              }
262            } // end match packet type
263          } // end OK
264        } // end match from_bytestream
265      } // end ok buffer 
266    } // end match 
267    trace!("Digested {n_chunk} chunks!");
268    debug!("Noticed {n_errors} errors!");
269  } // end loop
270  println!("= => [rbcomm] thread for RB {} finished! (not recoverable)", board_id);
271} // end fun
272