liftof_rb/threads/
event_processing.rs

1//! Event processing deals with the raw memory input
2//! from the buffers, send to it by the runner 
3//! when reading out the system memory
4//!
5//! Different modes are available, from sending
6//! TofPackets directly through the RBEventMemoryStreamer
7//! without any further parsing of the events (this has 
8//! to be done then on the TOF main computer) to doing 
9//! waveform analysis on the RBs directly
10
11use std::fs;
12use std::path::PathBuf;
13use std::sync::{
14  Arc,
15  Mutex,
16};
17
18use std::time::Instant;
19
20use crossbeam_channel::{
21  Sender,
22  Receiver
23};
24
25use gondola_core::prelude::*;
26
27
28//use tof_dataclasses::events::DataType;
29//use tof_dataclasses::packets::{
30//  TofPacket,
31//};
32//use tof_dataclasses::io::RBEventMemoryStreamer;
33//use tof_dataclasses::calibrations::RBCalibrations;
34//use tof_dataclasses::events::EventStatus;
35//use tof_dataclasses::commands::{
36//  TofOperationMode,
37//};
38//
39//use tof_dataclasses::events::rb_event::RBPaddleID;
40//
41//use liftof_lib::{
42//  RunStatistics,
43//  //waveform_analysis,
44//};
45//
46//use liftof_lib::thread_control::ThreadControl;
47
48use crate::control::get_deadtime;
49
50///  Transforms raw bytestream to TofPackets
51///
52///  This allows to get the eventid from the 
53///  binrary form of the RBEvent
54///
55///  #Arguments
56///  
57///  * board_id            : The unique ReadoutBoard identifier
58///                          (ID) of this RB
59///  * bs_recv             : A receiver for bytestreams. The 
60///                          bytestream comes directly from 
61///                          the data buffers.
62///  * get_op_mode         : The TOF operation mode. Typically,
63///                          this is "Default", meaning that the
64///                          RBs will sent what is in the memory 
65///                          buffer translated into RBEvents.
66///                          In "RBHighThrougput" mode, it will not
67///                          translate them into RBEvents, but just
68///                          transmits the content of the buffers, and 
69///                          RBWaveform mode will do waveform analysis
70///                          on the boards
71///  * tp_sender           : Send the resulting data product to 
72///                          get processed further
73///  * data_type           : If different from 0, do some processing
74///                          on the data read from memory
75///  * verbose             : More output to the console for debugging
76///  * only_perfect_events : Only transmit events with EventStatus::Perfect.
77///                          This only applies when the op mode is not 
78///                          RBHighThroughput
79pub fn event_processing(board_id            : u8,
80                        rbpaddleid          : RBPaddleID,
81                        bs_recv             : &Receiver<Vec<u8>>,
82                        get_op_mode         : &Receiver<TofOperationMode>, 
83                        tp_sender           : &Sender<TofPacket>,
84                        dtf_fr_runner       : &Receiver<DataType>,
85                        verbose             : bool,
86                        thread_control      : Arc<Mutex<ThreadControl>>,
87                        stat                : Arc<Mutex<RunStatistics>>,
88                        only_perfect_events : bool) {
89  
90  let mut op_mode = TofOperationMode::Default;
91  let mut thread_ctrl_check_timer = Instant::now();
92
93  // load calibration just in case?
94  let mut cali_loaded = false;
95  let cali        : RBCalibrations;
96  let cali_path       = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
97  let cali_path_buf   = PathBuf::from(&cali_path);
98  if fs::metadata(cali_path_buf.clone()).is_ok() {
99    info!("Found valid calibration file path {cali_path_buf:?}");
100    match RBCalibrations::from_file(cali_path, true) {
101      Err(err) => {
102        error!("Can't load calibration! {err}");
103      },
104      Ok(_c) => {
105        cali = _c;
106        cali_loaded = true;
107        debug!("We loaded calibration {}", cali);
108      }
109    }
110  } else {
111    warn!("Calibration file not available!");
112    cali_loaded = false;
113  }
114  
115  // FIXME - deprecate!
116  let mut events_not_sent : u64 = 0;
117  let mut data_type       : DataType   = DataType::Unknown;
118  // should we store drs deadtime instead of the FPGA temperature
119  let mut deadtime_instead_temp : bool = false;
120  
121  let mut streamer        = RBEventMemoryStreamer::new();
122  // FIXME
123  streamer.check_channel_errors = true;
124  let mut trace_suppressed      = false;  
125  match thread_control.lock() {
126    Ok(tc) => {
127      streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
128      deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
129      trace_suppressed      = tc.liftof_settings.mtb_settings.trace_suppression;
130    },
131    Err(err) => {
132      trace!("Can't acquire lock! {err}");
133    },
134  }
135  
136  // loop variables
137  // our cachesize is 50 events. This means each time we 
138  // receive data over bs_recv, we have received 50 more 
139  // events. This means we might want to wait for 50 MTE
140  // events?
141  let mut skipped_events : usize = 0;
142  let mut n_events = 0usize;
143  
144  'main : loop {
145    // FIXME - this whole loop needs to be faster, and there 
146    // is no interactive commanding anymore.
147    if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
148      match thread_control.lock() {
149        Ok(tc) => {
150          if tc.stop_flag {
151            info!("Received stop signal. Will stop thread!");
152            break;
153          }
154          streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
155          deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
156        },
157        Err(err) => {
158          trace!("Can't acquire lock! {err}");
159        },
160      }
161      thread_ctrl_check_timer = Instant::now();
162    }
163
164    if !get_op_mode.is_empty() {
165      match get_op_mode.try_recv() {
166        Err(err) => trace!("No op mode change detected! Err {err}"),
167        Ok(mode) => {
168          warn!("Will change operation mode to {:?}!", mode);
169          match mode {
170            TofOperationMode::Default    => {
171              streamer.request_mode = false;
172              op_mode = mode;
173            },
174            TofOperationMode::RBWaveform   => {
175              if !cali_loaded {
176                error!("Requesting waveform analysis without having a calibration loaded!");
177                error!("Can't do waveform analysis without calibration!");
178                error!("Switching mode to Default");
179                op_mode = TofOperationMode::Default;
180              }
181            }
182            _ => (),
183          }
184        }
185      }
186    }
187    if !dtf_fr_runner.is_empty() {
188      match dtf_fr_runner.try_recv() {
189        Err(err) => {
190          error!("Issues receiving datatype/format! {err}");
191        }
192        Ok(dtf) => {
193          data_type = dtf; 
194          info!("Will process events for data type {}!", data_type);
195        }
196      }
197    }
198    if bs_recv.is_empty() {
199      //println!("--> Empty bs_rec");
200      // FIXME - benchmark
201      //thread::sleep(one_milli/2);
202      continue 'main;
203    }
204    // this can't be blocking anymore, since 
205    // otherwise we miss the datatype
206    let mut bytestream : Vec<u8>;
207    if events_not_sent > 0 {
208      error!("There were {events_not_sent} for this iteration of received bytes!");
209    }
210    if skipped_events > 0 {
211      error!("We skipped {} events!", skipped_events);
212    }
213    // reset skipped events and events not sent, 
214    // these are per iteration
215    events_not_sent = 0;
216    skipped_events  = 0;
217    match bs_recv.recv() {
218      Err(err) => {
219        error!("Received Garbage! Err {err}");
220        continue 'main;
221      }
222      Ok(_stream) => {
223        info!("Received {} bytes!", _stream.len());
224        bytestream = _stream;
225        //streamer.add(&bytestream, bytestream.len());
226        streamer.consume(&mut bytestream);
227        let mut packets_in_stream : u32 = 0;
228        let mut last_event_id     : u32 = 0;
229        //println!("Streamer::stream size {}", streamer.stream.len());
230        loop {
231          if streamer.is_depleted {
232            info!("Streamer exhausted after sending {} packets!", packets_in_stream);
233            //break 'event_reader;
234            // we immediatly want more data in the streamer
235            continue 'main;
236          }
237          // FIXME - here we have the choice. 
238          // streamer.next() will yield the next event,
239          // decoded
240          // streamer.next_tofpacket() instead will only
241          // yield the next event, not deserialzed
242          // but wrapped already in a tofpacket
243          let mut tp_to_send = TofPacket::new();
244          match op_mode {
245            TofOperationMode::RBHighThroughput => {
246              match streamer.next_tofpacket() {
247                None => {
248                  streamer.is_depleted = true;
249                  continue 'main;
250                },
251                Some(tp) => {
252                  tp_to_send = tp;
253                }
254              }
255            },
256            TofOperationMode::Default |
257            TofOperationMode::RBWaveform => {
258              match streamer.next() {
259                None => {
260                  streamer.is_depleted = true;
261                  continue 'main;
262                },
263                Some(mut event) => {
264                  event.header.set_rbpaddleid(&rbpaddleid);
265                  if deadtime_instead_temp {
266                    // in case we want to add the deadtime to the header, 
267                    // we have to do that here!
268                    event.header.deadtime_instead_temp = deadtime_instead_temp;
269                    match get_deadtime() {
270                      Err(err) => {
271                        error!("Unable to get DRS4 deadtime! {err}");
272                        event.header.drs_deadtime = u16::MAX;
273                      }
274                      Ok(d_time) => {
275                        event.header.drs_deadtime = d_time as u16;
276                      }
277                    }
278                  }
279                  //println!("Got event id {}", event.header.event_id);
280                  if last_event_id != 0 {
281                    if event.header.event_id != last_event_id + 1 {
282                      if event.header.event_id > last_event_id {
283                        if !trace_suppressed {
284                          skipped_events += (event.header.event_id - last_event_id -1) as usize;
285                        }
286                      } else {
287                        error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
288                      }
289                    }
290                  }
291                  last_event_id = event.header.event_id;
292                  //println!("This event id {}!", last_event_id);
293                  event.data_type = data_type;
294                  if verbose {
295                    match stat.lock() {
296                      Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
297                      Ok(mut s) => {
298                        if s.first_evid == 0 {
299                          s.first_evid = event.header.event_id;
300                        }
301                        s.last_evid = event.header.event_id;
302                        if event.status == EventStatus::ChannelIDWrong {
303                          s.n_err_chid_wrong += 1;
304                        }
305                        if event.status == EventStatus::TailWrong {
306                          s.n_err_tail_wrong += 1;
307                        }
308                      }
309                    }
310                  }
311                  if event.status != EventStatus::Unknown {
312                    if only_perfect_events && event.status != EventStatus::Perfect {
313                      info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
314                      continue;
315                    }
316                  }
317                  if op_mode == TofOperationMode::RBWaveform {
318                    //debug!("Using paddle map {:?}", paddle_map);
319                    //match waveform_analysis(&mut event, &paddle_map, &cali) {
320                    //  Err(err) => error!("Waveform analysis failed! {err}"),
321                    //  Ok(_)    => ()
322                    //}
323                  }
324                  n_events += 1;
325                  if verbose && n_events % 100 == 0 {
326                    println!("[EVTPROC (verbose)] => Sending event {}", event);
327                  }
328                  tp_to_send = event.pack();
329                },
330              } 
331            }, // end op mode ~ waveform/event
332            _ => {
333              error!("Operation mode {} not available yet!", op_mode);
334            }
335          }
336          if verbose {
337            match stat.lock() {
338              Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
339              Ok(mut _st)  => {
340                _st.evproc_npack += 1; 
341              }
342            }
343            //println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
344          }
345          // set flags
346          match data_type {
347            DataType::VoltageCalibration |
348            DataType::TimingCalibration  | 
349            DataType::Noi => {
350              tp_to_send.no_write_to_disk = true;
351            },
352            _ => ()
353          }
354          // send the packet
355          match tp_sender.send(tp_to_send) {
356            Ok(_) => {
357              packets_in_stream += 1;
358            },
359            Err(err) => {
360              error!("Problem sending TofPacket over channel! {err}");
361              events_not_sent += 1;
362            }
363          }
364        } // end 'event_reader
365      }, // end OK(recv)
366    }// end match 
367  } // end outer loop
368}
369