liftof_rb/threads/event_processing.rs
//! Event processing deals with the raw memory input
//! from the buffers, sent to it by the runner
//! when reading out the system memory
//!
//! Different modes are available, from sending
//! TofPackets directly through the RBEventMemoryStreamer
//! without any further parsing of the events (this then
//! has to be done on the TOF main computer) to doing
//! waveform analysis on the RBs directly
10
11use std::fs;
12use std::path::PathBuf;
13use std::sync::{
14 Arc,
15 Mutex,
16};
17
18use std::time::Instant;
19
20use crossbeam_channel::{
21 Sender,
22 Receiver
23};
24
25use gondola_core::prelude::*;
26
27use crate::control::get_deadtime;
28
29/// Transforms raw bytestream to TofPackets
30///
31/// This allows to get the eventid from the
32/// binrary form of the RBEvent
33///
34/// #Arguments
35///
36/// * board_id : The unique ReadoutBoard identifier
37/// (ID) of this RB
38/// * bs_recv : A receiver for bytestreams. The
39/// bytestream comes directly from
40/// the data buffers.
41/// * get_op_mode : The TOF operation mode. Typically,
42/// this is "Default", meaning that the
43/// RBs will sent what is in the memory
44/// buffer translated into RBEvents.
45/// In "RBHighThrougput" mode, it will not
46/// translate them into RBEvents, but just
47/// transmits the content of the buffers, and
48/// RBWaveform mode will do waveform analysis
49/// on the boards
50/// * tp_sender : Send the resulting data product to
51/// get processed further
52/// * data_type : If different from 0, do some processing
53/// on the data read from memory
54/// * verbose : More output to the console for debugging
55/// * only_perfect_events : Only transmit events with EventStatus::Perfect.
56/// This only applies when the op mode is not
57/// RBHighThroughput
58pub fn event_processing(board_id : u8,
59 rbpaddleid : RBPaddleID,
60 bs_recv : &Receiver<Vec<u8>>,
61 get_op_mode : &Receiver<TofOperationMode>,
62 tp_sender : &Sender<TofPacket>,
63 dtf_fr_runner : &Receiver<DataType>,
64 verbose : bool,
65 thread_control : Arc<Mutex<ThreadControl>>,
66 stat : Arc<Mutex<RunStatistics>>,
67 only_perfect_events : bool) {
68
69 let mut op_mode = TofOperationMode::Default;
70 let mut thread_ctrl_check_timer = Instant::now();
71
72 // load calibration just in case?
73 let mut cali_loaded = false;
74 let cali : RBCalibrations;
75 let cali_path = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
76 let cali_path_buf = PathBuf::from(&cali_path);
77 if fs::metadata(cali_path_buf.clone()).is_ok() {
78 info!("Found valid calibration file path {cali_path_buf:?}");
79 match RBCalibrations::from_file(cali_path, true) {
80 Err(err) => {
81 error!("Can't load calibration! {err}");
82 },
83 Ok(_c) => {
84 cali = _c;
85 cali_loaded = true;
86 debug!("We loaded calibration {}", cali);
87 }
88 }
89 } else {
90 warn!("Calibration file not available!");
91 cali_loaded = false;
92 }
93
94 // FIXME - deprecate!
95 let mut events_not_sent : u64 = 0;
96 let mut data_type : DataType = DataType::Unknown;
97 // should we store drs deadtime instead of the FPGA temperature
98 let mut deadtime_instead_temp : bool = false;
99
100 let mut streamer = RBEventMemoryStreamer::new();
101 // FIXME
102 streamer.check_channel_errors = true;
103 let mut trace_suppressed = false;
104 match thread_control.lock() {
105 Ok(tc) => {
106 streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
107 deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
108 trace_suppressed = tc.liftof_settings.mtb_settings.trace_suppression;
109 },
110 Err(err) => {
111 trace!("Can't acquire lock! {err}");
112 },
113 }
114
115 // loop variables
116 // our cachesize is 50 events. This means each time we
117 // receive data over bs_recv, we have received 50 more
118 // events. This means we might want to wait for 50 MTE
119 // events?
120 let mut skipped_events = 0usize;
121 let mut n_events = 0usize;
122 let mut n_event_id_too_large = 0u32;
123 let mut n_event_id_zero = 0u32;
124 'main : loop {
125 // FIXME - this whole loop needs to be faster, and there
126 // is no interactive commanding anymore.
127 if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
128 match thread_control.lock() {
129 Ok(mut tc) => {
130 if tc.stop_flag {
131 info!("Received stop signal. Will stop thread!");
132 break;
133 }
134 streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
135 deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
136 tc.lost_event_ids = (n_event_id_zero + n_event_id_too_large) as f32 / n_events as f32;
137 },
138 Err(err) => {
139 trace!("Can't acquire lock! {err}");
140 },
141 }
142 //if n_event_id_too_large > 0 {
143 // error!("We saw {} events with an event id > 90000 in the last 6 seconds!", n_event_id_too_large);
144 // //n_event_id_too_large = 0;
145 //}
146 //if n_event_id_zero > 0 {
147 // error!("We saw {} events with an event id == 0 in the last 6 seconds!", n_event_id_zero);
148 // //n_event_id_zero = 0;
149 //}
150 thread_ctrl_check_timer = Instant::now();
151 }
152
153 if !get_op_mode.is_empty() {
154 match get_op_mode.try_recv() {
155 Err(err) => trace!("No op mode change detected! Err {err}"),
156 Ok(mode) => {
157 warn!("Will change operation mode to {:?}!", mode);
158 match mode {
159 TofOperationMode::Default => {
160 streamer.request_mode = false;
161 op_mode = mode;
162 },
163 TofOperationMode::RBWaveform => {
164 if !cali_loaded {
165 error!("Requesting waveform analysis without having a calibration loaded!");
166 error!("Can't do waveform analysis without calibration!");
167 error!("Switching mode to Default");
168 op_mode = TofOperationMode::Default;
169 }
170 }
171 _ => (),
172 }
173 }
174 }
175 }
176 if !dtf_fr_runner.is_empty() {
177 match dtf_fr_runner.try_recv() {
178 Err(err) => {
179 error!("Issues receiving datatype/format! {err}");
180 }
181 Ok(dtf) => {
182 data_type = dtf;
183 info!("Will process events for data type {}!", data_type);
184 }
185 }
186 }
187 if bs_recv.is_empty() {
188 //println!("--> Empty bs_rec");
189 // FIXME - benchmark
190 //thread::sleep(one_milli/2);
191 continue 'main;
192 }
193 // this can't be blocking anymore, since
194 // otherwise we miss the datatype
195 let mut bytestream : Vec<u8>;
196 if events_not_sent > 0 {
197 error!("There were {events_not_sent} for this iteration of received bytes!");
198 }
199 if skipped_events > 0 {
200 error!("We skipped {} events!", skipped_events);
201 }
202 // reset skipped events and events not sent,
203 // these are per iteration
204 events_not_sent = 0;
205 skipped_events = 0;
206 match bs_recv.recv() {
207 Err(err) => {
208 error!("Received Garbage! Err {err}");
209 continue 'main;
210 }
211 Ok(_stream) => {
212 info!("Received {} bytes!", _stream.len());
213 bytestream = _stream;
214 //streamer.add(&bytestream, bytestream.len());
215 streamer.consume(&mut bytestream);
216 let mut packets_in_stream : u32 = 0;
217 let mut last_event_id : u32 = 0;
218 //let mut last_event : RBEvent = RBEvent::new();
219 //println!("Streamer::stream size {}", streamer.stream.len());
220 loop {
221 if streamer.is_depleted {
222 info!("Streamer exhausted after sending {} packets!", packets_in_stream);
223 //break 'event_reader;
224 // we immediatly want more data in the streamer
225 continue 'main;
226 }
227 // FIXME - here we have the choice.
228 // streamer.next() will yield the next event,
229 // decoded
230 // streamer.next_tofpacket() instead will only
231 // yield the next event, not deserialzed
232 // but wrapped already in a tofpacket
233 let mut tp_to_send = TofPacket::new();
234 match op_mode {
235 TofOperationMode::RBHighThroughput => {
236 match streamer.next_tofpacket() {
237 None => {
238 streamer.is_depleted = true;
239 continue 'main;
240 },
241 Some(tp) => {
242 tp_to_send = tp;
243 }
244 }
245 },
246 TofOperationMode::Default |
247 TofOperationMode::RBWaveform => {
248 match streamer.next() {
249 None => {
250 streamer.is_depleted = true;
251 continue 'main;
252 },
253 Some(mut event) => {
254 event.header.set_rbpaddleid(&rbpaddleid);
255 if deadtime_instead_temp {
256 // in case we want to add the deadtime to the header,
257 // we have to do that here!
258 event.header.deadtime_instead_temp = deadtime_instead_temp;
259 match get_deadtime() {
260 Err(err) => {
261 error!("Unable to get DRS4 deadtime! {err}");
262 event.header.drs_deadtime = u16::MAX;
263 }
264 Ok(d_time) => {
265 event.header.drs_deadtime = d_time as u16;
266 }
267 }
268 }
269 //println!("Got event id {}", event.header.event_id);
270 if event.header.event_id == 0 {
271 //error!("The current event id is 0!");
272 //error!("Event {}", event);
273 //event.status = EventStatus::RBEventWacky;
274 n_event_id_zero += 1;
275 continue;
276 }
277 if last_event_id != 0 {
278 let delta_evid = event.header.event_id as i64 - last_event_id as i64;
279
280 //if event.header.event_id > 42_000_000u32 {
281 // 90000 -> at 100Hz, that is 15mins deviation
282 if i64::abs(delta_evid) > 90000 {
283 //error!("The event id deviates more than 90000 event ids from the last event id!");
284 //error!("Event {}", event);
285 //error!("Last Event {}", last_event);
286 //error!("The event id is larger than 42000000");
287 //event.status = EventStatus::RBEventWacky;
288 n_event_id_too_large += 1;
289 continue;
290 }
291 }
292 if last_event_id != 0 {
293 if event.header.event_id != last_event_id + 1 {
294 if event.header.event_id > last_event_id {
295 if !trace_suppressed {
296 skipped_events += (event.header.event_id - last_event_id -1) as usize;
297 }
298 } else {
299 error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
300 }
301 }
302 }
303 // Now as we are through our sanity checks, the event id should be can use it for
304 // reference
305 last_event_id = event.header.event_id;
306 //last_event = event.clone();
307 //println!("This event id {}!", last_event_id);
308 event.data_type = data_type;
309 // skip for now, since we change event status
310 //if verbose {
311 // match stat.lock() {
312 // Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
313 // Ok(mut s) => {
314 // if s.first_evid == 0 {
315 // s.first_evid = event.header.event_id;
316 // }
317 // s.last_evid = event.header.event_id;
318 // if event.status == EventStatus::ChannelIDWrong {
319 // s.n_err_chid_wrong += 1;
320 // }
321 // if event.status == EventStatus::TailWrong {
322 // s.n_err_tail_wrong += 1;
323 // }
324 // }
325 // }
326 //}
327 if event.status != EventStatus::Unknown {
328 if only_perfect_events && event.status != EventStatus::Perfect {
329 info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
330 continue;
331 }
332 }
333 if op_mode == TofOperationMode::RBWaveform {
334 //debug!("Using paddle map {:?}", paddle_map);
335 //match waveform_analysis(&mut event, &paddle_map, &cali) {
336 // Err(err) => error!("Waveform analysis failed! {err}"),
337 // Ok(_) => ()
338 //}
339 }
340 n_events += 1;
341 if verbose && n_events % 100 == 0 {
342 println!("[EVTPROC (verbose)] => Sending event {}", event);
343 }
344 tp_to_send = event.pack();
345 },
346 }
347 }, // end op mode ~ waveform/event
348 _ => {
349 error!("Operation mode {} not available yet!", op_mode);
350 }
351 }
352 if verbose {
353 match stat.lock() {
354 Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
355 Ok(mut _st) => {
356 _st.evproc_npack += 1;
357 }
358 }
359 //println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
360 }
361 // set flags
362 match data_type {
363 DataType::VoltageCalibration |
364 DataType::TimingCalibration |
365 DataType::Noi => {
366 tp_to_send.no_write_to_disk = true;
367 },
368 _ => ()
369 }
370 // send the packet
371 match tp_sender.send(tp_to_send) {
372 Ok(_) => {
373 packets_in_stream += 1;
374 },
375 Err(err) => {
376 error!("Problem sending TofPacket over channel! {err}");
377 events_not_sent += 1;
378 }
379 }
380 } // end 'event_reader
381 }, // end OK(recv)
382 }// end match
383 } // end outer loop
384}
385