liftof_rb/threads/event_processing.rs
//! Event processing deals with the raw memory input
//! from the buffers, sent to it by the runner
//! when reading out the system memory
//!
//! Different modes are available, from sending
//! TofPackets directly through the RBEventMemoryStreamer
//! without any further parsing of the events (this has
//! to be done then on the TOF main computer) to doing
//! waveform analysis on the RBs directly
10
11use std::fs;
12use std::path::PathBuf;
13use std::sync::{
14 Arc,
15 Mutex,
16};
17
18use std::time::Instant;
19
20use crossbeam_channel::{
21 Sender,
22 Receiver
23};
24
25use gondola_core::prelude::*;
26
27
28//use tof_dataclasses::events::DataType;
29//use tof_dataclasses::packets::{
30// TofPacket,
31//};
32//use tof_dataclasses::io::RBEventMemoryStreamer;
33//use tof_dataclasses::calibrations::RBCalibrations;
34//use tof_dataclasses::events::EventStatus;
35//use tof_dataclasses::commands::{
36// TofOperationMode,
37//};
38//
39//use tof_dataclasses::events::rb_event::RBPaddleID;
40//
41//use liftof_lib::{
42// RunStatistics,
43// //waveform_analysis,
44//};
45//
46//use liftof_lib::thread_control::ThreadControl;
47
48use crate::control::get_deadtime;
49
50/// Transforms raw bytestream to TofPackets
51///
52/// This allows to get the eventid from the
53/// binrary form of the RBEvent
54///
55/// #Arguments
56///
57/// * board_id : The unique ReadoutBoard identifier
58/// (ID) of this RB
59/// * bs_recv : A receiver for bytestreams. The
60/// bytestream comes directly from
61/// the data buffers.
62/// * get_op_mode : The TOF operation mode. Typically,
63/// this is "Default", meaning that the
64/// RBs will sent what is in the memory
65/// buffer translated into RBEvents.
66/// In "RBHighThrougput" mode, it will not
67/// translate them into RBEvents, but just
68/// transmits the content of the buffers, and
69/// RBWaveform mode will do waveform analysis
70/// on the boards
71/// * tp_sender : Send the resulting data product to
72/// get processed further
73/// * data_type : If different from 0, do some processing
74/// on the data read from memory
75/// * verbose : More output to the console for debugging
76/// * only_perfect_events : Only transmit events with EventStatus::Perfect.
77/// This only applies when the op mode is not
78/// RBHighThroughput
79pub fn event_processing(board_id : u8,
80 rbpaddleid : RBPaddleID,
81 bs_recv : &Receiver<Vec<u8>>,
82 get_op_mode : &Receiver<TofOperationMode>,
83 tp_sender : &Sender<TofPacket>,
84 dtf_fr_runner : &Receiver<DataType>,
85 verbose : bool,
86 thread_control : Arc<Mutex<ThreadControl>>,
87 stat : Arc<Mutex<RunStatistics>>,
88 only_perfect_events : bool) {
89
90 let mut op_mode = TofOperationMode::Default;
91 let mut thread_ctrl_check_timer = Instant::now();
92
93 // load calibration just in case?
94 let mut cali_loaded = false;
95 let cali : RBCalibrations;
96 let cali_path = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
97 let cali_path_buf = PathBuf::from(&cali_path);
98 if fs::metadata(cali_path_buf.clone()).is_ok() {
99 info!("Found valid calibration file path {cali_path_buf:?}");
100 match RBCalibrations::from_file(cali_path, true) {
101 Err(err) => {
102 error!("Can't load calibration! {err}");
103 },
104 Ok(_c) => {
105 cali = _c;
106 cali_loaded = true;
107 debug!("We loaded calibration {}", cali);
108 }
109 }
110 } else {
111 warn!("Calibration file not available!");
112 cali_loaded = false;
113 }
114
115 // FIXME - deprecate!
116 let mut events_not_sent : u64 = 0;
117 let mut data_type : DataType = DataType::Unknown;
118 // should we store drs deadtime instead of the FPGA temperature
119 let mut deadtime_instead_temp : bool = false;
120
121 let mut streamer = RBEventMemoryStreamer::new();
122 // FIXME
123 streamer.check_channel_errors = true;
124 let mut trace_suppressed = false;
125 match thread_control.lock() {
126 Ok(tc) => {
127 streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
128 deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
129 trace_suppressed = tc.liftof_settings.mtb_settings.trace_suppression;
130 },
131 Err(err) => {
132 trace!("Can't acquire lock! {err}");
133 },
134 }
135
136 // loop variables
137 // our cachesize is 50 events. This means each time we
138 // receive data over bs_recv, we have received 50 more
139 // events. This means we might want to wait for 50 MTE
140 // events?
141 let mut skipped_events : usize = 0;
142 let mut n_events = 0usize;
143
144 'main : loop {
145 // FIXME - this whole loop needs to be faster, and there
146 // is no interactive commanding anymore.
147 if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
148 match thread_control.lock() {
149 Ok(tc) => {
150 if tc.stop_flag {
151 info!("Received stop signal. Will stop thread!");
152 break;
153 }
154 streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
155 deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
156 },
157 Err(err) => {
158 trace!("Can't acquire lock! {err}");
159 },
160 }
161 thread_ctrl_check_timer = Instant::now();
162 }
163
164 if !get_op_mode.is_empty() {
165 match get_op_mode.try_recv() {
166 Err(err) => trace!("No op mode change detected! Err {err}"),
167 Ok(mode) => {
168 warn!("Will change operation mode to {:?}!", mode);
169 match mode {
170 TofOperationMode::Default => {
171 streamer.request_mode = false;
172 op_mode = mode;
173 },
174 TofOperationMode::RBWaveform => {
175 if !cali_loaded {
176 error!("Requesting waveform analysis without having a calibration loaded!");
177 error!("Can't do waveform analysis without calibration!");
178 error!("Switching mode to Default");
179 op_mode = TofOperationMode::Default;
180 }
181 }
182 _ => (),
183 }
184 }
185 }
186 }
187 if !dtf_fr_runner.is_empty() {
188 match dtf_fr_runner.try_recv() {
189 Err(err) => {
190 error!("Issues receiving datatype/format! {err}");
191 }
192 Ok(dtf) => {
193 data_type = dtf;
194 info!("Will process events for data type {}!", data_type);
195 }
196 }
197 }
198 if bs_recv.is_empty() {
199 //println!("--> Empty bs_rec");
200 // FIXME - benchmark
201 //thread::sleep(one_milli/2);
202 continue 'main;
203 }
204 // this can't be blocking anymore, since
205 // otherwise we miss the datatype
206 let mut bytestream : Vec<u8>;
207 if events_not_sent > 0 {
208 error!("There were {events_not_sent} for this iteration of received bytes!");
209 }
210 if skipped_events > 0 {
211 error!("We skipped {} events!", skipped_events);
212 }
213 // reset skipped events and events not sent,
214 // these are per iteration
215 events_not_sent = 0;
216 skipped_events = 0;
217 match bs_recv.recv() {
218 Err(err) => {
219 error!("Received Garbage! Err {err}");
220 continue 'main;
221 }
222 Ok(_stream) => {
223 info!("Received {} bytes!", _stream.len());
224 bytestream = _stream;
225 //streamer.add(&bytestream, bytestream.len());
226 streamer.consume(&mut bytestream);
227 let mut packets_in_stream : u32 = 0;
228 let mut last_event_id : u32 = 0;
229 //println!("Streamer::stream size {}", streamer.stream.len());
230 loop {
231 if streamer.is_depleted {
232 info!("Streamer exhausted after sending {} packets!", packets_in_stream);
233 //break 'event_reader;
234 // we immediatly want more data in the streamer
235 continue 'main;
236 }
237 // FIXME - here we have the choice.
238 // streamer.next() will yield the next event,
239 // decoded
240 // streamer.next_tofpacket() instead will only
241 // yield the next event, not deserialzed
242 // but wrapped already in a tofpacket
243 let mut tp_to_send = TofPacket::new();
244 match op_mode {
245 TofOperationMode::RBHighThroughput => {
246 match streamer.next_tofpacket() {
247 None => {
248 streamer.is_depleted = true;
249 continue 'main;
250 },
251 Some(tp) => {
252 tp_to_send = tp;
253 }
254 }
255 },
256 TofOperationMode::Default |
257 TofOperationMode::RBWaveform => {
258 match streamer.next() {
259 None => {
260 streamer.is_depleted = true;
261 continue 'main;
262 },
263 Some(mut event) => {
264 event.header.set_rbpaddleid(&rbpaddleid);
265 if deadtime_instead_temp {
266 // in case we want to add the deadtime to the header,
267 // we have to do that here!
268 event.header.deadtime_instead_temp = deadtime_instead_temp;
269 match get_deadtime() {
270 Err(err) => {
271 error!("Unable to get DRS4 deadtime! {err}");
272 event.header.drs_deadtime = u16::MAX;
273 }
274 Ok(d_time) => {
275 event.header.drs_deadtime = d_time as u16;
276 }
277 }
278 }
279 //println!("Got event id {}", event.header.event_id);
280 if last_event_id != 0 {
281 if event.header.event_id != last_event_id + 1 {
282 if event.header.event_id > last_event_id {
283 if !trace_suppressed {
284 skipped_events += (event.header.event_id - last_event_id -1) as usize;
285 }
286 } else {
287 error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
288 }
289 }
290 }
291 last_event_id = event.header.event_id;
292 //println!("This event id {}!", last_event_id);
293 event.data_type = data_type;
294 if verbose {
295 match stat.lock() {
296 Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
297 Ok(mut s) => {
298 if s.first_evid == 0 {
299 s.first_evid = event.header.event_id;
300 }
301 s.last_evid = event.header.event_id;
302 if event.status == EventStatus::ChannelIDWrong {
303 s.n_err_chid_wrong += 1;
304 }
305 if event.status == EventStatus::TailWrong {
306 s.n_err_tail_wrong += 1;
307 }
308 }
309 }
310 }
311 if event.status != EventStatus::Unknown {
312 if only_perfect_events && event.status != EventStatus::Perfect {
313 info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
314 continue;
315 }
316 }
317 if op_mode == TofOperationMode::RBWaveform {
318 //debug!("Using paddle map {:?}", paddle_map);
319 //match waveform_analysis(&mut event, &paddle_map, &cali) {
320 // Err(err) => error!("Waveform analysis failed! {err}"),
321 // Ok(_) => ()
322 //}
323 }
324 n_events += 1;
325 if verbose && n_events % 100 == 0 {
326 println!("[EVTPROC (verbose)] => Sending event {}", event);
327 }
328 tp_to_send = event.pack();
329 },
330 }
331 }, // end op mode ~ waveform/event
332 _ => {
333 error!("Operation mode {} not available yet!", op_mode);
334 }
335 }
336 if verbose {
337 match stat.lock() {
338 Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
339 Ok(mut _st) => {
340 _st.evproc_npack += 1;
341 }
342 }
343 //println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
344 }
345 // set flags
346 match data_type {
347 DataType::VoltageCalibration |
348 DataType::TimingCalibration |
349 DataType::Noi => {
350 tp_to_send.no_write_to_disk = true;
351 },
352 _ => ()
353 }
354 // send the packet
355 match tp_sender.send(tp_to_send) {
356 Ok(_) => {
357 packets_in_stream += 1;
358 },
359 Err(err) => {
360 error!("Problem sending TofPacket over channel! {err}");
361 events_not_sent += 1;
362 }
363 }
364 } // end 'event_reader
365 }, // end OK(recv)
366 }// end match
367 } // end outer loop
368}
369