liftof_rb/threads/
event_processing.rs

1//! Event processing deals with the raw memory input
2//! from the buffers, send to it by the runner 
3//! when reading out the system memory
4//!
5//! Different modes are available, from sending
6//! TofPackets directly through the RBEventMemoryStreamer
7//! without any further parsing of the events (this has 
8//! to be done then on the TOF main computer) to doing 
9//! waveform analysis on the RBs directly
10
11use std::fs;
12use std::path::PathBuf;
13use std::sync::{
14  Arc,
15  Mutex,
16};
17
18use std::time::Instant;
19
20use crossbeam_channel::{
21  Sender,
22  Receiver
23};
24
25use gondola_core::prelude::*;
26
27use crate::control::get_deadtime;
28
29///  Transforms raw bytestream to TofPackets
30///
31///  This allows to get the eventid from the 
32///  binrary form of the RBEvent
33///
34///  #Arguments
35///  
36///  * board_id            : The unique ReadoutBoard identifier
37///                          (ID) of this RB
38///  * bs_recv             : A receiver for bytestreams. The 
39///                          bytestream comes directly from 
40///                          the data buffers.
41///  * get_op_mode         : The TOF operation mode. Typically,
42///                          this is "Default", meaning that the
43///                          RBs will sent what is in the memory 
44///                          buffer translated into RBEvents.
45///                          In "RBHighThrougput" mode, it will not
46///                          translate them into RBEvents, but just
47///                          transmits the content of the buffers, and 
48///                          RBWaveform mode will do waveform analysis
49///                          on the boards
50///  * tp_sender           : Send the resulting data product to 
51///                          get processed further
52///  * data_type           : If different from 0, do some processing
53///                          on the data read from memory
54///  * verbose             : More output to the console for debugging
55///  * only_perfect_events : Only transmit events with EventStatus::Perfect.
56///                          This only applies when the op mode is not 
57///                          RBHighThroughput
58pub fn event_processing(board_id            : u8,
59                        rbpaddleid          : RBPaddleID,
60                        bs_recv             : &Receiver<Vec<u8>>,
61                        get_op_mode         : &Receiver<TofOperationMode>, 
62                        tp_sender           : &Sender<TofPacket>,
63                        dtf_fr_runner       : &Receiver<DataType>,
64                        verbose             : bool,
65                        thread_control      : Arc<Mutex<ThreadControl>>,
66                        stat                : Arc<Mutex<RunStatistics>>,
67                        only_perfect_events : bool) {
68  
69  let mut op_mode = TofOperationMode::Default;
70  let mut thread_ctrl_check_timer = Instant::now();
71
72  // load calibration just in case?
73  let mut cali_loaded = false;
74  let cali        : RBCalibrations;
75  let cali_path       = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
76  let cali_path_buf   = PathBuf::from(&cali_path);
77  if fs::metadata(cali_path_buf.clone()).is_ok() {
78    info!("Found valid calibration file path {cali_path_buf:?}");
79    match RBCalibrations::from_file(cali_path, true) {
80      Err(err) => {
81        error!("Can't load calibration! {err}");
82      },
83      Ok(_c) => {
84        cali = _c;
85        cali_loaded = true;
86        debug!("We loaded calibration {}", cali);
87      }
88    }
89  } else {
90    warn!("Calibration file not available!");
91    cali_loaded = false;
92  }
93  
94  // FIXME - deprecate!
95  let mut events_not_sent : u64 = 0;
96  let mut data_type       : DataType   = DataType::Unknown;
97  // should we store drs deadtime instead of the FPGA temperature
98  let mut deadtime_instead_temp : bool = false;
99  
100  let mut streamer        = RBEventMemoryStreamer::new();
101  // FIXME
102  streamer.check_channel_errors = true;
103  let mut trace_suppressed      = false;  
104  match thread_control.lock() {
105    Ok(tc) => {
106      streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
107      deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
108      trace_suppressed      = tc.liftof_settings.mtb_settings.trace_suppression;
109    },
110    Err(err) => {
111      trace!("Can't acquire lock! {err}");
112    },
113  }
114  
115  // loop variables
116  // our cachesize is 50 events. This means each time we 
117  // receive data over bs_recv, we have received 50 more 
118  // events. This means we might want to wait for 50 MTE
119  // events?
120  let mut skipped_events         = 0usize;
121  let mut n_events               = 0usize;
122  let mut n_event_id_too_large   = 0u32; 
123  let mut n_event_id_zero        = 0u32;
124  'main : loop {
125    // FIXME - this whole loop needs to be faster, and there 
126    // is no interactive commanding anymore.
127    if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
128      match thread_control.lock() {
129        Ok(mut tc) => {
130          if tc.stop_flag {
131            info!("Received stop signal. Will stop thread!");
132            break;
133          }
134          streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
135          deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
136          tc.lost_event_ids     = (n_event_id_zero + n_event_id_too_large) as f32 / n_events as f32;
137        },
138        Err(err) => {
139          trace!("Can't acquire lock! {err}");
140        },
141      }
142      //if n_event_id_too_large > 0 {
143      //  error!("We saw {} events with an event id > 90000 in the last 6 seconds!", n_event_id_too_large);
144      //  //n_event_id_too_large = 0;
145      //}
146      //if n_event_id_zero > 0 {
147      //  error!("We saw {} events with an event id == 0 in the last 6 seconds!", n_event_id_zero);
148      //  //n_event_id_zero = 0;
149      //}
150      thread_ctrl_check_timer = Instant::now();
151    }
152
153    if !get_op_mode.is_empty() {
154      match get_op_mode.try_recv() {
155        Err(err) => trace!("No op mode change detected! Err {err}"),
156        Ok(mode) => {
157          warn!("Will change operation mode to {:?}!", mode);
158          match mode {
159            TofOperationMode::Default    => {
160              streamer.request_mode = false;
161              op_mode = mode;
162            },
163            TofOperationMode::RBWaveform   => {
164              if !cali_loaded {
165                error!("Requesting waveform analysis without having a calibration loaded!");
166                error!("Can't do waveform analysis without calibration!");
167                error!("Switching mode to Default");
168                op_mode = TofOperationMode::Default;
169              }
170            }
171            _ => (),
172          }
173        }
174      }
175    }
176    if !dtf_fr_runner.is_empty() {
177      match dtf_fr_runner.try_recv() {
178        Err(err) => {
179          error!("Issues receiving datatype/format! {err}");
180        }
181        Ok(dtf) => {
182          data_type = dtf; 
183          info!("Will process events for data type {}!", data_type);
184        }
185      }
186    }
187    if bs_recv.is_empty() {
188      //println!("--> Empty bs_rec");
189      // FIXME - benchmark
190      //thread::sleep(one_milli/2);
191      continue 'main;
192    }
193    // this can't be blocking anymore, since 
194    // otherwise we miss the datatype
195    let mut bytestream : Vec<u8>;
196    if events_not_sent > 0 {
197      error!("There were {events_not_sent} for this iteration of received bytes!");
198    }
199    if skipped_events > 0 {
200      error!("We skipped {} events!", skipped_events);
201    }
202    // reset skipped events and events not sent, 
203    // these are per iteration
204    events_not_sent = 0;
205    skipped_events  = 0;
206    match bs_recv.recv() {
207      Err(err) => {
208        error!("Received Garbage! Err {err}");
209        continue 'main;
210      }
211      Ok(_stream) => {
212        info!("Received {} bytes!", _stream.len());
213        bytestream = _stream;
214        //streamer.add(&bytestream, bytestream.len());
215        streamer.consume(&mut bytestream);
216        let mut packets_in_stream : u32 = 0;
217        let mut last_event_id     : u32 = 0;
218        //let mut last_event        : RBEvent = RBEvent::new();
219        //println!("Streamer::stream size {}", streamer.stream.len());
220        loop {
221          if streamer.is_depleted {
222            info!("Streamer exhausted after sending {} packets!", packets_in_stream);
223            //break 'event_reader;
224            // we immediatly want more data in the streamer
225            continue 'main;
226          }
227          // FIXME - here we have the choice. 
228          // streamer.next() will yield the next event,
229          // decoded
230          // streamer.next_tofpacket() instead will only
231          // yield the next event, not deserialzed
232          // but wrapped already in a tofpacket
233          let mut tp_to_send = TofPacket::new();
234          match op_mode {
235            TofOperationMode::RBHighThroughput => {
236              match streamer.next_tofpacket() {
237                None => {
238                  streamer.is_depleted = true;
239                  continue 'main;
240                },
241                Some(tp) => {
242                  tp_to_send = tp;
243                }
244              }
245            },
246            TofOperationMode::Default |
247            TofOperationMode::RBWaveform => {
248              match streamer.next() {
249                None => {
250                  streamer.is_depleted = true;
251                  continue 'main;
252                },
253                Some(mut event) => {
254                  event.header.set_rbpaddleid(&rbpaddleid);
255                  if deadtime_instead_temp {
256                    // in case we want to add the deadtime to the header, 
257                    // we have to do that here!
258                    event.header.deadtime_instead_temp = deadtime_instead_temp;
259                    match get_deadtime() {
260                      Err(err) => {
261                        error!("Unable to get DRS4 deadtime! {err}");
262                        event.header.drs_deadtime = u16::MAX;
263                      }
264                      Ok(d_time) => {
265                        event.header.drs_deadtime = d_time as u16;
266                      }
267                    }
268                  }
269                  //println!("Got event id {}", event.header.event_id);
270                  if event.header.event_id == 0 {
271                    //error!("The current event id is 0!");
272                    //error!("Event {}", event);
273                    //event.status = EventStatus::RBEventWacky;
274                    n_event_id_zero += 1;
275                    continue; 
276                  } 
277                  if last_event_id != 0 {
278                    let delta_evid = event.header.event_id as i64 - last_event_id as i64;
279                    
280                    //if event.header.event_id > 42_000_000u32 {
281                    // 90000 -> at 100Hz, that is 15mins deviation
282                    if i64::abs(delta_evid) > 90000 {
283                      //error!("The event id deviates more than 90000 event ids from the last event id!");
284                      //error!("Event {}", event);
285                      //error!("Last Event {}", last_event);
286                      //error!("The event id is larger than 42000000");
287                      //event.status = EventStatus::RBEventWacky;
288                      n_event_id_too_large += 1;
289                      continue;
290                    }
291                  }
292                  if last_event_id != 0 {
293                    if event.header.event_id != last_event_id + 1 {
294                      if event.header.event_id > last_event_id {
295                        if !trace_suppressed {
296                          skipped_events += (event.header.event_id - last_event_id -1) as usize;
297                        }
298                      } else {
299                        error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
300                      }
301                    }
302                  }
303                  // Now as we are through our sanity checks, the event id should be can use it for 
304                  // reference
305                  last_event_id = event.header.event_id;
306                  //last_event    = event.clone();
307                  //println!("This event id {}!", last_event_id);
308                  event.data_type = data_type;
309                  // skip for now, since we change event status
310                  //if verbose {
311                  //  match stat.lock() {
312                  //    Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
313                  //    Ok(mut s) => {
314                  //      if s.first_evid == 0 {
315                  //        s.first_evid = event.header.event_id;
316                  //      }
317                  //      s.last_evid = event.header.event_id;
318                  //      if event.status == EventStatus::ChannelIDWrong {
319                  //        s.n_err_chid_wrong += 1;
320                  //      }
321                  //      if event.status == EventStatus::TailWrong {
322                  //        s.n_err_tail_wrong += 1;
323                  //      }
324                  //    }
325                  //  }
326                  //}
327                  if event.status != EventStatus::Unknown {
328                    if only_perfect_events && event.status != EventStatus::Perfect {
329                      info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
330                      continue;
331                    }
332                  }
333                  if op_mode == TofOperationMode::RBWaveform {
334                    //debug!("Using paddle map {:?}", paddle_map);
335                    //match waveform_analysis(&mut event, &paddle_map, &cali) {
336                    //  Err(err) => error!("Waveform analysis failed! {err}"),
337                    //  Ok(_)    => ()
338                    //}
339                  }
340                  n_events += 1;
341                  if verbose && n_events % 100 == 0 {
342                    println!("[EVTPROC (verbose)] => Sending event {}", event);
343                  }
344                  tp_to_send = event.pack();
345                },
346              } 
347            }, // end op mode ~ waveform/event
348            _ => {
349              error!("Operation mode {} not available yet!", op_mode);
350            }
351          }
352          if verbose {
353            match stat.lock() {
354              Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
355              Ok(mut _st)  => {
356                _st.evproc_npack += 1; 
357              }
358            }
359            //println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
360          }
361          // set flags
362          match data_type {
363            DataType::VoltageCalibration |
364            DataType::TimingCalibration  | 
365            DataType::Noi => {
366              tp_to_send.no_write_to_disk = true;
367            },
368            _ => ()
369          }
370          // send the packet
371          match tp_sender.send(tp_to_send) {
372            Ok(_) => {
373              packets_in_stream += 1;
374            },
375            Err(err) => {
376              error!("Problem sending TofPacket over channel! {err}");
377              events_not_sent += 1;
378            }
379          }
380        } // end 'event_reader
381      }, // end OK(recv)
382    }// end match 
383  } // end outer loop
384}
385