liftof_rb/threads/
event_processing.rs1use std::fs;
12use std::path::PathBuf;
13use std::sync::{
14 Arc,
15 Mutex,
16};
17
18use std::time::Instant;
19
20use crossbeam_channel::{
21 Sender,
22 Receiver
23};
24
25use tof_dataclasses::events::DataType;
26use tof_dataclasses::packets::{
27 TofPacket,
28};
29use tof_dataclasses::io::RBEventMemoryStreamer;
30use tof_dataclasses::calibrations::RBCalibrations;
31use tof_dataclasses::events::EventStatus;
32use tof_dataclasses::commands::{
33 TofOperationMode,
34};
35
36use tof_dataclasses::events::rb_event::RBPaddleID;
37
38use liftof_lib::{
39 RunStatistics,
40 };
42
43use liftof_lib::thread_control::ThreadControl;
44
45use crate::control::get_deadtime;
46
47pub fn event_processing(board_id : u8,
77 rbpaddleid : RBPaddleID,
78 bs_recv : &Receiver<Vec<u8>>,
79 get_op_mode : &Receiver<TofOperationMode>,
80 tp_sender : &Sender<TofPacket>,
81 dtf_fr_runner : &Receiver<DataType>,
82 verbose : bool,
83 thread_control : Arc<Mutex<ThreadControl>>,
84 stat : Arc<Mutex<RunStatistics>>,
85 only_perfect_events : bool) {
86
87 let mut op_mode = TofOperationMode::Default;
88 let mut thread_ctrl_check_timer = Instant::now();
89
90 let mut cali_loaded = false;
92 let cali : RBCalibrations;
93 let cali_path = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
94 let cali_path_buf = PathBuf::from(&cali_path);
95 if fs::metadata(cali_path_buf.clone()).is_ok() {
96 info!("Found valid calibration file path {cali_path_buf:?}");
97 match RBCalibrations::from_file(cali_path, true) {
98 Err(err) => {
99 error!("Can't load calibration! {err}");
100 },
101 Ok(_c) => {
102 cali = _c;
103 cali_loaded = true;
104 debug!("We loaded calibration {}", cali);
105 }
106 }
107 } else {
108 warn!("Calibration file not available!");
109 cali_loaded = false;
110 }
111
112 let mut events_not_sent : u64 = 0;
114 let mut data_type : DataType = DataType::Unknown;
115 let mut deadtime_instead_temp : bool = false;
117
118 let mut streamer = RBEventMemoryStreamer::new();
119 streamer.check_channel_errors = true;
121 let mut trace_suppressed = false;
122 match thread_control.lock() {
123 Ok(tc) => {
124 streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
125 deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
126 trace_suppressed = tc.liftof_settings.mtb_settings.trace_suppression;
127 },
128 Err(err) => {
129 trace!("Can't acquire lock! {err}");
130 },
131 }
132
133 let mut skipped_events : usize = 0;
139 let mut n_events = 0usize;
140
141 'main : loop {
142 if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
145 match thread_control.lock() {
146 Ok(tc) => {
147 if tc.stop_flag {
148 info!("Received stop signal. Will stop thread!");
149 break;
150 }
151 streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
152 deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
153 },
154 Err(err) => {
155 trace!("Can't acquire lock! {err}");
156 },
157 }
158 thread_ctrl_check_timer = Instant::now();
159 }
160
161 if !get_op_mode.is_empty() {
162 match get_op_mode.try_recv() {
163 Err(err) => trace!("No op mode change detected! Err {err}"),
164 Ok(mode) => {
165 warn!("Will change operation mode to {:?}!", mode);
166 match mode {
167 TofOperationMode::Default => {
168 streamer.request_mode = false;
169 op_mode = mode;
170 },
171 TofOperationMode::RBWaveform => {
172 if !cali_loaded {
173 error!("Requesting waveform analysis without having a calibration loaded!");
174 error!("Can't do waveform analysis without calibration!");
175 error!("Switching mode to Default");
176 op_mode = TofOperationMode::Default;
177 }
178 }
179 _ => (),
180 }
181 }
182 }
183 }
184 if !dtf_fr_runner.is_empty() {
185 match dtf_fr_runner.try_recv() {
186 Err(err) => {
187 error!("Issues receiving datatype/format! {err}");
188 }
189 Ok(dtf) => {
190 data_type = dtf;
191 info!("Will process events for data type {}!", data_type);
192 }
193 }
194 }
195 if bs_recv.is_empty() {
196 continue 'main;
200 }
201 let mut bytestream : Vec<u8>;
204 if events_not_sent > 0 {
205 error!("There were {events_not_sent} for this iteration of received bytes!");
206 }
207 if skipped_events > 0 {
208 error!("We skipped {} events!", skipped_events);
209 }
210 events_not_sent = 0;
213 skipped_events = 0;
214 match bs_recv.recv() {
215 Err(err) => {
216 error!("Received Garbage! Err {err}");
217 continue 'main;
218 }
219 Ok(_stream) => {
220 info!("Received {} bytes!", _stream.len());
221 bytestream = _stream;
222 streamer.consume(&mut bytestream);
224 let mut packets_in_stream : u32 = 0;
225 let mut last_event_id : u32 = 0;
226 loop {
228 if streamer.is_depleted {
229 info!("Streamer exhausted after sending {} packets!", packets_in_stream);
230 continue 'main;
233 }
234 let mut tp_to_send = TofPacket::new();
241 match op_mode {
242 TofOperationMode::RBHighThroughput => {
243 match streamer.next_tofpacket() {
244 None => {
245 streamer.is_depleted = true;
246 continue 'main;
247 },
248 Some(tp) => {
249 tp_to_send = tp;
250 }
251 }
252 },
253 TofOperationMode::Default |
254 TofOperationMode::RBWaveform => {
255 match streamer.next() {
256 None => {
257 streamer.is_depleted = true;
258 continue 'main;
259 },
260 Some(mut event) => {
261 event.header.set_rbpaddleid(&rbpaddleid);
262 if deadtime_instead_temp {
263 event.header.deadtime_instead_temp = deadtime_instead_temp;
266 match get_deadtime() {
267 Err(err) => {
268 error!("Unable to get DRS4 deadtime! {err}");
269 event.header.drs_deadtime = u16::MAX;
270 }
271 Ok(d_time) => {
272 event.header.drs_deadtime = d_time as u16;
273 }
274 }
275 }
276 if last_event_id != 0 {
278 if event.header.event_id != last_event_id + 1 {
279 if event.header.event_id > last_event_id {
280 if !trace_suppressed {
281 skipped_events += (event.header.event_id - last_event_id -1) as usize;
282 }
283 } else {
284 error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
285 }
286 }
287 }
288 last_event_id = event.header.event_id;
289 event.data_type = data_type;
291 if verbose {
292 match stat.lock() {
293 Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
294 Ok(mut s) => {
295 if s.first_evid == 0 {
296 s.first_evid = event.header.event_id;
297 }
298 s.last_evid = event.header.event_id;
299 if event.status == EventStatus::ChannelIDWrong {
300 s.n_err_chid_wrong += 1;
301 }
302 if event.status == EventStatus::TailWrong {
303 s.n_err_tail_wrong += 1;
304 }
305 }
306 }
307 }
308 if event.status != EventStatus::Unknown {
309 if only_perfect_events && event.status != EventStatus::Perfect {
310 info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
311 continue;
312 }
313 }
314 if op_mode == TofOperationMode::RBWaveform {
315 }
321 n_events += 1;
322 if verbose && n_events % 100 == 0 {
323 println!("[EVTPROC (verbose)] => Sending event {}", event);
324 }
325 tp_to_send = TofPacket::from(&event);
326 },
327 }
328 }, _ => {
330 error!("Operation mode {} not available yet!", op_mode);
331 }
332 }
333 if verbose {
334 match stat.lock() {
335 Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
336 Ok(mut _st) => {
337 _st.evproc_npack += 1;
338 }
339 }
340 }
342 match data_type {
344 DataType::VoltageCalibration |
345 DataType::TimingCalibration |
346 DataType::Noi => {
347 tp_to_send.no_write_to_disk = true;
348 },
349 _ => ()
350 }
351 match tp_sender.send(tp_to_send) {
353 Ok(_) => {
354 packets_in_stream += 1;
355 },
356 Err(err) => {
357 error!("Problem sending TofPacket over channel! {err}");
358 events_not_sent += 1;
359 }
360 }
361 } }, }} }
366