liftof_rb/threads/
data_publisher.rs

1use std::env;
2use std::path::Path;
3use std::fs;
4use std::fs::OpenOptions; 
5use std::ffi::OsString;
6use std::fs::File;
7use std::io::Write;
8use std::time::Instant;
9use std::sync::{
10    Arc,
11    Mutex,
12};
13
14use crossbeam_channel::Receiver;
15
16use tof_dataclasses::packets::{TofPacket,
17                               PacketType};
18use tof_dataclasses::events::{RBEvent,
19                              DataType};
20use tof_dataclasses::serialization::Serialization;
21use liftof_lib::thread_control::ThreadControl;
22
23use crate::api::{
24    prefix_board_id_noquery,
25    prefix_local,
26};
27use crate::control::get_board_id;
28
29/// Manage the 0MQ PUB socket and send everything 
30/// which comes in over the wire as a byte 
31/// payload
32///
33/// # Arguments 
34/// * write_to_disk : Write data to local disk (most likely
35///                   a SD card). This option should be only
36///                   used for diagnostic purposes.
37/// * address       : IP address to use for the local PUB 
38///                   socket to publish data over the 
39///                   network
40/// * output_fname  : In case a local file should be written,
41///                   write it with this name.
42///                   In case of a calibration file, then 
43///                   also save it in the dedicated foler.
44///                   If this is None, don't write anything.
45/// * print_packets : Print outgoing packets to terminal
46///
47pub fn data_publisher(data           : &Receiver<TofPacket>,
48                      address        : String,
49                      output_fname   : Option<String> ,
50                      print_packets  : bool,
51                      thread_control : Arc<Mutex<ThreadControl>>) {
52
53  let ctx = zmq::Context::new();
54  let data_socket = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
55  data_socket.bind(&address).expect("Unable to bind to data (PUB) socket {data_adress}");
56  info!("0MQ PUB socket bound to address {address}");
57
58  let mut file_on_disk : Option<File> = None;//let mut output = File::create(path)?;
59  let fname : String;
60  let write_to_disk;
61  match output_fname {
62    None => {
63      fname = String::from("Unknown.tof.gaps");
64      write_to_disk = false;
65    }
66    Some(_fname) => {
67      fname = _fname;
68      write_to_disk = true;
69    }
70  }
71  let datafile_output_file = Path::new(&fname);
72  // in case it is a calibration file, delete any old 
73  // calibration and write it to a specific location
74  let home      = env::var_os("HOME").unwrap_or(OsString::from("/home/gaps"));
75  let calib_dir = home.to_string_lossy().to_string() + "/calib"; 
76  if fname.ends_with("cali.tof.gaps") {
77    match fs::metadata(&calib_dir) {
78      Ok(metadata) => {
79        // Check if the metadata is for a directory
80        if !metadata.is_dir() {
81          error!("The path exists, but it is not a directory.");
82        }
83      }
84      Err(_) => {
85        // An error occurred, which typically means the directory does not exist
86        warn!("No calibration directory found. Will create {}", calib_dir);
87        match fs::create_dir(calib_dir.clone()) {
88          Ok(_) => (),
89          Err(err) => {
90            error!("Can not create {}! Err {err}", calib_dir)
91          }
92        }
93      }
94    } // end match
95    let calib_file = Path::new(&calib_dir);
96    let local_file = calib_file.join(fname);
97    info!("Writing calibration to {}", local_file.display() );
98    file_on_disk = OpenOptions::new().create(true).write(true).open(local_file).ok()
99  } else {
100    if write_to_disk {
101      info!("Writing to local file {}!", fname );
102      file_on_disk = OpenOptions::new().append(true).create(true).open(datafile_output_file).ok()
103    }
104  }
105 
106  let board_id     = get_board_id().unwrap_or(0) as u8;
107  if board_id == 0 {
108    error!("We could not get the board id!");
109  }
110  let mut sigint_received = false;
111  let mut kill_timer      = Instant::now();
112  let mut n_sent          = 0usize;
113  loop {
114    // check if we should end this
115    if sigint_received && kill_timer.elapsed().as_secs() > 10 {
116      info!("Kill timer expired. Ending thread!");
117      break;
118    }
119    match thread_control.lock() {
120      Ok(tc) => {
121        if tc.stop_flag {
122          info!("Received stop signal. Will stop thread!");
123          sigint_received = true;
124          kill_timer      = Instant::now();
125        }
126      },
127      Err(err) => {
128        trace!("Can't acquire lock! {err}");
129      },
130    }
131    let mut data_type = DataType::Unknown;
132    match data.recv() {
133      Err(err) => trace!("Error receiving TofPacket {err}"),
134      Ok(packet)    => {
135        if matches!(packet.packet_type, PacketType::RBEvent) {
136          match RBEvent::extract_datatype(&packet.payload) {
137            Ok(dtype) => {
138              data_type = dtype;
139            }
140            Err(err) => {
141              error!("Unable to extract data type! Err {err}");
142            }
143          }
144        }
145        if write_to_disk && !packet.no_write_to_disk {
146          match &mut file_on_disk {
147            None => error!("We want to write data, however the file is invalid!"),
148            Some(f) => {
149              match f.write_all(packet.to_bytestream().as_slice()) {
150                Err(err) => error!("Writing file to disk failed! Err {err}"),
151                Ok(()) => ()
152              }
153              // local file can be synced, rate should be in general 
154              // low when we are writing to the local file.
155              // Careful with SD card!
156              match f.sync_all() {
157                Err(err) => error!("Unable to sync file to disk! {err}"),
158                Ok(()) => ()
159              }
160            }
161          }
162        }
163        
164        // prefix the board id, except for our Voltage, Timing and NOI 
165        // packages. For those, we prefix with LOCAL 
166        let tp_payload : Vec<u8>;
167        match data_type {
168          // FIXME - this makes that data types for 
169          // calibration will be rerouted back to 
170          // the same board. We have to make that 
171          // behaviour configurable. 
172          // It can simply subscribe to the same 
173          // message?
174          DataType::VoltageCalibration |
175          DataType::TimingCalibration  | 
176          DataType::Noi => {
177            tp_payload = prefix_local(&mut packet.to_bytestream());
178          },
179          _ => {
180            tp_payload = prefix_board_id_noquery(board_id, &mut packet.to_bytestream());
181          }
182        }
183        match data_socket.send(tp_payload,zmq::DONTWAIT) {
184          Ok(_)    => {
185            trace!("0MQ PUB socket.send() SUCCESS!");
186            n_sent += 1;
187          },
188          Err(err) => error!("Not able to send {} over 0MQ PUB socket! {err}", packet.packet_type),
189        }
190        if packet.packet_type == PacketType::RBCalibration {
191          //info!("==> last data type {:?}", data_type);
192          info!("==> Calibration packet {} sent!", packet );
193        }
194        if n_sent % 1000 == 0 && n_sent > 0 && print_packets {
195          println!("==> We sent {n_sent} packets!");
196          println!("==> Last Tofpacket type: {} with {} bytes!", packet.packet_type, packet.payload.len());
197        }
198      }
199    }
200  }
201}
202