liftof_cc/threads/
readoutboard_comm.rs1use std::collections::HashMap;
5
6use std::sync::{
7 Arc,
8 Mutex,
9};
10
11use std::time::{
12 Instant,
13 };
15
16use crossbeam_channel::Sender;
17
18use tof_dataclasses::database::ReadoutBoard;
19use tof_dataclasses::events::RBEvent;
20use tof_dataclasses::packets::{
21 TofPacket,
22 PacketType,
23};
24
25use tof_dataclasses::serialization::{
26 Serialization,
27 Packable
28};
29use tof_dataclasses::calibrations::{
30 RBCalibrations,
31 RBCalibrationsFlightT,
32 RBCalibrationsFlightV,
33 };
34
35use liftof_lib::{
36 waveform_analysis,
37};
38
39use liftof_lib::thread_control::ThreadControl;
40use liftof_lib::settings::AnalysisEngineSettings;
41
42pub fn readoutboard_communicator(ev_to_builder : Sender<RBEvent>,
64 tp_to_sink : Sender<TofPacket>,
65 mut rb : ReadoutBoard,
66 thread_control : Arc<Mutex<ThreadControl>>) {
67
68 let mut this_status = HashMap::<u16, bool>::new();
69 for k in 1..321 {
70 this_status.insert(k,false);
71 }
72 match rb.load_latest_calibration() {
73 Err(err) => error!("Unable to load calibration for RB {}! {}", rb.rb_id, err),
74 Ok(_) => {
75 info!("Loaded calibration for board {} successfully!", rb.rb_id);
76 }
77 }
78
79 let zmq_ctx = zmq::Context::new();
80 let board_id = rb.rb_id; info!("initializing RB thread for board {}!", board_id);
82 let mut n_errors = 0usize;
83 let mut n_chunk = 0usize;
85 let address = rb.guess_address();
87
88 let socket = zmq_ctx.socket(zmq::SUB).expect("Unable to create socket!");
91 match socket.connect(&address) {
92 Err(err) => error!("Can not connect to socket {}, {}", address, err),
93 Ok(_) => info!("Connected to {address}")
94 }
95 let topic = format!("RB{:02}", board_id);
98 match socket.set_subscribe(&topic.as_bytes()) {
99 Err(err) => error!("Unable to subscribe to topic! {err}"),
100 Ok(_) => info!("Subscribed to {:?}!", topic),
101 }
102 let mut tc_timer = Instant::now();
103 let mut verification_active = false;
104
105 let ae_settings : AnalysisEngineSettings;
106 let run_analysis_engine : bool;
107 match thread_control.lock() {
108 Ok(tc) => {
109 ae_settings = tc.liftof_settings.analysis_engine_settings.clone();
110 run_analysis_engine = tc.liftof_settings.run_analysis_engine;
111 }
112 Err(err) => {
113 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
114 error!("Ending thread, unable to acquire settings!");
115 return;
116 }
117 }
118 if run_analysis_engine {
119 info!("Will run analysis engine!");
120 } else {
122 warn!("Will not run analysis engine!");
123 }
124
125 loop {
128 if tc_timer.elapsed().as_secs_f32() > 2.1 {
129 match thread_control.try_lock() {
130 Ok(mut tc) => {
131 if tc.end_all_rb_threads {
133 tc.thread_rbcomm_active.insert(rb.rb_id,false);
135 let mut all_done = true;
137 for (_,value) in &tc.thread_rbcomm_active {
138 if *value {
139 all_done = false;
140 }
141 }
142 if all_done {
143 tc.thread_event_bldr_active = false;
144 }
145 break;
146 }
147 verification_active = tc.verification_active;
148 if verification_active {
149 tc.detector_status.update_from_map(this_status.clone());
150 }
151 },
152 Err(err) => {
153 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
154 },
155 }
156 tc_timer = Instant::now();
157 }
158 match socket.recv_bytes(0) {
161 Err(err) => {
162 n_errors += 1;
163 error!("Receiving from socket raised error {}", err);
164 }
165 Ok(buffer) => {
166 match TofPacket::from_bytestream(&buffer, &mut 4) {
169 Err(err) => {
170 error!("Unknown bytestream...{:?}", err);
171 continue;
172 },
173 Ok(tp) => {
174 match tp.packet_type {
176 PacketType::RBEvent | PacketType::RBEventMemoryView => {
177 let mut event = RBEvent::from(&tp);
178 if event.hits.len() == 0
181 && !event.header.drs_lost_trigger()
182 && run_analysis_engine {
183 match waveform_analysis(&mut event,
184 &rb,
185 ae_settings) {
186 Ok(_) => (),
187 Err(err) => {
188 warn!("Unable to analyze waveforms for this event! {err}");
189 }
190 }
191 }
192 if verification_active {
193 for h in &event.hits {
194 let verification_charge_threshhold = 10.0f32;
196 if h.get_charge_a() >= verification_charge_threshhold {
197 let status_key = h.paddle_id as u16;
198 match this_status.insert(status_key, true) {
199 Some(_) => (),
200 None => error!("Unknown paddle id! {}", h.paddle_id)
201 }
202 }
203 if h.get_charge_b() >= verification_charge_threshhold {
204 let status_key = (h.paddle_id as u16) + 160;
205 match this_status.insert(status_key, true) {
206 Some(_) => (),
207 None => error!("Unknown paddle id! {}", h.paddle_id)
208 }
209 }
210 }
211 }
212 if !verification_active {
213 match ev_to_builder.send(event) {
214 Ok(_) => (),
215 Err(err) => {
216 error!("Unable to send event! Err {err}");
217 }
218 }
219 }
220 n_chunk += 1;
222 }
223 PacketType::RBCalibration => {
224 let mut flight_v = RBCalibrationsFlightV::new();
225 let mut flight_t = RBCalibrationsFlightT::new();
226
227 match tp.unpack::<RBCalibrations>() {
228 Ok(cali) => {
229 flight_v = cali.emit_flightvcal();
230 flight_t = cali.emit_flighttcal();
231 match thread_control.lock() {
233 Ok(mut tc) => {
234 tc.calibrations.insert(board_id, cali.clone());
235 *tc.finished_calibrations.get_mut(&board_id).unwrap() = true;
236 rb.calibration = cali;
237 }
238 Err(err) => {
239 error!("Can't acquire lock for ThreadControl!! {err}");
240 },
241 }
242 }
243 Err(err) => {
244 error!("Received calibration package, but got error when unpacking! {err}");
245 }
246 }
247 let flight_v_tp = flight_v.pack();
248 let flight_t_tp = flight_t.pack();
249 match tp_to_sink.send(flight_v_tp) {
250 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
251 Ok(_) => debug!("Packet sent"),
252 }
253 match tp_to_sink.send(flight_t_tp) {
254 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
255 Ok(_) => debug!("Packet sent"),
256 }
257 match tp_to_sink.send(tp) {
258 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
259 Ok(_) => debug!("Packet sent"),
260 }
261 }
262 _ => {
263 match tp_to_sink.send(tp) {
266 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
267 Ok(_) => debug!("Packet sent"),
268 }
269 }
270 } } } } } debug!("Digested {n_chunk} chunks!");
276 debug!("Noticed {n_errors} errors!");
277 } println!("= => [rbcomm] thread for RB {} finished! (not recoverable)", board_id);
279}