liftof_rb/threads/
event_processing.rs

1//! Event processing deals with the raw memory input
2//! from the buffers, send to it by the runner 
3//! when reading out the system memory
4//!
5//! Different modes are available, from sending
6//! TofPackets directly through the RBEventMemoryStreamer
7//! without any further parsing of the events (this has 
8//! to be done then on the TOF main computer) to doing 
9//! waveform analysis on the RBs directly
10
11use std::fs;
12use std::path::PathBuf;
13use std::sync::{
14  Arc,
15  Mutex,
16};
17
18use std::time::Instant;
19
20use crossbeam_channel::{
21  Sender,
22  Receiver
23};
24
25use tof_dataclasses::events::DataType;
26use tof_dataclasses::packets::{
27  TofPacket,
28};
29use tof_dataclasses::io::RBEventMemoryStreamer;
30use tof_dataclasses::calibrations::RBCalibrations;
31use tof_dataclasses::events::EventStatus;
32use tof_dataclasses::commands::{
33  TofOperationMode,
34};
35
36use tof_dataclasses::events::rb_event::RBPaddleID;
37
38use liftof_lib::{
39  RunStatistics,
40  //waveform_analysis,
41};
42
43use liftof_lib::thread_control::ThreadControl;
44
45use crate::control::get_deadtime;
46
47///  Transforms raw bytestream to TofPackets
48///
49///  This allows to get the eventid from the 
50///  binrary form of the RBEvent
51///
52///  #Arguments
53///  
54///  * board_id            : The unique ReadoutBoard identifier
55///                          (ID) of this RB
56///  * bs_recv             : A receiver for bytestreams. The 
57///                          bytestream comes directly from 
58///                          the data buffers.
59///  * get_op_mode         : The TOF operation mode. Typically,
60///                          this is "Default", meaning that the
61///                          RBs will sent what is in the memory 
62///                          buffer translated into RBEvents.
63///                          In "RBHighThrougput" mode, it will not
64///                          translate them into RBEvents, but just
65///                          transmits the content of the buffers, and 
66///                          RBWaveform mode will do waveform analysis
67///                          on the boards
68///  * tp_sender           : Send the resulting data product to 
69///                          get processed further
70///  * data_type           : If different from 0, do some processing
71///                          on the data read from memory
72///  * verbose             : More output to the console for debugging
73///  * only_perfect_events : Only transmit events with EventStatus::Perfect.
74///                          This only applies when the op mode is not 
75///                          RBHighThroughput
76pub fn event_processing(board_id            : u8,
77                        rbpaddleid          : RBPaddleID,
78                        bs_recv             : &Receiver<Vec<u8>>,
79                        get_op_mode         : &Receiver<TofOperationMode>, 
80                        tp_sender           : &Sender<TofPacket>,
81                        dtf_fr_runner       : &Receiver<DataType>,
82                        verbose             : bool,
83                        thread_control      : Arc<Mutex<ThreadControl>>,
84                        stat                : Arc<Mutex<RunStatistics>>,
85                        only_perfect_events : bool) {
86  
87  let mut op_mode = TofOperationMode::Default;
88  let mut thread_ctrl_check_timer = Instant::now();
89
90  // load calibration just in case?
91  let mut cali_loaded = false;
92  let cali        : RBCalibrations;
93  let cali_path       = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
94  let cali_path_buf   = PathBuf::from(&cali_path);
95  if fs::metadata(cali_path_buf.clone()).is_ok() {
96    info!("Found valid calibration file path {cali_path_buf:?}");
97    match RBCalibrations::from_file(cali_path, true) {
98      Err(err) => {
99        error!("Can't load calibration! {err}");
100      },
101      Ok(_c) => {
102        cali = _c;
103        cali_loaded = true;
104        debug!("We loaded calibration {}", cali);
105      }
106    }
107  } else {
108    warn!("Calibration file not available!");
109    cali_loaded = false;
110  }
111  
112  // FIXME - deprecate!
113  let mut events_not_sent : u64 = 0;
114  let mut data_type       : DataType   = DataType::Unknown;
115  // should we store drs deadtime instead of the FPGA temperature
116  let mut deadtime_instead_temp : bool = false;
117  
118  let mut streamer        = RBEventMemoryStreamer::new();
119  // FIXME
120  streamer.check_channel_errors = true;
121  let mut trace_suppressed      = false;  
122  match thread_control.lock() {
123    Ok(tc) => {
124      streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
125      deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
126      trace_suppressed      = tc.liftof_settings.mtb_settings.trace_suppression;
127    },
128    Err(err) => {
129      trace!("Can't acquire lock! {err}");
130    },
131  }
132  
133  // loop variables
134  // our cachesize is 50 events. This means each time we 
135  // receive data over bs_recv, we have received 50 more 
136  // events. This means we might want to wait for 50 MTE
137  // events?
138  let mut skipped_events : usize = 0;
139  let mut n_events = 0usize;
140  
141  'main : loop {
142    // FIXME - this whole loop needs to be faster, and there 
143    // is no interactive commanding anymore.
144    if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
145      match thread_control.lock() {
146        Ok(tc) => {
147          if tc.stop_flag {
148            info!("Received stop signal. Will stop thread!");
149            break;
150          }
151          streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
152          deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
153        },
154        Err(err) => {
155          trace!("Can't acquire lock! {err}");
156        },
157      }
158      thread_ctrl_check_timer = Instant::now();
159    }
160
161    if !get_op_mode.is_empty() {
162      match get_op_mode.try_recv() {
163        Err(err) => trace!("No op mode change detected! Err {err}"),
164        Ok(mode) => {
165          warn!("Will change operation mode to {:?}!", mode);
166          match mode {
167            TofOperationMode::Default    => {
168              streamer.request_mode = false;
169              op_mode = mode;
170            },
171            TofOperationMode::RBWaveform   => {
172              if !cali_loaded {
173                error!("Requesting waveform analysis without having a calibration loaded!");
174                error!("Can't do waveform analysis without calibration!");
175                error!("Switching mode to Default");
176                op_mode = TofOperationMode::Default;
177              }
178            }
179            _ => (),
180          }
181        }
182      }
183    }
184    if !dtf_fr_runner.is_empty() {
185      match dtf_fr_runner.try_recv() {
186        Err(err) => {
187          error!("Issues receiving datatype/format! {err}");
188        }
189        Ok(dtf) => {
190          data_type = dtf; 
191          info!("Will process events for data type {}!", data_type);
192        }
193      }
194    }
195    if bs_recv.is_empty() {
196      //println!("--> Empty bs_rec");
197      // FIXME - benchmark
198      //thread::sleep(one_milli/2);
199      continue 'main;
200    }
201    // this can't be blocking anymore, since 
202    // otherwise we miss the datatype
203    let mut bytestream : Vec<u8>;
204    if events_not_sent > 0 {
205      error!("There were {events_not_sent} for this iteration of received bytes!");
206    }
207    if skipped_events > 0 {
208      error!("We skipped {} events!", skipped_events);
209    }
210    // reset skipped events and events not sent, 
211    // these are per iteration
212    events_not_sent = 0;
213    skipped_events  = 0;
214    match bs_recv.recv() {
215      Err(err) => {
216        error!("Received Garbage! Err {err}");
217        continue 'main;
218      }
219      Ok(_stream) => {
220        info!("Received {} bytes!", _stream.len());
221        bytestream = _stream;
222        //streamer.add(&bytestream, bytestream.len());
223        streamer.consume(&mut bytestream);
224        let mut packets_in_stream : u32 = 0;
225        let mut last_event_id     : u32 = 0;
226        //println!("Streamer::stream size {}", streamer.stream.len());
227        loop {
228          if streamer.is_depleted {
229            info!("Streamer exhausted after sending {} packets!", packets_in_stream);
230            //break 'event_reader;
231            // we immediatly want more data in the streamer
232            continue 'main;
233          }
234          // FIXME - here we have the choice. 
235          // streamer.next() will yield the next event,
236          // decoded
237          // streamer.next_tofpacket() instead will only
238          // yield the next event, not deserialzed
239          // but wrapped already in a tofpacket
240          let mut tp_to_send = TofPacket::new();
241          match op_mode {
242            TofOperationMode::RBHighThroughput => {
243              match streamer.next_tofpacket() {
244                None => {
245                  streamer.is_depleted = true;
246                  continue 'main;
247                },
248                Some(tp) => {
249                  tp_to_send = tp;
250                }
251              }
252            },
253            TofOperationMode::Default |
254            TofOperationMode::RBWaveform => {
255              match streamer.next() {
256                None => {
257                  streamer.is_depleted = true;
258                  continue 'main;
259                },
260                Some(mut event) => {
261                  event.header.set_rbpaddleid(&rbpaddleid);
262                  if deadtime_instead_temp {
263                    // in case we want to add the deadtime to the header, 
264                    // we have to do that here!
265                    event.header.deadtime_instead_temp = deadtime_instead_temp;
266                    match get_deadtime() {
267                      Err(err) => {
268                        error!("Unable to get DRS4 deadtime! {err}");
269                        event.header.drs_deadtime = u16::MAX;
270                      }
271                      Ok(d_time) => {
272                        event.header.drs_deadtime = d_time as u16;
273                      }
274                    }
275                  }
276                  //println!("Got event id {}", event.header.event_id);
277                  if last_event_id != 0 {
278                    if event.header.event_id != last_event_id + 1 {
279                      if event.header.event_id > last_event_id {
280                        if !trace_suppressed {
281                          skipped_events += (event.header.event_id - last_event_id -1) as usize;
282                        }
283                      } else {
284                        error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
285                      }
286                    }
287                  }
288                  last_event_id = event.header.event_id;
289                  //println!("This event id {}!", last_event_id);
290                  event.data_type = data_type;
291                  if verbose {
292                    match stat.lock() {
293                      Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
294                      Ok(mut s) => {
295                        if s.first_evid == 0 {
296                          s.first_evid = event.header.event_id;
297                        }
298                        s.last_evid = event.header.event_id;
299                        if event.status == EventStatus::ChannelIDWrong {
300                          s.n_err_chid_wrong += 1;
301                        }
302                        if event.status == EventStatus::TailWrong {
303                          s.n_err_tail_wrong += 1;
304                        }
305                      }
306                    }
307                  }
308                  if event.status != EventStatus::Unknown {
309                    if only_perfect_events && event.status != EventStatus::Perfect {
310                      info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
311                      continue;
312                    }
313                  }
314                  if op_mode == TofOperationMode::RBWaveform {
315                    //debug!("Using paddle map {:?}", paddle_map);
316                    //match waveform_analysis(&mut event, &paddle_map, &cali) {
317                    //  Err(err) => error!("Waveform analysis failed! {err}"),
318                    //  Ok(_)    => ()
319                    //}
320                  }
321                  n_events += 1;
322                  if verbose && n_events % 100 == 0 {
323                    println!("[EVTPROC (verbose)] => Sending event {}", event);
324                  }
325                  tp_to_send = TofPacket::from(&event);
326                },
327              } 
328            }, // end op mode ~ waveform/event
329            _ => {
330              error!("Operation mode {} not available yet!", op_mode);
331            }
332          }
333          if verbose {
334            match stat.lock() {
335              Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
336              Ok(mut _st)  => {
337                _st.evproc_npack += 1; 
338              }
339            }
340            //println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
341          }
342          // set flags
343          match data_type {
344            DataType::VoltageCalibration |
345            DataType::TimingCalibration  | 
346            DataType::Noi => {
347              tp_to_send.no_write_to_disk = true;
348            },
349            _ => ()
350          }
351          // send the packet
352          match tp_sender.send(tp_to_send) {
353            Ok(_) => {
354              packets_in_stream += 1;
355            },
356            Err(err) => {
357              error!("Problem sending TofPacket over channel! {err}");
358              events_not_sent += 1;
359            }
360          }
361        } // end 'event_reader
362      }, // end OK(recv)
363    }// end match 
364  } // end outer loop
365}
366