liftof_cc/threads/
readoutboard_comm.rs1use std::collections::HashMap;
10
11use std::sync::{
12 Arc,
13 Mutex,
14};
15
16use std::time::{
17 Instant,
18 };
20
21use crossbeam_channel::Sender;
22
23use gondola_core::prelude::*;
24
25pub fn readoutboard_communicator(ev_to_builder : Sender<RBEvent>,
47 tp_to_sink : Sender<TofPacket>,
48 mut rb : ReadoutBoard,
49 thread_control : Arc<Mutex<ThreadControl>>) {
50
51 let mut this_status = HashMap::<u16, bool>::new();
52 for k in 1..321 {
53 this_status.insert(k,false);
54 }
55 match rb.load_latest_calibration() {
56 Err(err) => error!("Unable to load calibration for RB {}! {}", rb.rb_id, err),
57 Ok(_) => {
58 info!("Loaded calibration for board {} successfully!", rb.rb_id);
59 }
60 }
61
62 let zmq_ctx = zmq::Context::new();
63 let board_id = rb.rb_id; info!("initializing RB thread for board {}!", board_id);
65 let mut n_errors = 0usize;
66 let mut n_chunk = 0usize;
68 let address = rb.guess_address();
70
71 let socket = zmq_ctx.socket(zmq::SUB).expect("Unable to create socket!");
74 match socket.connect(&address) {
75 Err(err) => error!("Can not connect to socket {}, {}", address, err),
76 Ok(_) => info!("Connected to {address}")
77 }
78 let topic = format!("RB{:02}", board_id);
81 match socket.set_subscribe(&topic.as_bytes()) {
82 Err(err) => error!("Unable to subscribe to topic! {err}"),
83 Ok(_) => info!("Subscribed to {:?}!", topic),
84 }
85 let mut tc_timer = Instant::now();
86 let verification_active : bool;
87
88 let ae_settings : AnalysisEngineSettings;
89 let mut run_analysis_engine : bool;
90 match thread_control.lock() {
91 Ok(tc) => {
92 ae_settings = tc.liftof_settings.analysis_engine_settings.clone();
93 run_analysis_engine = tc.liftof_settings.run_analysis_engine;
94 verification_active = tc.liftof_settings.verification_run.unwrap_or(false);
95 }
96 Err(err) => {
97 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
98 error!("Ending thread, unable to acquire settings!");
99 return;
100 }
101 }
102 if verification_active {
103 run_analysis_engine = true;
105 println!("=> Running verfication on board {board_id}!");
106 } else {
107 debug!("=> Not running verification!");
108 }
109 if run_analysis_engine {
110 info!("Will run analysis engine!");
111 } else {
113 warn!("Will not run analysis engine!");
114 }
115
116 loop {
119 if tc_timer.elapsed().as_secs_f32() > 2.1 {
121 match thread_control.try_lock() {
122 Ok(mut tc) => {
123 debug!(" lock on thread control acquired!");
124 if tc.end_all_rb_threads {
126 tc.thread_rbcomm_active.insert(rb.rb_id,false);
128 let mut all_done = true;
130 for (_,value) in &tc.thread_rbcomm_active {
131 if *value {
132 all_done = false;
133 }
134 }
135 if all_done {
136 tc.thread_event_bldr_active = false;
137 }
138 break;
139 }
140 if verification_active {
142 tc.detector_status.update_from_map(this_status.clone());
144 }
145 },
146 Err(err) => {
147 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
148 },
149 }
150 tc_timer = Instant::now();
151 }
152 match socket.recv_bytes(0) {
155 Err(err) => {
156 n_errors += 1;
157 error!("Receiving from socket raised error {}", err);
158 }
159 Ok(buffer) => {
160 match TofPacket::from_bytestream(&buffer, &mut 4) {
163 Err(err) => {
164 error!("Unknown bytestream...{:?}", err);
165 continue;
166 },
167 Ok(tp) => {
168 match tp.packet_type {
170 TofPacketType::RBEvent | TofPacketType::RBEventMemoryView => {
171 let mut event : RBEvent;
173 match tp.unpack::<RBEvent>() {
174 Ok(rbev_) => {
175 event = rbev_;
176 event.creation_time = Some(Instant::now());
177 }
178 Err(err) => {
179 error!("Can't unpack RBEvent! {err}");
180 continue;
181 }
182 }
183 if event.hits.len() == 0
186 && !event.header.drs_lost_trigger()
187 && run_analysis_engine {
188 match waveform_analysis(&mut event,
189 &rb,
190 ae_settings) {
191 Ok(_) => (),
192 Err(err) => {
193 debug!("Unable to analyze waveforms for this event on RB {}! {err}", &rb.rb_id);
194 }
195 }
196 }
197 if verification_active {
198 for h in &event.hits {
201 let verification_charge_threshhold = 10.0f32;
203 if h.get_charge_a() >= verification_charge_threshhold {
204 let status_key = h.paddle_id as u16;
205 match this_status.insert(status_key, true) {
206 Some(_) => (),
207 None => error!("Unknown paddle id! {}", h.paddle_id)
208 }
209 }
210 if h.get_charge_b() >= verification_charge_threshhold {
211 let status_key = (h.paddle_id as u16) + 160;
212 match this_status.insert(status_key, true) {
213 Some(_) => (),
214 None => error!("Unknown paddle id! {}", h.paddle_id)
215 }
216 }
217 }
218 } else {
219 match ev_to_builder.send(event) {
221 Ok(_) => (),
222 Err(err) => {
223 error!("Unable to send event! Err {err}");
224 }
225 }
226 }
227 n_chunk += 1;
229 }
230 TofPacketType::RBCalibration => {
231 let mut flight_v = RBCalibrationFlightV::new();
232 let mut flight_t = RBCalibrationFlightT::new();
233
234 match tp.unpack::<RBCalibrations>() {
235 Ok(cali) => {
236 flight_v = cali.emit_flightvcal();
237 flight_t = cali.emit_flighttcal();
238 match thread_control.lock() {
240 Ok(mut tc) => {
241 tc.calibrations.insert(board_id, cali.clone());
242 *tc.finished_calibrations.get_mut(&board_id).unwrap() = true;
243 rb.calibration = cali;
244 }
245 Err(err) => {
246 error!("Can't acquire lock for ThreadControl!! {err}");
247 },
248 }
249 }
250 Err(err) => {
251 error!("Received calibration package, but got error when unpacking! {err}");
252 }
253 }
254 let flight_v_tp = flight_v.pack();
255 let flight_t_tp = flight_t.pack();
256 match tp_to_sink.send(flight_v_tp) {
257 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
258 Ok(_) => debug!("Packet sent"),
259 }
260 match tp_to_sink.send(flight_t_tp) {
261 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
262 Ok(_) => debug!("Packet sent"),
263 }
264 match tp_to_sink.send(tp) {
265 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
266 Ok(_) => debug!("Packet sent"),
267 }
268 }
269 _ => {
270 match tp_to_sink.send(tp) {
273 Err(err) => error!("Can not send tof packet to data sink! Err {err}"),
274 Ok(_) => debug!("Packet sent"),
275 }
276 }
277 } } } } } trace!("Digested {n_chunk} chunks!");
283 debug!("Noticed {n_errors} errors!");
284 } println!("= => [rbcomm] thread for RB {} finished! (not recoverable)", board_id);
286}