liftof_rb/threads/
event_processing.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
//! Event processing deals with the raw memory input
//! from the buffers, send to it by the runner 
//! when reading out the system memory
//!
//! Different modes are available, from sending
//! TofPackets directly through the RBEventMemoryStreamer
//! without any further parsing of the events (this has 
//! to be done then on the TOF main computer) to doing 
//! waveform analysis on the RBs directly

use std::fs;
use std::path::PathBuf;
use std::sync::{
  Arc,
  Mutex,
};

use std::time::Instant;

use crossbeam_channel::{
  Sender,
  Receiver
};

use tof_dataclasses::events::DataType;
use tof_dataclasses::packets::{
  TofPacket,
};
use tof_dataclasses::io::RBEventMemoryStreamer;
use tof_dataclasses::calibrations::RBCalibrations;
use tof_dataclasses::events::EventStatus;
use tof_dataclasses::commands::{
  TofOperationMode,
};

use tof_dataclasses::events::rb_event::RBPaddleID;

use liftof_lib::{
  RunStatistics,
  //waveform_analysis,
};

use liftof_lib::thread_control::ThreadControl;

use crate::control::get_deadtime;

///  Transforms raw bytestream to TofPackets
///
///  This allows to get the eventid from the 
///  binrary form of the RBEvent
///
///  #Arguments
///  
///  * board_id            : The unique ReadoutBoard identifier
///                          (ID) of this RB
///  * bs_recv             : A receiver for bytestreams. The 
///                          bytestream comes directly from 
///                          the data buffers.
///  * get_op_mode         : The TOF operation mode. Typically,
///                          this is "Default", meaning that the
///                          RBs will sent what is in the memory 
///                          buffer translated into RBEvents.
///                          In "RBHighThrougput" mode, it will not
///                          translate them into RBEvents, but just
///                          transmits the content of the buffers, and 
///                          RBWaveform mode will do waveform analysis
///                          on the boards
///  * tp_sender           : Send the resulting data product to 
///                          get processed further
///  * data_type           : If different from 0, do some processing
///                          on the data read from memory
///  * verbose             : More output to the console for debugging
///  * only_perfect_events : Only transmit events with EventStatus::Perfect.
///                          This only applies when the op mode is not 
///                          RBHighThroughput
pub fn event_processing(board_id            : u8,
                        rbpaddleid          : RBPaddleID,
                        bs_recv             : &Receiver<Vec<u8>>,
                        get_op_mode         : &Receiver<TofOperationMode>, 
                        tp_sender           : &Sender<TofPacket>,
                        dtf_fr_runner       : &Receiver<DataType>,
                        verbose             : bool,
                        thread_control      : Arc<Mutex<ThreadControl>>,
                        stat                : Arc<Mutex<RunStatistics>>,
                        only_perfect_events : bool) {
  
  let mut op_mode = TofOperationMode::Default;
  let mut thread_ctrl_check_timer = Instant::now();

  // load calibration just in case?
  let mut cali_loaded = false;
  let cali        : RBCalibrations;
  let cali_path       = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
  let cali_path_buf   = PathBuf::from(&cali_path);
  if fs::metadata(cali_path_buf.clone()).is_ok() {
    info!("Found valid calibration file path {cali_path_buf:?}");
    match RBCalibrations::from_file(cali_path, true) {
      Err(err) => {
        error!("Can't load calibration! {err}");
      },
      Ok(_c) => {
        cali = _c;
        cali_loaded = true;
        debug!("We loaded calibration {}", cali);
      }
    }
  } else {
    warn!("Calibration file not available!");
    cali_loaded = false;
  }
  
  // FIXME - deprecate!
  let mut events_not_sent : u64 = 0;
  let mut data_type       : DataType   = DataType::Unknown;
  // should we store drs deadtime instead of the FPGA temperature
  let mut deadtime_instead_temp : bool = false;
  
  let mut streamer        = RBEventMemoryStreamer::new();
  // FIXME
  streamer.check_channel_errors = true;
  let mut trace_suppressed      = false;  
  match thread_control.lock() {
    Ok(tc) => {
      streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
      deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
      trace_suppressed      = tc.liftof_settings.mtb_settings.trace_suppression;
    },
    Err(err) => {
      trace!("Can't acquire lock! {err}");
    },
  }
  
  // loop variables
  // our cachesize is 50 events. This means each time we 
  // receive data over bs_recv, we have received 50 more 
  // events. This means we might want to wait for 50 MTE
  // events?
  let mut skipped_events : usize = 0;
  let mut n_events = 0usize;
  
  'main : loop {
    // FIXME - this whole loop needs to be faster, and there 
    // is no interactive commanding anymore.
    if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
      match thread_control.lock() {
        Ok(tc) => {
          if tc.stop_flag {
            info!("Received stop signal. Will stop thread!");
            break;
          }
          streamer.calc_crc32   = tc.liftof_settings.rb_settings.calc_crc32;
          deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp; 
        },
        Err(err) => {
          trace!("Can't acquire lock! {err}");
        },
      }
      thread_ctrl_check_timer = Instant::now();
    }

    if !get_op_mode.is_empty() {
      match get_op_mode.try_recv() {
        Err(err) => trace!("No op mode change detected! Err {err}"),
        Ok(mode) => {
          warn!("Will change operation mode to {:?}!", mode);
          match mode {
            TofOperationMode::Default    => {
              streamer.request_mode = false;
              op_mode = mode;
            },
            TofOperationMode::RBWaveform   => {
              if !cali_loaded {
                error!("Requesting waveform analysis without having a calibration loaded!");
                error!("Can't do waveform analysis without calibration!");
                error!("Switching mode to Default");
                op_mode = TofOperationMode::Default;
              }
            }
            _ => (),
          }
        }
      }
    }
    if !dtf_fr_runner.is_empty() {
      match dtf_fr_runner.try_recv() {
        Err(err) => {
          error!("Issues receiving datatype/format! {err}");
        }
        Ok(dtf) => {
          data_type = dtf; 
          info!("Will process events for data type {}!", data_type);
        }
      }
    }
    if bs_recv.is_empty() {
      //println!("--> Empty bs_rec");
      // FIXME - benchmark
      //thread::sleep(one_milli/2);
      continue 'main;
    }
    // this can't be blocking anymore, since 
    // otherwise we miss the datatype
    let mut bytestream : Vec<u8>;
    if events_not_sent > 0 {
      error!("There were {events_not_sent} for this iteration of received bytes!");
    }
    if skipped_events > 0 {
      error!("We skipped {} events!", skipped_events);
    }
    // reset skipped events and events not sent, 
    // these are per iteration
    events_not_sent = 0;
    skipped_events  = 0;
    match bs_recv.recv() {
      Err(err) => {
        error!("Received Garbage! Err {err}");
        continue 'main;
      }
      Ok(_stream) => {
        info!("Received {} bytes!", _stream.len());
        bytestream = _stream;
        //streamer.add(&bytestream, bytestream.len());
        streamer.consume(&mut bytestream);
        let mut packets_in_stream : u32 = 0;
        let mut last_event_id     : u32 = 0;
        //println!("Streamer::stream size {}", streamer.stream.len());
        loop {
          if streamer.is_depleted {
            info!("Streamer exhausted after sending {} packets!", packets_in_stream);
            //break 'event_reader;
            // we immediatly want more data in the streamer
            continue 'main;
          }
          // FIXME - here we have the choice. 
          // streamer.next() will yield the next event,
          // decoded
          // streamer.next_tofpacket() instead will only
          // yield the next event, not deserialzed
          // but wrapped already in a tofpacket
          let mut tp_to_send = TofPacket::new();
          match op_mode {
            TofOperationMode::RBHighThroughput => {
              match streamer.next_tofpacket() {
                None => {
                  streamer.is_depleted = true;
                  continue 'main;
                },
                Some(tp) => {
                  tp_to_send = tp;
                }
              }
            },
            TofOperationMode::Default |
            TofOperationMode::RBWaveform => {
              match streamer.next() {
                None => {
                  streamer.is_depleted = true;
                  continue 'main;
                },
                Some(mut event) => {
                  event.header.set_rbpaddleid(&rbpaddleid);
                  if deadtime_instead_temp {
                    // in case we want to add the deadtime to the header, 
                    // we have to do that here!
                    event.header.deadtime_instead_temp = deadtime_instead_temp;
                    match get_deadtime() {
                      Err(err) => {
                        error!("Unable to get DRS4 deadtime! {err}");
                        event.header.drs_deadtime = u16::MAX;
                      }
                      Ok(d_time) => {
                        event.header.drs_deadtime = d_time as u16;
                      }
                    }
                  }
                  //println!("Got event id {}", event.header.event_id);
                  if last_event_id != 0 {
                    if event.header.event_id != last_event_id + 1 {
                      if event.header.event_id > last_event_id {
                        if !trace_suppressed {
                          skipped_events += (event.header.event_id - last_event_id -1) as usize;
                        }
                      } else {
                        error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
                      }
                    }
                  }
                  last_event_id = event.header.event_id;
                  //println!("This event id {}!", last_event_id);
                  event.data_type = data_type;
                  if verbose {
                    match stat.lock() {
                      Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
                      Ok(mut s) => {
                        if s.first_evid == 0 {
                          s.first_evid = event.header.event_id;
                        }
                        s.last_evid = event.header.event_id;
                        if event.status == EventStatus::ChannelIDWrong {
                          s.n_err_chid_wrong += 1;
                        }
                        if event.status == EventStatus::TailWrong {
                          s.n_err_tail_wrong += 1;
                        }
                      }
                    }
                  }
                  if event.status != EventStatus::Unknown {
                    if only_perfect_events && event.status != EventStatus::Perfect {
                      info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
                      continue;
                    }
                  }
                  if op_mode == TofOperationMode::RBWaveform {
                    //debug!("Using paddle map {:?}", paddle_map);
                    //match waveform_analysis(&mut event, &paddle_map, &cali) {
                    //  Err(err) => error!("Waveform analysis failed! {err}"),
                    //  Ok(_)    => ()
                    //}
                  }
                  n_events += 1;
                  if verbose && n_events % 100 == 0 {
                    println!("[EVTPROC (verbose)] => Sending event {}", event);
                  }
                  tp_to_send = TofPacket::from(&event);
                },
              } 
            }, // end op mode ~ waveform/event
            _ => {
              error!("Operation mode {} not available yet!", op_mode);
            }
          }
          if verbose {
            match stat.lock() {
              Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
              Ok(mut _st)  => {
                _st.evproc_npack += 1; 
              }
            }
            //println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
          }
          // set flags
          match data_type {
            DataType::VoltageCalibration |
            DataType::TimingCalibration  | 
            DataType::Noi => {
              tp_to_send.no_write_to_disk = true;
            },
            _ => ()
          }
          // send the packet
          match tp_sender.send(tp_to_send) {
            Ok(_) => {
              packets_in_stream += 1;
            },
            Err(err) => {
              error!("Problem sending TofPacket over channel! {err}");
              events_not_sent += 1;
            }
          }
        } // end 'event_reader
      }, // end OK(recv)
    }// end match 
  } // end outer loop
}