liftof_rb/threads/event_processing.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
//! Event processing deals with the raw memory input
//! from the buffers, sent to it by the runner
//! when reading out the system memory
//!
//! Different modes are available, from sending
//! TofPackets directly through the RBEventMemoryStreamer
//! without any further parsing of the events (this has
//! to be done then on the TOF main computer) to doing
//! waveform analysis on the RBs directly
use std::fs;
use std::path::PathBuf;
use std::sync::{
Arc,
Mutex,
};
use std::time::Instant;
use crossbeam_channel::{
Sender,
Receiver
};
use tof_dataclasses::events::DataType;
use tof_dataclasses::packets::{
TofPacket,
};
use tof_dataclasses::io::RBEventMemoryStreamer;
use tof_dataclasses::calibrations::RBCalibrations;
use tof_dataclasses::events::EventStatus;
use tof_dataclasses::commands::{
TofOperationMode,
};
use tof_dataclasses::events::rb_event::RBPaddleID;
use liftof_lib::{
RunStatistics,
//waveform_analysis,
};
use liftof_lib::thread_control::ThreadControl;
use crate::control::get_deadtime;
/// Transforms raw bytestream to TofPackets
///
/// This allows to get the eventid from the
/// binrary form of the RBEvent
///
/// #Arguments
///
/// * board_id : The unique ReadoutBoard identifier
/// (ID) of this RB
/// * bs_recv : A receiver for bytestreams. The
/// bytestream comes directly from
/// the data buffers.
/// * get_op_mode : The TOF operation mode. Typically,
/// this is "Default", meaning that the
/// RBs will sent what is in the memory
/// buffer translated into RBEvents.
/// In "RBHighThrougput" mode, it will not
/// translate them into RBEvents, but just
/// transmits the content of the buffers, and
/// RBWaveform mode will do waveform analysis
/// on the boards
/// * tp_sender : Send the resulting data product to
/// get processed further
/// * data_type : If different from 0, do some processing
/// on the data read from memory
/// * verbose : More output to the console for debugging
/// * only_perfect_events : Only transmit events with EventStatus::Perfect.
/// This only applies when the op mode is not
/// RBHighThroughput
pub fn event_processing(board_id : u8,
rbpaddleid : RBPaddleID,
bs_recv : &Receiver<Vec<u8>>,
get_op_mode : &Receiver<TofOperationMode>,
tp_sender : &Sender<TofPacket>,
dtf_fr_runner : &Receiver<DataType>,
verbose : bool,
thread_control : Arc<Mutex<ThreadControl>>,
stat : Arc<Mutex<RunStatistics>>,
only_perfect_events : bool) {
let mut op_mode = TofOperationMode::Default;
let mut thread_ctrl_check_timer = Instant::now();
// load calibration just in case?
let mut cali_loaded = false;
let cali : RBCalibrations;
let cali_path = format!("/home/gaps/calib/rb_{:0>2}.cali.tof.gaps", board_id);
let cali_path_buf = PathBuf::from(&cali_path);
if fs::metadata(cali_path_buf.clone()).is_ok() {
info!("Found valid calibration file path {cali_path_buf:?}");
match RBCalibrations::from_file(cali_path, true) {
Err(err) => {
error!("Can't load calibration! {err}");
},
Ok(_c) => {
cali = _c;
cali_loaded = true;
debug!("We loaded calibration {}", cali);
}
}
} else {
warn!("Calibration file not available!");
cali_loaded = false;
}
// FIXME - deprecate!
let mut events_not_sent : u64 = 0;
let mut data_type : DataType = DataType::Unknown;
// should we store drs deadtime instead of the FPGA temperature
let mut deadtime_instead_temp : bool = false;
let mut streamer = RBEventMemoryStreamer::new();
// FIXME
streamer.check_channel_errors = true;
let mut trace_suppressed = false;
match thread_control.lock() {
Ok(tc) => {
streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
trace_suppressed = tc.liftof_settings.mtb_settings.trace_suppression;
},
Err(err) => {
trace!("Can't acquire lock! {err}");
},
}
// loop variables
// our cachesize is 50 events. This means each time we
// receive data over bs_recv, we have received 50 more
// events. This means we might want to wait for 50 MTE
// events?
let mut skipped_events : usize = 0;
let mut n_events = 0usize;
'main : loop {
// FIXME - this whole loop needs to be faster, and there
// is no interactive commanding anymore.
if thread_ctrl_check_timer.elapsed().as_secs() >= 6 {
match thread_control.lock() {
Ok(tc) => {
if tc.stop_flag {
info!("Received stop signal. Will stop thread!");
break;
}
streamer.calc_crc32 = tc.liftof_settings.rb_settings.calc_crc32;
deadtime_instead_temp = tc.liftof_settings.rb_settings.drs_deadtime_instead_fpga_temp;
},
Err(err) => {
trace!("Can't acquire lock! {err}");
},
}
thread_ctrl_check_timer = Instant::now();
}
if !get_op_mode.is_empty() {
match get_op_mode.try_recv() {
Err(err) => trace!("No op mode change detected! Err {err}"),
Ok(mode) => {
warn!("Will change operation mode to {:?}!", mode);
match mode {
TofOperationMode::Default => {
streamer.request_mode = false;
op_mode = mode;
},
TofOperationMode::RBWaveform => {
if !cali_loaded {
error!("Requesting waveform analysis without having a calibration loaded!");
error!("Can't do waveform analysis without calibration!");
error!("Switching mode to Default");
op_mode = TofOperationMode::Default;
}
}
_ => (),
}
}
}
}
if !dtf_fr_runner.is_empty() {
match dtf_fr_runner.try_recv() {
Err(err) => {
error!("Issues receiving datatype/format! {err}");
}
Ok(dtf) => {
data_type = dtf;
info!("Will process events for data type {}!", data_type);
}
}
}
if bs_recv.is_empty() {
//println!("--> Empty bs_rec");
// FIXME - benchmark
//thread::sleep(one_milli/2);
continue 'main;
}
// this can't be blocking anymore, since
// otherwise we miss the datatype
let mut bytestream : Vec<u8>;
if events_not_sent > 0 {
error!("There were {events_not_sent} for this iteration of received bytes!");
}
if skipped_events > 0 {
error!("We skipped {} events!", skipped_events);
}
// reset skipped events and events not sent,
// these are per iteration
events_not_sent = 0;
skipped_events = 0;
match bs_recv.recv() {
Err(err) => {
error!("Received Garbage! Err {err}");
continue 'main;
}
Ok(_stream) => {
info!("Received {} bytes!", _stream.len());
bytestream = _stream;
//streamer.add(&bytestream, bytestream.len());
streamer.consume(&mut bytestream);
let mut packets_in_stream : u32 = 0;
let mut last_event_id : u32 = 0;
//println!("Streamer::stream size {}", streamer.stream.len());
loop {
if streamer.is_depleted {
info!("Streamer exhausted after sending {} packets!", packets_in_stream);
//break 'event_reader;
// we immediatly want more data in the streamer
continue 'main;
}
// FIXME - here we have the choice.
// streamer.next() will yield the next event,
// decoded
// streamer.next_tofpacket() instead will only
// yield the next event, not deserialzed
// but wrapped already in a tofpacket
let mut tp_to_send = TofPacket::new();
match op_mode {
TofOperationMode::RBHighThroughput => {
match streamer.next_tofpacket() {
None => {
streamer.is_depleted = true;
continue 'main;
},
Some(tp) => {
tp_to_send = tp;
}
}
},
TofOperationMode::Default |
TofOperationMode::RBWaveform => {
match streamer.next() {
None => {
streamer.is_depleted = true;
continue 'main;
},
Some(mut event) => {
event.header.set_rbpaddleid(&rbpaddleid);
if deadtime_instead_temp {
// in case we want to add the deadtime to the header,
// we have to do that here!
event.header.deadtime_instead_temp = deadtime_instead_temp;
match get_deadtime() {
Err(err) => {
error!("Unable to get DRS4 deadtime! {err}");
event.header.drs_deadtime = u16::MAX;
}
Ok(d_time) => {
event.header.drs_deadtime = d_time as u16;
}
}
}
//println!("Got event id {}", event.header.event_id);
if last_event_id != 0 {
if event.header.event_id != last_event_id + 1 {
if event.header.event_id > last_event_id {
if !trace_suppressed {
skipped_events += (event.header.event_id - last_event_id -1) as usize;
}
} else {
error!("Something with the event counter is messed up. Got event id {}, but the last event id was {}", event.header.event_id, last_event_id);
}
}
}
last_event_id = event.header.event_id;
//println!("This event id {}!", last_event_id);
event.data_type = data_type;
if verbose {
match stat.lock() {
Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
Ok(mut s) => {
if s.first_evid == 0 {
s.first_evid = event.header.event_id;
}
s.last_evid = event.header.event_id;
if event.status == EventStatus::ChannelIDWrong {
s.n_err_chid_wrong += 1;
}
if event.status == EventStatus::TailWrong {
s.n_err_tail_wrong += 1;
}
}
}
}
if event.status != EventStatus::Unknown {
if only_perfect_events && event.status != EventStatus::Perfect {
info!("Not sending this event, because it's event status is {} and we requested to send only events with EventStatus::Perfect!", event.status);
continue;
}
}
if op_mode == TofOperationMode::RBWaveform {
//debug!("Using paddle map {:?}", paddle_map);
//match waveform_analysis(&mut event, &paddle_map, &cali) {
// Err(err) => error!("Waveform analysis failed! {err}"),
// Ok(_) => ()
//}
}
n_events += 1;
if verbose && n_events % 100 == 0 {
println!("[EVTPROC (verbose)] => Sending event {}", event);
}
tp_to_send = TofPacket::from(&event);
},
}
}, // end op mode ~ waveform/event
_ => {
error!("Operation mode {} not available yet!", op_mode);
}
}
if verbose {
match stat.lock() {
Err(err) => error!("Unable to acquire lock on shared memory for RunStatisitcis! {err}"),
Ok(mut _st) => {
_st.evproc_npack += 1;
}
}
//println!("[EVTPROC (verbose)] => Sending TofPacket {}", tp_to_send);
}
// set flags
match data_type {
DataType::VoltageCalibration |
DataType::TimingCalibration |
DataType::Noi => {
tp_to_send.no_write_to_disk = true;
},
_ => ()
}
// send the packet
match tp_sender.send(tp_to_send) {
Ok(_) => {
packets_in_stream += 1;
},
Err(err) => {
error!("Problem sending TofPacket over channel! {err}");
events_not_sent += 1;
}
}
} // end 'event_reader
}, // end OK(recv)
}// end match
} // end outer loop
}