liftof_rb/threads/
data_publisher.rs1use std::env;
5use std::path::Path;
6use std::fs;
7use std::fs::OpenOptions;
8use std::ffi::OsString;
9use std::fs::File;
10use std::io::Write;
11use std::time::Instant;
12use std::sync::{
13 Arc,
14 Mutex,
15};
16
17use crossbeam_channel::Receiver;
18
19use gondola_core::prelude::*;
20use crate::api::{
28 prefix_board_id_noquery,
29 prefix_local,
30};
31use crate::control::get_board_id;
32
33pub fn data_publisher(data : &Receiver<TofPacket>,
52 address : String,
53 output_fname : Option<String> ,
54 print_packets : bool,
55 thread_control : Arc<Mutex<ThreadControl>>) {
56
57 let ctx = zmq::Context::new();
58 let data_socket = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
59 data_socket.bind(&address).expect("Unable to bind to data (PUB) socket {data_adress}");
60 info!("0MQ PUB socket bound to address {address}");
61
62 let mut file_on_disk : Option<File> = None;let fname : String;
64 let write_to_disk;
65 match output_fname {
66 None => {
67 fname = String::from("Unknown.tof.gaps");
68 write_to_disk = false;
69 }
70 Some(_fname) => {
71 fname = _fname;
72 write_to_disk = true;
73 }
74 }
75 let datafile_output_file = Path::new(&fname);
76 let home = env::var_os("HOME").unwrap_or(OsString::from("/home/gaps"));
79 let calib_dir = home.to_string_lossy().to_string() + "/calib";
80 if fname.ends_with("cali.tof.gaps") {
81 match fs::metadata(&calib_dir) {
82 Ok(metadata) => {
83 if !metadata.is_dir() {
85 error!("The path exists, but it is not a directory.");
86 }
87 }
88 Err(_) => {
89 warn!("No calibration directory found. Will create {}", calib_dir);
91 match fs::create_dir(calib_dir.clone()) {
92 Ok(_) => (),
93 Err(err) => {
94 error!("Can not create {}! Err {err}", calib_dir)
95 }
96 }
97 }
98 } let calib_file = Path::new(&calib_dir);
100 let local_file = calib_file.join(fname);
101 info!("Writing calibration to {}", local_file.display() );
102 file_on_disk = OpenOptions::new().create(true).write(true).open(local_file).ok()
103 } else {
104 if write_to_disk {
105 info!("Writing to local file {}!", fname );
106 file_on_disk = OpenOptions::new().append(true).create(true).open(datafile_output_file).ok()
107 }
108 }
109
110 let board_id = get_board_id().unwrap_or(0) as u8;
111 if board_id == 0 {
112 error!("We could not get the board id!");
113 }
114 let mut sigint_received = false;
115 let mut kill_timer = Instant::now();
116 let mut n_sent = 0usize;
117 loop {
118 if sigint_received && kill_timer.elapsed().as_secs() > 10 {
120 info!("Kill timer expired. Ending thread!");
121 break;
122 }
123 match thread_control.lock() {
124 Ok(tc) => {
125 if tc.stop_flag {
126 info!("Received stop signal. Will stop thread!");
127 sigint_received = true;
128 kill_timer = Instant::now();
129 }
130 },
131 Err(err) => {
132 trace!("Can't acquire lock! {err}");
133 },
134 }
135 let mut data_type = DataType::Unknown;
136 match data.recv() {
137 Err(err) => trace!("Error receiving TofPacket {err}"),
138 Ok(packet) => {
139 if matches!(packet.packet_type, TofPacketType::RBEvent) {
140 match RBEvent::extract_datatype(&packet.payload) {
141 Ok(dtype) => {
142 data_type = dtype;
143 }
144 Err(err) => {
145 error!("Unable to extract data type! Err {err}");
146 }
147 }
148 }
149 if write_to_disk && !packet.no_write_to_disk {
150 match &mut file_on_disk {
151 None => error!("We want to write data, however the file is invalid!"),
152 Some(f) => {
153 match f.write_all(packet.to_bytestream().as_slice()) {
154 Err(err) => error!("Writing file to disk failed! Err {err}"),
155 Ok(()) => ()
156 }
157 match f.sync_all() {
161 Err(err) => error!("Unable to sync file to disk! {err}"),
162 Ok(()) => ()
163 }
164 }
165 }
166 }
167
168 let tp_payload : Vec<u8>;
171 match data_type {
172 DataType::VoltageCalibration |
179 DataType::TimingCalibration |
180 DataType::Noi => {
181 tp_payload = prefix_local(&mut packet.to_bytestream());
182 },
183 _ => {
184 tp_payload = prefix_board_id_noquery(board_id, &mut packet.to_bytestream());
185 }
186 }
187 match data_socket.send(tp_payload,zmq::DONTWAIT) {
188 Ok(_) => {
189 trace!("0MQ PUB socket.send() SUCCESS!");
190 n_sent += 1;
191 },
192 Err(err) => error!("Not able to send {} over 0MQ PUB socket! {err}", packet.packet_type),
193 }
194 if packet.packet_type == TofPacketType::RBCalibration {
195 info!("==> Calibration packet {} sent!", packet );
197 }
198 if n_sent % 1000 == 0 && n_sent > 0 && print_packets {
199 println!("==> We sent {n_sent} packets!");
200 println!("==> Last Tofpacket type: {} with {} bytes!", packet.packet_type, packet.payload.len());
201 }
202 }
203 }
204 }
205}
206