liftof_cc/threads/
readoutboard_comm.rs1use std::collections::HashMap;
5
6use std::sync::{
7 Arc,
8 Mutex,
9};
10
11use std::time::{
12 Instant,
13 };
15
16use crossbeam_channel::Sender;
17
18use gondola_core::prelude::*;
19
20pub fn readoutboard_communicator(ev_to_builder : Sender<RBEvent>,
42 tp_to_sink : Sender<TofPacket>,
43 mut rb : ReadoutBoard,
44 thread_control : Arc<Mutex<ThreadControl>>) {
45
46 let mut this_status = HashMap::<u16, bool>::new();
47 for k in 1..321 {
48 this_status.insert(k,false);
49 }
50 match rb.load_latest_calibration() {
51 Err(err) => error!("Unable to load calibration for RB {}! {}", rb.rb_id, err),
52 Ok(_) => {
53 info!("Loaded calibration for board {} successfully!", rb.rb_id);
54 }
55 }
56
57 let zmq_ctx = zmq::Context::new();
58 let board_id = rb.rb_id; info!("initializing RB thread for board {}!", board_id);
60 let mut n_errors = 0usize;
61 let mut n_chunk = 0usize;
63 let address = rb.guess_address();
65
66 let socket = zmq_ctx.socket(zmq::SUB).expect("Unable to create socket!");
69 match socket.connect(&address) {
70 Err(err) => error!("Can not connect to socket {}, {}", address, err),
71 Ok(_) => info!("Connected to {address}")
72 }
73 let topic = format!("RB{:02}", board_id);
76 match socket.set_subscribe(&topic.as_bytes()) {
77 Err(err) => error!("Unable to subscribe to topic! {err}"),
78 Ok(_) => info!("Subscribed to {:?}!", topic),
79 }
80 let mut tc_timer = Instant::now();
81 let mut verification_active = false;
82
83 let ae_settings : AnalysisEngineSettings;
84 let run_analysis_engine : bool;
85 match thread_control.lock() {
86 Ok(tc) => {
87 ae_settings = tc.liftof_settings.analysis_engine_settings.clone();
88 run_analysis_engine = tc.liftof_settings.run_analysis_engine;
89 }
90 Err(err) => {
91 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
92 error!("Ending thread, unable to acquire settings!");
93 return;
94 }
95 }
96 if run_analysis_engine {
97 info!("Will run analysis engine!");
98 } else {
100 warn!("Will not run analysis engine!");
101 }
102
103 loop {
106 if tc_timer.elapsed().as_secs_f32() > 2.1 {
108 match thread_control.try_lock() {
109 Ok(mut tc) => {
110 debug!(" lock on thread control acquired!");
111 if tc.end_all_rb_threads {
113 tc.thread_rbcomm_active.insert(rb.rb_id,false);
115 let mut all_done = true;
117 for (_,value) in &tc.thread_rbcomm_active {
118 if *value {
119 all_done = false;
120 }
121 }
122 if all_done {
123 tc.thread_event_bldr_active = false;
124 }
125 break;
126 }
127 verification_active = tc.verification_active;
128 if verification_active {
129 debug!("Found verification flag active!");
130 tc.detector_status.update_from_map(this_status.clone());
131 }
132 },
133 Err(err) => {
134 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
135 },
136 }
137 tc_timer = Instant::now();
138 }
139 match socket.recv_bytes(0) {
142 Err(err) => {
143 n_errors += 1;
144 error!("Receiving from socket raised error {}", err);
145 }
146 Ok(buffer) => {
147 match TofPacket::from_bytestream(&buffer, &mut 4) {
150 Err(err) => {
151 error!("Unknown bytestream...{:?}", err);
152 continue;
153 },
154 Ok(tp) => {
155 match tp.packet_type {
157 TofPacketType::RBEvent | TofPacketType::RBEventMemoryView => {
158 let mut event : RBEvent;
160 match tp.unpack::<RBEvent>() {
161 Ok(rbev_) => {
162 event = rbev_;
163 }
164 Err(err) => {
165 error!("Can't unpack RBEvent! {err}");
166 continue;
167 }
168 }
169 if event.hits.len() == 0
172 && !event.header.drs_lost_trigger()
173 && run_analysis_engine {
174 match waveform_analysis(&mut event,
175 &rb,
176 ae_settings) {
177 Ok(_) => (),
178 Err(err) => {
179 warn!("Unable to analyze waveforms for this event! {err}");
180 }
181 }
182 }
183 if verification_active {
184 debug!("Found active verification run, will update hit map!");
185 debug!("{}", event);
186 for h in &event.hits {
187 let verification_charge_threshhold = 10.0f32;
189 if h.get_charge_a() >= verification_charge_threshhold {
190 let status_key = h.paddle_id as u16;
191 match this_status.insert(status_key, true) {
192 Some(_) => (),
193 None => error!("Unknown paddle id! {}", h.paddle_id)
194 }
195 }
196 if h.get_charge_b() >= verification_charge_threshhold {
197 let status_key = (h.paddle_id as u16) + 160;
198 match this_status.insert(status_key, true) {
199 Some(_) => (),
200 None => error!("Unknown paddle id! {}", h.paddle_id)
201 }
202 }
203 }
204 } else {
205 match ev_to_builder.send(event) {
206 Ok(_) => (),
207 Err(err) => {
208 error!("Unable to send event! Err {err}");
209 }
210 }
211 }
212 n_chunk += 1;
214 }
215 TofPacketType::RBCalibration => {
216 let mut flight_v = RBCalibrationFlightV::new();
217 let mut flight_t = RBCalibrationFlightT::new();
218
219 match tp.unpack::<RBCalibrations>() {
220 Ok(cali) => {
221 flight_v = cali.emit_flightvcal();
222 flight_t = cali.emit_flighttcal();
223 match thread_control.lock() {
225 Ok(mut tc) => {
226 tc.calibrations.insert(board_id, cali.clone());
227 *tc.finished_calibrations.get_mut(&board_id).unwrap() = true;
228 rb.calibration = cali;
229 }
230 Err(err) => {
231 error!("Can't acquire lock for ThreadControl!! {err}");
232 },
233 }
234 }
235 Err(err) => {
236 error!("Received calibration package, but got error when unpacking! {err}");
237 }
238 }
239 let flight_v_tp = flight_v.pack();
240 let flight_t_tp = flight_t.pack();
241 match tp_to_sink.send(flight_v_tp) {
242 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
243 Ok(_) => debug!("Packet sent"),
244 }
245 match tp_to_sink.send(flight_t_tp) {
246 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
247 Ok(_) => debug!("Packet sent"),
248 }
249 match tp_to_sink.send(tp) {
250 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
251 Ok(_) => debug!("Packet sent"),
252 }
253 }
254 _ => {
255 match tp_to_sink.send(tp) {
258 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
259 Ok(_) => debug!("Packet sent"),
260 }
261 }
262 } } } } } trace!("Digested {n_chunk} chunks!");
268 debug!("Noticed {n_errors} errors!");
269 } println!("= => [rbcomm] thread for RB {} finished! (not recoverable)", board_id);
271}