liftof_cc/threads/
readoutboard_comm.rs

1//! Readoutboard communication. Get events and 
2//! monitoring data.
3//!
4//! Each readoutboard gets connected through a zmq socket 
5//! over ethernet and has its own thread. 
6// This file is part of gaps-online-software and published 
7// under the GPLv3 license
8
9use std::collections::HashMap;
10
11use std::sync::{
12  Arc,
13  Mutex,
14};
15
16use std::time::{
17  Instant,
18  //Duration,
19};
20
21use crossbeam_channel::Sender;
22
23use gondola_core::prelude::*;
24
25/*************************************/
26
27/// Receive data from a readoutboard
28///
29/// In case of binary event data ("blob") this can be analyzed here
30/// It is also possible to save the data directly.
31///
32/// In case of monitoring/other tof packets, those will be forwarded
33///
34/// # Arguments:
35///
36/// * ev_to_builder       : This thread will receive RBEvent data from the assigned RB, 
37///                         if desired (see run_analysis_engine) run analysis and extract
38///                         TofHits and then pass the result on to the event builder.
39/// * tp_to_sink          : Channel which should be connect to a (global) data sink.
40///                         Packets which are of not event type (e.g. header/full binary data)
41///                         will be forwarded to the sink.
42/// * rb                  : ReadoutBoard instance, as loaded from the database. This will be used
43///                         for readoutboard id as well as paddle assignment.
44/// * ae_settings         : Settings to configure peakfinding algorithms etc. 
45///                         These can be configured with an external .toml file
46pub fn readoutboard_communicator(ev_to_builder       : Sender<RBEvent>,
47                                 tp_to_sink          : Sender<TofPacket>,
48                                 mut rb              : ReadoutBoard,
49                                 thread_control      : Arc<Mutex<ThreadControl>>) {
50
51  let mut this_status = HashMap::<u16, bool>::new();
52  for k in 1..321 {
53    this_status.insert(k,false);
54  }
55  match rb.load_latest_calibration() {
56    Err(err) => error!("Unable to load calibration for RB {}! {}", rb.rb_id, err),
57    Ok(_)    => {
58      info!("Loaded calibration for board {} successfully!", rb.rb_id);
59    }
60  }
61
62  let zmq_ctx = zmq::Context::new();
63  let board_id = rb.rb_id; //rb.id.unwrap();
64  info!("initializing RB thread for board {}!", board_id);
65  let mut n_errors        = 0usize;
66  // how many chunks ("buffers") we dealt with
67  let mut n_chunk  = 0usize;
68  // in case we want to do calibratoins
69  let address = rb.guess_address();
70
71  // FIXME - this panics, however, if we can't set up the socket, what's 
72  // the point of this thread?
73  let socket = zmq_ctx.socket(zmq::SUB).expect("Unable to create socket!");
74  match socket.connect(&address) {
75    Err(err) => error!("Can not connect to socket {}, {}", address, err),
76    Ok(_)    => info!("Connected to {address}")
77  }
78  // no need to subscribe to a topic, since there 
79  // is one port for each rb
80  let topic = format!("RB{:02}", board_id);
81  match socket.set_subscribe(&topic.as_bytes()) {
82   Err(err) => error!("Unable to subscribe to topic! {err}"),
83   Ok(_)    => info!("Subscribed to {:?}!", topic),
84  }
85  let mut tc_timer = Instant::now();
86  let verification_active : bool;
87  
88  let ae_settings         : AnalysisEngineSettings; 
89  let mut run_analysis_engine : bool;
90  match thread_control.lock() {
91    Ok(tc) => {
92      ae_settings         = tc.liftof_settings.analysis_engine_settings.clone();
93      run_analysis_engine = tc.liftof_settings.run_analysis_engine;
94      verification_active = tc.liftof_settings.verification_run.unwrap_or(false);
95    }
96    Err(err) => {
97      error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
98      error!("Ending thread, unable to acquire settings!");
99      return;
100    }
101  }
102  if verification_active {
103    // needs analysis engine since it relies on hits 
104    run_analysis_engine = true;
105    println!("=> Running verfication on board {board_id}!");
106  } else {
107    debug!("=> Not running verification!");
108  }
109  if run_analysis_engine {
110    info!("Will run analysis engine!");
111    //println!("Will use the following settings! {}", ae_settings);
112  } else {
113    warn!("Will not run analysis engine!");
114  }
115  
116  // start continuous thread activity, read data from RB sockets,
117  // do analysis and pass them on.
118  loop {
119    //println!("tc_timer {}", tc_timer.elapsed().as_secs_f32());
120    if tc_timer.elapsed().as_secs_f32() > 2.1 {
121      match thread_control.try_lock() {
122        Ok(mut tc) => {
123          debug!(" lock on thread control acquired!");
124          //println!("== ==> [rbcomm] tc lock acquired!");
125          if tc.end_all_rb_threads {
126            //println!("= => [rbcomm] initiate ending thread for RB {}!", board_id);
127            tc.thread_rbcomm_active.insert(rb.rb_id,false);
128            // check if all threads have ended
129            let mut all_done = true;
130            for (_,value) in &tc.thread_rbcomm_active {
131              if *value {
132                all_done = false;
133              }
134            }
135            if all_done {
136              tc.thread_event_bldr_active = false;
137            }
138            break;
139          }
140          //verification_active = tc.verification_active;
141          if verification_active {
142            //println!("RB thread for RB {} is in 'verification' mode!", board_id);
143            tc.detector_status.update_from_map(this_status.clone());
144          }
145        },
146        Err(err) => {
147          error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
148        },
149      }
150      tc_timer = Instant::now();
151    }
152    // check if we got new data
153    // this is blocking the thread
154    match socket.recv_bytes(0) {
155      Err(err) => {
156        n_errors += 1;
157        error!("Receiving from socket raised error {}", err);
158      }
159      Ok(buffer) => {
160        // strip the first 4 bytes, since they contain the 
161        // board id
162        match TofPacket::from_bytestream(&buffer, &mut 4) { 
163          Err(err) => {
164            error!("Unknown bytestream...{:?}", err);
165            continue;  
166          },
167          Ok(tp) => {
168            //n_received += 1;
169            match tp.packet_type {
170              TofPacketType::RBEvent | TofPacketType::RBEventMemoryView => {
171                //let mut event = RBEvent::from(&tp);
172                let mut event : RBEvent;
173                match tp.unpack::<RBEvent>() {
174                  Ok(rbev_) => {
175                    event = rbev_;
176                    event.creation_time = Some(Instant::now());
177                  } 
178                  Err(err) => {
179                    error!("Can't unpack RBEvent! {err}");
180                    continue;
181                  }
182                }
183                // don't create the hits if the trigger is lost (the 
184                // waveform field will be empty)
185                if event.hits.len() == 0 
186                && !event.header.drs_lost_trigger() 
187                && run_analysis_engine {
188                  match waveform_analysis(&mut event, 
189                                          &rb,
190                                          ae_settings) {
191                    Ok(_) => (),
192                    Err(err) => {
193                      debug!("Unable to analyze waveforms for this event on RB {}! {err}", &rb.rb_id);
194                    }
195                  }
196                }
197                if verification_active {
198                  //println!("Found active verification run, will update hit map!");
199                  //debug!("{}", event);
200                  for h in &event.hits {
201                    // average charge/peak hit
202                    let verification_charge_threshhold = 10.0f32;
203                    if h.get_charge_a() >= verification_charge_threshhold {
204                      let status_key = h.paddle_id as u16;
205                      match this_status.insert(status_key, true) {
206                        Some(_) => (),
207                        None => error!("Unknown paddle id! {}", h.paddle_id)
208                      }
209                    }
210                    if h.get_charge_b() >= verification_charge_threshhold {
211                      let status_key = (h.paddle_id as u16) + 160;
212                      match this_status.insert(status_key, true) {
213                        Some(_) => (),
214                        None => error!("Unknown paddle id! {}", h.paddle_id)
215                      }
216                    }
217                  }
218                } else {
219                  //println!("RBEVENT, RB HAS {} HITS", event.hits.len());
220                  match ev_to_builder.send(event) {
221                    Ok(_) => (),
222                    Err(err) => {
223                      error!("Unable to send event! Err {err}");
224                    }
225                  }
226                }
227                //n_events += 1;
228                n_chunk += 1;
229              } 
230              TofPacketType::RBCalibration => {
231                let mut flight_v = RBCalibrationFlightV::new();
232                let mut flight_t = RBCalibrationFlightT::new(); 
233
234                match tp.unpack::<RBCalibrations>() {
235                  Ok(cali) => {
236                    flight_v = cali.emit_flightvcal();
237                    flight_t = cali.emit_flighttcal(); 
238                    //println!("= => [rb_comm] Received RBCalibration!");
239                    match thread_control.lock() {
240                      Ok(mut tc) => {
241                        tc.calibrations.insert(board_id, cali.clone()); 
242                        *tc.finished_calibrations.get_mut(&board_id).unwrap() = true; 
243                        rb.calibration = cali;
244                      }
245                      Err(err) => {
246                        error!("Can't acquire lock for ThreadControl!! {err}");
247                      },
248                    }
249                  }
250                  Err(err) => {
251                    error!("Received calibration package, but got error when unpacking! {err}");
252                  }
253                }
254                let flight_v_tp = flight_v.pack();
255                let flight_t_tp = flight_t.pack();
256                match tp_to_sink.send(flight_v_tp) {
257                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
258                  Ok(_)    => debug!("Packet sent"),
259                }
260                match tp_to_sink.send(flight_t_tp) {
261                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
262                  Ok(_)    => debug!("Packet sent"),
263                }
264                match tp_to_sink.send(tp) {
265                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
266                  Ok(_)    => debug!("Packet sent"),
267                }
268              }
269              _ => {
270                // Currently, we will just forward all other packets
271                // directly to the data sink
272                match tp_to_sink.send(tp) {
273                  Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
274                  Ok(_)    => debug!("Packet sent"),
275                }
276              }
277            } // end match packet type
278          } // end OK
279        } // end match from_bytestream
280      } // end ok buffer 
281    } // end match 
282    trace!("Digested {n_chunk} chunks!");
283    debug!("Noticed {n_errors} errors!");
284  } // end loop
285  println!("= => [rbcomm] thread for RB {} finished! (not recoverable)", board_id);
286} // end fun
287