liftof_cc/threads/
global_data_sink.rs1use std::time::{
11 Instant,
12 Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16 Arc,
17 Mutex,
18};
19
20use crossbeam_channel::Receiver;
21
22use tof_dataclasses::packets::{
23 TofPacket,
24 PacketType
25};
26
27use tof_dataclasses::serialization::{
28 Serialization,
29 Packable,
30};
31
32use tof_dataclasses::io::{
33 TofPacketWriter,
34 FileType,
35};
36
37use tof_dataclasses::heartbeats::HeartBeatDataSink;
38use liftof_lib::thread_control::ThreadControl;
39
40pub fn global_data_sink(incoming : &Receiver<TofPacket>,
54 thread_control : Arc<Mutex<ThreadControl>>) {
55 sleep(Duration::from_secs(10));
58 let mut flight_address = String::from("");
59 let mut mbytes_per_file = 420usize;
60 let mut write_stream_path = String::from("");
61 let mut send_tof_summary_packets = false;
62 let mut send_rbwaveform_packets = false;
63 let mut send_tof_event_packets = false;
65 let mut write_stream = false;
66 let mut send_rbwf_every_x_event = 1;
67 let mut hb_interval = Duration::from_secs(20u64);
69 match thread_control.lock() {
70 Ok(mut tc) => {
71 tc.thread_data_sink_active = true;
72 flight_address = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
73 mbytes_per_file = tc.liftof_settings.data_publisher_settings.mbytes_per_file;
74 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
75 write_stream = tc.write_data_to_disk;
76 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
77 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
78 send_tof_event_packets = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
79 send_rbwf_every_x_event = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
80 hb_interval = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
81 },
82 Err(err) => {
83 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
84 },
85 }
86
87 if send_rbwf_every_x_event == 0 {
88 error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
89 send_rbwaveform_packets = false;
90 }
91
92 let mut evid_check = Vec::<u32>::new();
93
94 let ctx = zmq::Context::new();
95 let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
97 let unlim : i32 = 1000000;
98 data_socket.set_sndhwm(unlim).unwrap();
99 match data_socket.bind(&flight_address) {
101 Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
106 Ok(_) => ()
107 }
108 info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
109
110 let mut timer = Instant::now();
114 let mut check_settings_timer = Instant::now();
115
116 let mut writer : Option<TofPacketWriter> = None;
118 let mut runid : u32 = 0;
119 let mut new_run_start = false;
120 let mut retire = false;
121 let mut heartbeat = HeartBeatDataSink::new();
122 let mut hb_timer = Instant::now();
123 loop {
125 if retire {
126 warn!("Will end data sink thread in 25 seconds!");
129 println!("= =>Will end data sink thread in 25 seconds!");
130 sleep(Duration::from_secs(25));
131 break;
132 }
133 if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
137 match thread_control.try_lock() {
138 Ok(mut tc) => {
139 send_tof_event_packets = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
140 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
141 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
142
143 if tc.stop_flag {
144 tc.thread_data_sink_active = false;
145 retire = true;
147 }
148 if tc.new_run_start_flag {
149 new_run_start = true;
150 write_stream = tc.write_data_to_disk;
151 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
152 runid = tc.run_id;
153 write_stream_path += &(format!("/{}/", runid));
154 tc.new_run_start_flag = false;
155 }
156 },
157 Err(err) => {
158 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
159 },
160 }
161 check_settings_timer = Instant::now();
162 }
163 if write_stream && new_run_start {
164 let file_type = FileType::RunFile(runid as u32);
165 writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
167 writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
168 new_run_start = false;
169 } else if !write_stream {
170 writer = None;
171 }
172 let mut send_this_packet = true;
173 match incoming.recv() {
174 Err(err) => trace!("No new packet, err {err}"),
175 Ok(pack) => {
176 debug!("Got new tof packet {}", pack.packet_type);
177 if writer.is_some() {
178 match pack.packet_type {
179 PacketType::TofEventSummary
180 | PacketType::RBWaveform => (),
181 _ => {
182 writer.as_mut().unwrap().add_tof_packet(&pack);
183 heartbeat.n_pack_write_disk += 1;
184 heartbeat.n_bytes_written += pack.payload.len() as u64;
185 }
186 }
187 }
188
189 match pack.packet_type {
190 PacketType::TofEvent => {
191 send_this_packet = send_tof_event_packets;
192 }
193 PacketType::RBWaveform => {
194 send_this_packet = send_rbwaveform_packets;
195 }
196 PacketType::TofEventSummary => {
197 send_this_packet = send_tof_summary_packets;
198 }
199 _ => ()
200 }
201 if send_this_packet {
202 match data_socket.send(pack.to_bytestream(),0) {
203 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
204 Ok(_) => {
205 trace!("TofPacket sent");
206 heartbeat.n_packets_sent += 1;
207 }
208 } }
210 } } let evid_check_len = evid_check.len();
216 if timer.elapsed().as_secs() > 10 {
217 if evid_check_len > 0 {
219 let mut evid = evid_check[0];
220 for _ in 0..evid_check_len {
221 if !evid_check.contains(&evid) {
222 heartbeat.n_evid_missing += 1;
223 heartbeat.n_evid_chunksize = evid_check_len as u64;
224 }
225 evid += 1;
226 }
227 }
228 timer = Instant::now();
229 }
230 if hb_timer.elapsed() >= hb_interval {
231 heartbeat.met += hb_timer.elapsed().as_secs();
232
233 match data_socket.send(heartbeat.pack().to_bytestream(),0) {
234 Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
235 Ok(_) => {
236 trace!("Heartbeat sent");
237 }
238 }
239 evid_check.clear();
240 hb_timer = Instant::now();
241 }
242 } }