liftof_rb/threads/
data_publisher.rs1use std::env;
2use std::path::Path;
3use std::fs;
4use std::fs::OpenOptions;
5use std::ffi::OsString;
6use std::fs::File;
7use std::io::Write;
8use std::time::Instant;
9use std::sync::{
10 Arc,
11 Mutex,
12};
13
14use crossbeam_channel::Receiver;
15
16use tof_dataclasses::packets::{TofPacket,
17 PacketType};
18use tof_dataclasses::events::{RBEvent,
19 DataType};
20use tof_dataclasses::serialization::Serialization;
21use liftof_lib::thread_control::ThreadControl;
22
23use crate::api::{
24 prefix_board_id_noquery,
25 prefix_local,
26};
27use crate::control::get_board_id;
28
29pub fn data_publisher(data : &Receiver<TofPacket>,
48 address : String,
49 output_fname : Option<String> ,
50 print_packets : bool,
51 thread_control : Arc<Mutex<ThreadControl>>) {
52
53 let ctx = zmq::Context::new();
54 let data_socket = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
55 data_socket.bind(&address).expect("Unable to bind to data (PUB) socket {data_adress}");
56 info!("0MQ PUB socket bound to address {address}");
57
58 let mut file_on_disk : Option<File> = None;let fname : String;
60 let write_to_disk;
61 match output_fname {
62 None => {
63 fname = String::from("Unknown.tof.gaps");
64 write_to_disk = false;
65 }
66 Some(_fname) => {
67 fname = _fname;
68 write_to_disk = true;
69 }
70 }
71 let datafile_output_file = Path::new(&fname);
72 let home = env::var_os("HOME").unwrap_or(OsString::from("/home/gaps"));
75 let calib_dir = home.to_string_lossy().to_string() + "/calib";
76 if fname.ends_with("cali.tof.gaps") {
77 match fs::metadata(&calib_dir) {
78 Ok(metadata) => {
79 if !metadata.is_dir() {
81 error!("The path exists, but it is not a directory.");
82 }
83 }
84 Err(_) => {
85 warn!("No calibration directory found. Will create {}", calib_dir);
87 match fs::create_dir(calib_dir.clone()) {
88 Ok(_) => (),
89 Err(err) => {
90 error!("Can not create {}! Err {err}", calib_dir)
91 }
92 }
93 }
94 } let calib_file = Path::new(&calib_dir);
96 let local_file = calib_file.join(fname);
97 info!("Writing calibration to {}", local_file.display() );
98 file_on_disk = OpenOptions::new().create(true).write(true).open(local_file).ok()
99 } else {
100 if write_to_disk {
101 info!("Writing to local file {}!", fname );
102 file_on_disk = OpenOptions::new().append(true).create(true).open(datafile_output_file).ok()
103 }
104 }
105
106 let board_id = get_board_id().unwrap_or(0) as u8;
107 if board_id == 0 {
108 error!("We could not get the board id!");
109 }
110 let mut sigint_received = false;
111 let mut kill_timer = Instant::now();
112 let mut n_sent = 0usize;
113 loop {
114 if sigint_received && kill_timer.elapsed().as_secs() > 10 {
116 info!("Kill timer expired. Ending thread!");
117 break;
118 }
119 match thread_control.lock() {
120 Ok(tc) => {
121 if tc.stop_flag {
122 info!("Received stop signal. Will stop thread!");
123 sigint_received = true;
124 kill_timer = Instant::now();
125 }
126 },
127 Err(err) => {
128 trace!("Can't acquire lock! {err}");
129 },
130 }
131 let mut data_type = DataType::Unknown;
132 match data.recv() {
133 Err(err) => trace!("Error receiving TofPacket {err}"),
134 Ok(packet) => {
135 if matches!(packet.packet_type, PacketType::RBEvent) {
136 match RBEvent::extract_datatype(&packet.payload) {
137 Ok(dtype) => {
138 data_type = dtype;
139 }
140 Err(err) => {
141 error!("Unable to extract data type! Err {err}");
142 }
143 }
144 }
145 if write_to_disk && !packet.no_write_to_disk {
146 match &mut file_on_disk {
147 None => error!("We want to write data, however the file is invalid!"),
148 Some(f) => {
149 match f.write_all(packet.to_bytestream().as_slice()) {
150 Err(err) => error!("Writing file to disk failed! Err {err}"),
151 Ok(()) => ()
152 }
153 match f.sync_all() {
157 Err(err) => error!("Unable to sync file to disk! {err}"),
158 Ok(()) => ()
159 }
160 }
161 }
162 }
163
164 let tp_payload : Vec<u8>;
167 match data_type {
168 DataType::VoltageCalibration |
175 DataType::TimingCalibration |
176 DataType::Noi => {
177 tp_payload = prefix_local(&mut packet.to_bytestream());
178 },
179 _ => {
180 tp_payload = prefix_board_id_noquery(board_id, &mut packet.to_bytestream());
181 }
182 }
183 match data_socket.send(tp_payload,zmq::DONTWAIT) {
184 Ok(_) => {
185 trace!("0MQ PUB socket.send() SUCCESS!");
186 n_sent += 1;
187 },
188 Err(err) => error!("Not able to send {} over 0MQ PUB socket! {err}", packet.packet_type),
189 }
190 if packet.packet_type == PacketType::RBCalibration {
191 info!("==> Calibration packet {} sent!", packet );
193 }
194 if n_sent % 1000 == 0 && n_sent > 0 && print_packets {
195 println!("==> We sent {n_sent} packets!");
196 println!("==> Last Tofpacket type: {} with {} bytes!", packet.packet_type, packet.payload.len());
197 }
198 }
199 }
200 }
201}
202