liftof_rb/threads/
data_publisher.rs1use std::env;
5use std::path::Path;
6use std::fs;
7use std::fs::OpenOptions;
8use std::ffi::OsString;
9use std::fs::File;
10use std::io::Write;
11use std::time::Instant;
12use std::sync::{
13 Arc,
14 Mutex,
15};
16
17use crossbeam_channel::Receiver;
18
19use gondola_core::prelude::*;
20
21use crate::api::{
22 prefix_board_id_noquery,
23 prefix_local,
24};
25use crate::control::get_board_id;
26
27pub fn data_publisher(data : &Receiver<TofPacket>,
46 address : String,
47 output_fname : Option<String> ,
48 print_packets : bool,
49 thread_control : Arc<Mutex<ThreadControl>>) {
50
51 let ctx = zmq::Context::new();
52 let data_socket = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
53 data_socket.bind(&address).expect("Unable to bind to data (PUB) socket {data_adress}");
54 info!("0MQ PUB socket bound to address {address}");
55
56 let mut file_on_disk : Option<File> = None;let fname : String;
58 let write_to_disk;
59 match output_fname {
60 None => {
61 fname = String::from("Unknown.tof.gaps");
62 write_to_disk = false;
63 }
64 Some(_fname) => {
65 fname = _fname;
66 write_to_disk = true;
67 }
68 }
69 let datafile_output_file = Path::new(&fname);
70 let home = env::var_os("HOME").unwrap_or(OsString::from("/home/gaps"));
73 let calib_dir = home.to_string_lossy().to_string() + "/calib";
74 if fname.ends_with("cali.tof.gaps") {
75 match fs::metadata(&calib_dir) {
76 Ok(metadata) => {
77 if !metadata.is_dir() {
79 error!("The path exists, but it is not a directory.");
80 }
81 }
82 Err(_) => {
83 warn!("No calibration directory found. Will create {}", calib_dir);
85 match fs::create_dir(calib_dir.clone()) {
86 Ok(_) => (),
87 Err(err) => {
88 error!("Can not create {}! Err {err}", calib_dir)
89 }
90 }
91 }
92 } let calib_file = Path::new(&calib_dir);
94 let local_file = calib_file.join(fname);
95 info!("Writing calibration to {}", local_file.display() );
96 file_on_disk = OpenOptions::new().create(true).write(true).open(local_file).ok()
97 } else {
98 if write_to_disk {
99 info!("Writing to local file {}!", fname );
100 file_on_disk = OpenOptions::new().append(true).create(true).open(datafile_output_file).ok()
101 }
102 }
103
104 let board_id = get_board_id().unwrap_or(0) as u8;
105 if board_id == 0 {
106 error!("We could not get the board id!");
107 }
108 let mut sigint_received = false;
109 let mut kill_timer = Instant::now();
110 let mut n_sent = 0usize;
111 loop {
112 if sigint_received && kill_timer.elapsed().as_secs() > 10 {
114 info!("Kill timer expired. Ending thread!");
115 break;
116 }
117 match thread_control.lock() {
118 Ok(tc) => {
119 if tc.stop_flag {
120 info!("Received stop signal. Will stop thread!");
121 sigint_received = true;
122 kill_timer = Instant::now();
123 }
124 },
125 Err(err) => {
126 trace!("Can't acquire lock! {err}");
127 },
128 }
129 let mut data_type = DataType::Unknown;
130 match data.recv() {
131 Err(err) => trace!("Error receiving TofPacket {err}"),
132 Ok(packet) => {
133 if matches!(packet.packet_type, TofPacketType::RBEvent) {
134 match RBEvent::extract_datatype(&packet.payload) {
135 Ok(dtype) => {
136 data_type = dtype;
137 }
138 Err(err) => {
139 error!("Unable to extract data type! Err {err}");
140 }
141 }
142 }
143 if write_to_disk && !packet.no_write_to_disk {
144 match &mut file_on_disk {
145 None => error!("We want to write data, however the file is invalid!"),
146 Some(f) => {
147 match f.write_all(packet.to_bytestream().as_slice()) {
148 Err(err) => error!("Writing file to disk failed! Err {err}"),
149 Ok(()) => ()
150 }
151 match f.sync_all() {
155 Err(err) => error!("Unable to sync file to disk! {err}"),
156 Ok(()) => ()
157 }
158 }
159 }
160 }
161
162 let tp_payload : Vec<u8>;
165 match data_type {
166 DataType::VoltageCalibration |
173 DataType::TimingCalibration |
174 DataType::Noi => {
175 tp_payload = prefix_local(&mut packet.to_bytestream());
176 },
177 _ => {
178 tp_payload = prefix_board_id_noquery(board_id, &mut packet.to_bytestream());
179 }
180 }
181 match data_socket.send(tp_payload,zmq::DONTWAIT) {
182 Ok(_) => {
183 trace!("0MQ PUB socket.send() SUCCESS!");
184 n_sent += 1;
185 },
186 Err(err) => error!("Not able to send {} over 0MQ PUB socket! {err}", packet.packet_type),
187 }
188 if packet.packet_type == TofPacketType::RBCalibration {
189 info!("==> Calibration packet {} sent!", packet );
191 }
192 if n_sent % 1000 == 0 && n_sent > 0 && print_packets {
193 println!("==> We sent {n_sent} packets!");
194 println!("==> Last Tofpacket type: {} with {} bytes!", packet.packet_type, packet.payload.len());
195 }
196 }
197 }
198 }
199}
200