liftof_rb/threads/
data_publisher.rs

1//! Make sure everything gets sent out over zmq to the TOF CPU (or 
2//! whoever is listening.
3
4use std::env;
5use std::path::Path;
6use std::fs;
7use std::fs::OpenOptions; 
8use std::ffi::OsString;
9use std::fs::File;
10use std::io::Write;
11use std::time::Instant;
12use std::sync::{
13    Arc,
14    Mutex,
15};
16
17use crossbeam_channel::Receiver;
18
19use gondola_core::prelude::*;
20//use tof_dataclasses::packets::{TofPacket,
21//                               PacketType};
22//use tof_dataclasses::events::{RBEvent,
23//                              DataType};
24//use tof_dataclasses::serialization::Serialization;
25//use liftof_lib::thread_control::ThreadControl;
26
27use crate::api::{
28    prefix_board_id_noquery,
29    prefix_local,
30};
31use crate::control::get_board_id;
32
33/// Manage the 0MQ PUB socket and send everything 
34/// which comes in over the wire as a byte 
35/// payload
36///
37/// # Arguments 
38/// * write_to_disk : Write data to local disk (most likely
39///                   a SD card). This option should be only
40///                   used for diagnostic purposes.
41/// * address       : IP address to use for the local PUB 
42///                   socket to publish data over the 
43///                   network
44/// * output_fname  : In case a local file should be written,
45///                   write it with this name.
46///                   In case of a calibration file, then 
47///                   also save it in the dedicated foler.
48///                   If this is None, don't write anything.
49/// * print_packets : Print outgoing packets to terminal
50///
51pub fn data_publisher(data           : &Receiver<TofPacket>,
52                      address        : String,
53                      output_fname   : Option<String> ,
54                      print_packets  : bool,
55                      thread_control : Arc<Mutex<ThreadControl>>) {
56
57  let ctx = zmq::Context::new();
58  let data_socket = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
59  data_socket.bind(&address).expect("Unable to bind to data (PUB) socket {data_adress}");
60  info!("0MQ PUB socket bound to address {address}");
61
62  let mut file_on_disk : Option<File> = None;//let mut output = File::create(path)?;
63  let fname : String;
64  let write_to_disk;
65  match output_fname {
66    None => {
67      fname = String::from("Unknown.tof.gaps");
68      write_to_disk = false;
69    }
70    Some(_fname) => {
71      fname = _fname;
72      write_to_disk = true;
73    }
74  }
75  let datafile_output_file = Path::new(&fname);
76  // in case it is a calibration file, delete any old 
77  // calibration and write it to a specific location
78  let home      = env::var_os("HOME").unwrap_or(OsString::from("/home/gaps"));
79  let calib_dir = home.to_string_lossy().to_string() + "/calib"; 
80  if fname.ends_with("cali.tof.gaps") {
81    match fs::metadata(&calib_dir) {
82      Ok(metadata) => {
83        // Check if the metadata is for a directory
84        if !metadata.is_dir() {
85          error!("The path exists, but it is not a directory.");
86        }
87      }
88      Err(_) => {
89        // An error occurred, which typically means the directory does not exist
90        warn!("No calibration directory found. Will create {}", calib_dir);
91        match fs::create_dir(calib_dir.clone()) {
92          Ok(_) => (),
93          Err(err) => {
94            error!("Can not create {}! Err {err}", calib_dir)
95          }
96        }
97      }
98    } // end match
99    let calib_file = Path::new(&calib_dir);
100    let local_file = calib_file.join(fname);
101    info!("Writing calibration to {}", local_file.display() );
102    file_on_disk = OpenOptions::new().create(true).write(true).open(local_file).ok()
103  } else {
104    if write_to_disk {
105      info!("Writing to local file {}!", fname );
106      file_on_disk = OpenOptions::new().append(true).create(true).open(datafile_output_file).ok()
107    }
108  }
109 
110  let board_id     = get_board_id().unwrap_or(0) as u8;
111  if board_id == 0 {
112    error!("We could not get the board id!");
113  }
114  let mut sigint_received = false;
115  let mut kill_timer      = Instant::now();
116  let mut n_sent          = 0usize;
117  loop {
118    // check if we should end this
119    if sigint_received && kill_timer.elapsed().as_secs() > 10 {
120      info!("Kill timer expired. Ending thread!");
121      break;
122    }
123    match thread_control.lock() {
124      Ok(tc) => {
125        if tc.stop_flag {
126          info!("Received stop signal. Will stop thread!");
127          sigint_received = true;
128          kill_timer      = Instant::now();
129        }
130      },
131      Err(err) => {
132        trace!("Can't acquire lock! {err}");
133      },
134    }
135    let mut data_type = DataType::Unknown;
136    match data.recv() {
137      Err(err) => trace!("Error receiving TofPacket {err}"),
138      Ok(packet)    => {
139        if matches!(packet.packet_type, TofPacketType::RBEvent) {
140          match RBEvent::extract_datatype(&packet.payload) {
141            Ok(dtype) => {
142              data_type = dtype;
143            }
144            Err(err) => {
145              error!("Unable to extract data type! Err {err}");
146            }
147          }
148        }
149        if write_to_disk && !packet.no_write_to_disk {
150          match &mut file_on_disk {
151            None => error!("We want to write data, however the file is invalid!"),
152            Some(f) => {
153              match f.write_all(packet.to_bytestream().as_slice()) {
154                Err(err) => error!("Writing file to disk failed! Err {err}"),
155                Ok(()) => ()
156              }
157              // local file can be synced, rate should be in general 
158              // low when we are writing to the local file.
159              // Careful with SD card!
160              match f.sync_all() {
161                Err(err) => error!("Unable to sync file to disk! {err}"),
162                Ok(()) => ()
163              }
164            }
165          }
166        }
167        
168        // prefix the board id, except for our Voltage, Timing and NOI 
169        // packages. For those, we prefix with LOCAL 
170        let tp_payload : Vec<u8>;
171        match data_type {
172          // FIXME - this makes that data types for 
173          // calibration will be rerouted back to 
174          // the same board. We have to make that 
175          // behaviour configurable. 
176          // It can simply subscribe to the same 
177          // message?
178          DataType::VoltageCalibration |
179          DataType::TimingCalibration  | 
180          DataType::Noi => {
181            tp_payload = prefix_local(&mut packet.to_bytestream());
182          },
183          _ => {
184            tp_payload = prefix_board_id_noquery(board_id, &mut packet.to_bytestream());
185          }
186        }
187        match data_socket.send(tp_payload,zmq::DONTWAIT) {
188          Ok(_)    => {
189            trace!("0MQ PUB socket.send() SUCCESS!");
190            n_sent += 1;
191          },
192          Err(err) => error!("Not able to send {} over 0MQ PUB socket! {err}", packet.packet_type),
193        }
194        if packet.packet_type == TofPacketType::RBCalibration {
195          //info!("==> last data type {:?}", data_type);
196          info!("==> Calibration packet {} sent!", packet );
197        }
198        if n_sent % 1000 == 0 && n_sent > 0 && print_packets {
199          println!("==> We sent {n_sent} packets!");
200          println!("==> Last Tofpacket type: {} with {} bytes!", packet.packet_type, packet.payload.len());
201        }
202      }
203    }
204  }
205}
206