liftof_cc/threads/
global_data_sink.rs1use std::time::{
11 Instant,
12 Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16 Arc,
17 Mutex,
18};
19
20use crossbeam_channel::Receiver;
21
22use gondola_core::prelude::*;
23
24pub fn global_data_sink(incoming : &Receiver<TofPacket>,
37 thread_control : Arc<Mutex<ThreadControl>>) {
38 sleep(Duration::from_secs(10));
41 let mut flight_address = String::from("");
42 let mut mbytes_per_file = 420usize;
43 let mut write_stream_path = String::from("");
44 let mut send_tof_summary_packets = false;
45 let mut send_rbwaveform_packets = false;
46 let mut send_tof_event_packets = false;
48 let mut write_stream = false;
49 let mut send_rbwf_every_x_event = 1;
50 let mut hb_interval = Duration::from_secs(20u64);
52 match thread_control.lock() {
53 Ok(mut tc) => {
54 tc.thread_data_sink_active = true;
55 flight_address = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
56 mbytes_per_file = tc.liftof_settings.data_publisher_settings.mbytes_per_file;
57 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
58 write_stream = tc.write_data_to_disk;
59 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
60 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
61 send_tof_event_packets = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
62 send_rbwf_every_x_event = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
63 hb_interval = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
64 },
65 Err(err) => {
66 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
67 },
68 }
69
70 if send_rbwf_every_x_event == 0 {
71 error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
72 send_rbwaveform_packets = false;
73 }
74
75 let mut evid_check = Vec::<u32>::new();
76
77 let ctx = zmq::Context::new();
78 let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
80 let unlim : i32 = 1000000;
81 data_socket.set_sndhwm(unlim).unwrap();
82 match data_socket.bind(&flight_address) {
84 Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
89 Ok(_) => ()
90 }
91 info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
92
93 let mut timer = Instant::now();
97 let mut check_settings_timer = Instant::now();
98
99 let mut writer : Option<TofPacketWriter> = None;
101 let mut runid : u32 = 0;
102 let mut new_run_start = false;
103 let mut retire = false;
104 let mut heartbeat = DataSinkHB::new();
105 let mut hb_timer = Instant::now();
106 loop {
108 if retire {
109 warn!("Will end data sink thread in 25 seconds!");
112 println!("= =>Will end data sink thread in 25 seconds!");
113 sleep(Duration::from_secs(25));
114 break;
115 }
116 if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
120 match thread_control.try_lock() {
121 Ok(mut tc) => {
122 send_tof_event_packets = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
123 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
124 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
125
126 if tc.stop_flag {
127 tc.thread_data_sink_active = false;
128 retire = true;
130 }
131 if tc.new_run_start_flag {
132 new_run_start = true;
133 write_stream = tc.write_data_to_disk;
134 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
135 runid = tc.run_id;
136 write_stream_path += &(format!("/{}/", runid));
137 tc.new_run_start_flag = false;
138 }
139 },
140 Err(err) => {
141 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
142 },
143 }
144 check_settings_timer = Instant::now();
145 }
146 if write_stream && new_run_start {
147 let file_type = FileType::RunFile(runid as u32);
148 writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
150 writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
151 new_run_start = false;
152 } else if !write_stream {
153 writer = None;
154 }
155 let mut send_this_packet = true;
156 match incoming.recv() {
157 Err(err) => trace!("No new packet, err {err}"),
158 Ok(pack) => {
159 debug!("Got new tof packet {}", pack.packet_type);
160 if writer.is_some() {
162 match pack.packet_type {
163 TofPacketType::TofEvent
164 | TofPacketType::RBWaveform => (),
165 _ => {
166 writer.as_mut().unwrap().add_tof_packet(&pack);
167 heartbeat.n_pack_write_disk += 1;
168 heartbeat.n_bytes_written += pack.payload.len() as u64;
169 }
170 }
171 }
172
173 match pack.packet_type {
176 TofPacketType::TofEventDeprecated => {
177 send_this_packet = send_tof_event_packets;
178 }
179 TofPacketType::RBWaveform => {
180 send_this_packet = send_rbwaveform_packets;
181 }
182 TofPacketType::TofEvent => {
183 send_this_packet = send_tof_summary_packets;
184 }
185 _ => ()
186 }
187 if send_this_packet {
188 match data_socket.send(pack.to_bytestream(),0) {
189 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
190 Ok(_) => {
191 trace!("TofPacket sent");
192 heartbeat.n_packets_sent += 1;
193 }
194 } }
196 } } let evid_check_len = evid_check.len();
202 if timer.elapsed().as_secs() > 10 {
203 if evid_check_len > 0 {
205 let mut evid = evid_check[0];
206 for _ in 0..evid_check_len {
207 if !evid_check.contains(&evid) {
208 heartbeat.n_evid_missing += 1;
209 heartbeat.n_evid_chunksize = evid_check_len as u64;
210 }
211 evid += 1;
212 }
213 }
214 timer = Instant::now();
215 }
216 if hb_timer.elapsed() >= hb_interval {
217 heartbeat.met += hb_timer.elapsed().as_secs();
218
219 match data_socket.send(heartbeat.pack().to_bytestream(),0) {
220 Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
221 Ok(_) => {
222 trace!("Heartbeat sent");
223 }
224 }
225 evid_check.clear();
226 hb_timer = Instant::now();
227 }
228 } }