liftof_rb/threads/
data_publisher.rs

//! Make sure everything gets sent out over zmq to the TOF CPU (or
//! whoever is listening).
3
4use std::env;
5use std::path::Path;
6use std::fs;
7use std::fs::OpenOptions; 
8use std::ffi::OsString;
9use std::fs::File;
10use std::io::Write;
11use std::time::Instant;
12use std::sync::{
13    Arc,
14    Mutex,
15};
16
17use crossbeam_channel::Receiver;
18
19use gondola_core::prelude::*;
20
21use crate::api::{
22    prefix_board_id_noquery,
23    prefix_local,
24};
25use crate::control::get_board_id;
26
27/// Manage the 0MQ PUB socket and send everything 
28/// which comes in over the wire as a byte 
29/// payload
30///
31/// # Arguments 
32/// * write_to_disk : Write data to local disk (most likely
33///                   a SD card). This option should be only
34///                   used for diagnostic purposes.
35/// * address       : IP address to use for the local PUB 
36///                   socket to publish data over the 
37///                   network
38/// * output_fname  : In case a local file should be written,
39///                   write it with this name.
40///                   In case of a calibration file, then 
41///                   also save it in the dedicated foler.
42///                   If this is None, don't write anything.
43/// * print_packets : Print outgoing packets to terminal
44///
45pub fn data_publisher(data           : &Receiver<TofPacket>,
46                      address        : String,
47                      output_fname   : Option<String> ,
48                      print_packets  : bool,
49                      thread_control : Arc<Mutex<ThreadControl>>) {
50
51  let ctx = zmq::Context::new();
52  let data_socket = ctx.socket(zmq::PUB).expect("Unable to create 0MQ PUB socket!");
53  data_socket.bind(&address).expect("Unable to bind to data (PUB) socket {data_adress}");
54  info!("0MQ PUB socket bound to address {address}");
55
56  let mut file_on_disk : Option<File> = None;//let mut output = File::create(path)?;
57  let fname : String;
58  let write_to_disk;
59  match output_fname {
60    None => {
61      fname = String::from("Unknown.tof.gaps");
62      write_to_disk = false;
63    }
64    Some(_fname) => {
65      fname = _fname;
66      write_to_disk = true;
67    }
68  }
69  let datafile_output_file = Path::new(&fname);
70  // in case it is a calibration file, delete any old 
71  // calibration and write it to a specific location
72  let home      = env::var_os("HOME").unwrap_or(OsString::from("/home/gaps"));
73  let calib_dir = home.to_string_lossy().to_string() + "/calib"; 
74  if fname.ends_with("cali.tof.gaps") {
75    match fs::metadata(&calib_dir) {
76      Ok(metadata) => {
77        // Check if the metadata is for a directory
78        if !metadata.is_dir() {
79          error!("The path exists, but it is not a directory.");
80        }
81      }
82      Err(_) => {
83        // An error occurred, which typically means the directory does not exist
84        warn!("No calibration directory found. Will create {}", calib_dir);
85        match fs::create_dir(calib_dir.clone()) {
86          Ok(_) => (),
87          Err(err) => {
88            error!("Can not create {}! Err {err}", calib_dir)
89          }
90        }
91      }
92    } // end match
93    let calib_file = Path::new(&calib_dir);
94    let local_file = calib_file.join(fname);
95    info!("Writing calibration to {}", local_file.display() );
96    file_on_disk = OpenOptions::new().create(true).write(true).open(local_file).ok()
97  } else {
98    if write_to_disk {
99      info!("Writing to local file {}!", fname );
100      file_on_disk = OpenOptions::new().append(true).create(true).open(datafile_output_file).ok()
101    }
102  }
103 
104  let board_id     = get_board_id().unwrap_or(0) as u8;
105  if board_id == 0 {
106    error!("We could not get the board id!");
107  }
108  let mut sigint_received = false;
109  let mut kill_timer      = Instant::now();
110  let mut n_sent          = 0usize;
111  loop {
112    // check if we should end this
113    if sigint_received && kill_timer.elapsed().as_secs() > 10 {
114      info!("Kill timer expired. Ending thread!");
115      break;
116    }
117    match thread_control.lock() {
118      Ok(tc) => {
119        if tc.stop_flag {
120          info!("Received stop signal. Will stop thread!");
121          sigint_received = true;
122          kill_timer      = Instant::now();
123        }
124      },
125      Err(err) => {
126        trace!("Can't acquire lock! {err}");
127      },
128    }
129    let mut data_type = DataType::Unknown;
130    match data.recv() {
131      Err(err) => trace!("Error receiving TofPacket {err}"),
132      Ok(packet)    => {
133        if matches!(packet.packet_type, TofPacketType::RBEvent) {
134          match RBEvent::extract_datatype(&packet.payload) {
135            Ok(dtype) => {
136              data_type = dtype;
137            }
138            Err(err) => {
139              error!("Unable to extract data type! Err {err}");
140            }
141          }
142        }
143        if write_to_disk && !packet.no_write_to_disk {
144          match &mut file_on_disk {
145            None => error!("We want to write data, however the file is invalid!"),
146            Some(f) => {
147              match f.write_all(packet.to_bytestream().as_slice()) {
148                Err(err) => error!("Writing file to disk failed! Err {err}"),
149                Ok(()) => ()
150              }
151              // local file can be synced, rate should be in general 
152              // low when we are writing to the local file.
153              // Careful with SD card!
154              match f.sync_all() {
155                Err(err) => error!("Unable to sync file to disk! {err}"),
156                Ok(()) => ()
157              }
158            }
159          }
160        }
161        
162        // prefix the board id, except for our Voltage, Timing and NOI 
163        // packages. For those, we prefix with LOCAL 
164        let tp_payload : Vec<u8>;
165        match data_type {
166          // FIXME - this makes that data types for 
167          // calibration will be rerouted back to 
168          // the same board. We have to make that 
169          // behaviour configurable. 
170          // It can simply subscribe to the same 
171          // message?
172          DataType::VoltageCalibration |
173          DataType::TimingCalibration  | 
174          DataType::Noi => {
175            tp_payload = prefix_local(&mut packet.to_bytestream());
176          },
177          _ => {
178            tp_payload = prefix_board_id_noquery(board_id, &mut packet.to_bytestream());
179          }
180        }
181        match data_socket.send(tp_payload,zmq::DONTWAIT) {
182          Ok(_)    => {
183            trace!("0MQ PUB socket.send() SUCCESS!");
184            n_sent += 1;
185          },
186          Err(err) => error!("Not able to send {} over 0MQ PUB socket! {err}", packet.packet_type),
187        }
188        if packet.packet_type == TofPacketType::RBCalibration {
189          //info!("==> last data type {:?}", data_type);
190          info!("==> Calibration packet {} sent!", packet );
191        }
192        if n_sent % 1000 == 0 && n_sent > 0 && print_packets {
193          println!("==> We sent {n_sent} packets!");
194          println!("==> Last Tofpacket type: {} with {} bytes!", packet.packet_type, packet.payload.len());
195        }
196      }
197    }
198  }
199}
200