liftof_cc/threads/
global_data_sink.rs1use std::time::{
11 Instant,
12 Duration,
13};
14use std::thread::sleep;
15use std::sync::{
16 Arc,
17 Mutex,
18};
19
20use crossbeam_channel::Receiver;
21
22use gondola_core::prelude::*;
23
24pub fn global_data_sink(incoming : &Receiver<TofPacket>,
38 thread_control : Arc<Mutex<ThreadControl>>) {
39 sleep(Duration::from_secs(10));
42 let mut flight_address = String::from("");
43 let mut mbytes_per_file = 420usize;
44 let mut write_stream_path = String::from("");
45 let mut send_tof_summary_packets = false;
46 let mut send_rbwaveform_packets = false;
47 let mut send_tof_event_packets = false;
49 let mut write_stream = false;
50 let mut send_rbwf_every_x_event = 1;
51 let mut hb_interval = Duration::from_secs(20u64);
53 match thread_control.lock() {
54 Ok(mut tc) => {
55 tc.thread_data_sink_active = true;
56 flight_address = tc.liftof_settings.data_publisher_settings.fc_pub_address.clone();
57 mbytes_per_file = tc.liftof_settings.data_publisher_settings.mbytes_per_file;
58 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
59 write_stream = tc.write_data_to_disk;
60 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
61 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
62 send_tof_event_packets = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
63 send_rbwf_every_x_event = tc.liftof_settings.data_publisher_settings.send_rbwf_every_x_event;
64 hb_interval = Duration::from_secs(tc.liftof_settings.data_publisher_settings.hb_send_interval as u64);
65 },
66 Err(err) => {
67 error!("Can't acquire lock for ThreadControl! Unable to set calibration mode! {err}");
68 },
69 }
70
71 if send_rbwf_every_x_event == 0 {
72 error!("0 is not a reasonable value for send_rbwf_every_x_event!. We will switch of the sending of RBWaveforms instead!");
73 send_rbwaveform_packets = false;
74 }
75
76 let mut evid_check = Vec::<u32>::new();
77
78 let ctx = zmq::Context::new();
79 let data_socket = ctx.socket(zmq::PUB).expect("Can not create socket!");
81 let unlim : i32 = 1000000;
82 data_socket.set_sndhwm(unlim).unwrap();
83 match data_socket.bind(&flight_address) {
85 Err(err) => panic!("Can not bind to address {}! {}", flight_address, err),
90 Ok(_) => ()
91 }
92 info!("ZMQ PUB Socket for global data sink bound to {flight_address}");
93
94 let mut timer = Instant::now();
98 let mut check_settings_timer = Instant::now();
99
100 let mut writer : Option<TofPacketWriter> = None;
102 let mut runid : u32 = 0;
103 let mut new_run_start = false;
104 let mut retire = false;
105 let mut heartbeat = DataSinkHB::new();
106 let mut hb_timer = Instant::now();
107 loop {
109 if retire {
110 warn!("Will end data sink thread in 25 seconds!");
113 println!("= =>Will end data sink thread in 25 seconds!");
114 sleep(Duration::from_secs(25));
115 break;
116 }
117 if check_settings_timer.elapsed().as_secs_f32() > 1.5 {
121 match thread_control.try_lock() {
122 Ok(mut tc) => {
123 send_tof_event_packets = tc.liftof_settings.data_publisher_settings.send_tof_event_packets;
124 send_tof_summary_packets = tc.liftof_settings.data_publisher_settings.send_tof_summary_packets;
125 send_rbwaveform_packets = tc.liftof_settings.data_publisher_settings.send_rbwaveform_packets;
126
127 if tc.stop_flag {
128 tc.thread_data_sink_active = false;
129 retire = true;
131 }
132 if tc.new_run_start_flag {
133 new_run_start = true;
134 write_stream = tc.write_data_to_disk;
135 write_stream_path = tc.liftof_settings.data_publisher_settings.data_dir.clone();
136 runid = tc.run_id;
137 write_stream_path += &(format!("/{}/", runid));
138 tc.new_run_start_flag = false;
139 }
140 },
141 Err(err) => {
142 error!("Can't acquire lock for ThreadControl! {err}");
143 },
144 }
145 check_settings_timer = Instant::now();
146 }
147 if write_stream && new_run_start {
148 let file_type = FileType::RunFile(runid as u32);
149 writer = Some(TofPacketWriter::new(write_stream_path.clone(), file_type));
151 writer.as_mut().unwrap().mbytes_per_file = mbytes_per_file as usize;
152 new_run_start = false;
153 } else if !write_stream {
154 writer = None;
155 }
156 let mut send_this_packet = true;
157 match incoming.recv() {
158 Err(err) => trace!("No new packet, err {err}"),
159 Ok(pack) => {
160 debug!("Got new tof packet {}", pack.packet_type);
161 if writer.is_some() {
163 match pack.packet_type {
164 TofPacketType::TofEvent
165 | TofPacketType::RBWaveform => (),
166 _ => {
167 writer.as_mut().unwrap().add_tof_packet(&pack);
168 heartbeat.n_pack_write_disk += 1;
169 heartbeat.n_bytes_written += pack.payload.len() as u64;
170 }
171 }
172 }
173
174 match pack.packet_type {
177 TofPacketType::TofEventDeprecated => {
178 send_this_packet = send_tof_event_packets;
179 }
180 TofPacketType::RBWaveform => {
181 send_this_packet = send_rbwaveform_packets;
182 }
183 TofPacketType::TofEvent => {
184 send_this_packet = send_tof_summary_packets;
185 }
186 _ => ()
187 }
188 if send_this_packet {
189 match data_socket.send(pack.to_bytestream(),0) {
190 Err(err) => error !("Not able to send packet over 0MQ PUB! {err}"),
191 Ok(_) => {
192 trace!("TofPacket sent");
193 heartbeat.n_packets_sent += 1;
194 }
195 } }
197 } } let evid_check_len = evid_check.len();
203 if timer.elapsed().as_secs() > 10 {
204 if evid_check_len > 0 {
206 let mut evid = evid_check[0];
207 for _ in 0..evid_check_len {
208 if !evid_check.contains(&evid) {
209 heartbeat.n_evid_missing += 1;
210 heartbeat.n_evid_chunksize = evid_check_len as u64;
211 }
212 evid += 1;
213 }
214 }
215 timer = Instant::now();
216 }
217 if hb_timer.elapsed() >= hb_interval {
218 heartbeat.met += hb_timer.elapsed().as_secs();
219
220 match data_socket.send(heartbeat.pack().to_bytestream(),0) {
221 Err(err) => error!("Not able to send heartbeat over 0MQ PUB! {err}"),
222 Ok(_) => {
223 trace!("Heartbeat sent");
224 }
225 }
226 evid_check.clear();
227 hb_timer = Instant::now();
228 }
229 } }