Skip to main content

gondola_core/io/
telemetry_writer.rs

1// The following file is part of gaps-online-software and published 
2// under the GPLv3 license
3
4use crate::prelude::*;
5
/// Sequential on-disk writer for `TelemetryPacket`s.
///
/// Packets are handed over one at a time and appended 
/// to the currently open file. Once a configured limit
/// (packet count or file size) is reached, the current
/// file is synced and a new file is opened.
#[cfg_attr(feature="pybindings", pyclass)]
pub struct TelemetryPacketWriter {
  /// Handle of the file packets are currently
  /// appended to
  pub file: File,
  /// Directory the files are stored in. The constructor
  /// makes sure this ends with "/"
  pub file_path: String,
  /// The maximum number of packets for a single file.
  /// After this number is reached, a new file is 
  /// started. A value of 0 disables the packet limit 
  /// and lets `mbytes_per_file` decide instead.
  pub pkts_per_file: usize,
  /// The maximum number of mebibytes (1_048_576 bytes)
  /// per file. After this a new file is started. Only
  /// evaluated when `pkts_per_file` is 0; a value of 0 
  /// disables the size limit.
  pub mbytes_per_file: usize,
  /// Path (directory + name) of an output file, as 
  /// composed by the constructor for the first file
  pub file_name: String,
  /// Timestamp string derived from the most recently
  /// added packet. Used to name the next file.
  pub last_timestamp: String,
  /// Running file number. Starts at 1 and is 
  /// incremented each time a new file is started
  file_id: usize,
  /// Internal packet counter, incremented for each 
  /// packet handed to the writer
  n_packets: usize,
  /// Internal counter for the bytes written to
  /// the current file
  file_nbytes_wr: usize,
}
36
#[cfg(feature="pybindings")]
#[pymethods]
impl TelemetryPacketWriter {

  /// Create a writer storing its files below `filepath`.
  ///
  /// Delegates to the Rust constructor, which names the 
  /// first file after the timestamp of `packet` and writes
  /// `packet` to it.
  #[new]
  fn new_py(filepath : String, packet : &TelemetryPacket) -> PyResult<Self> {
    Ok(Self::new(filepath, packet))
  }

  /// Append a single packet to the current file.
  ///
  /// Exposed to Python under the name `add_telemetry_packet`.
  #[pyo3(name="add_telemetry_packet")]
  pub fn add_telemetry_packet_py(&mut self, packet : &TelemetryPacket) {
    Self::add_telemetry_packet(self, packet)
  }
}
52
53impl TelemetryPacketWriter {
54
55  /// Instantiate a new PacketWriter 
56  ///
57  /// # Arguments
58  ///
59  pub fn new(mut file_path : String, first_packet : &TelemetryPacket) -> Self {
60    let file : File;
61    let file_name : String;
62    if !file_path.ends_with("/") {
63      file_path += "/";
64    }
65    let utc_timestamp = Self::get_timestamp_from_packet(first_packet);
66    let filename = file_path.clone() + "RAW" + &utc_timestamp + ".bin";
67    let path     = Path::new(&filename); 
68    info!("Writing to file {filename}");
69    file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
70    file_name = filename;
71    let mut writer = Self {
72      file,
73      file_path        : file_path,
74      pkts_per_file    : 0,
75      mbytes_per_file  : 420,
76      file_nbytes_wr   : 0,    
77      file_id          : 1,
78      n_packets        : 0,
79      file_name        : file_name,
80      last_timestamp   : utc_timestamp,
81    };
82    writer.add_telemetry_packet(first_packet);
83    writer
84  }
85
86  /// Extract the gcutime from the packet and use it as 
87  /// the timestamp for the next file to be written
88  pub fn get_timestamp_from_packet(packet : &TelemetryPacket) -> String {
89    let gcutime = packet.header.get_gcutime();
90    get_utc_timestamp_from_unix(gcutime).unwrap_or(String::from("000000_000000")) 
91  }
92
93  pub fn get_file(&self) -> File { 
94    let file : File;
95    let filename = format!("{}RAW{}.bin", self.file_path, self.last_timestamp);
96    //let filename = self.file_path.clone() + &get_runfilename(runid,self.file_id as u64, None);
97    let path     = Path::new(&filename); 
98    info!("Writing to file {filename}");
99    file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
100    file
101  }
102
103  /// Induce serialization to disk for a TofPacket
104  ///
105  ///
106  pub fn add_telemetry_packet(&mut self, packet : &TelemetryPacket) {
107    self.last_timestamp = Self::get_timestamp_from_packet(packet);
108    let buffer = packet.to_bytestream();
109    self.file_nbytes_wr += buffer.len();
110    match self.file.write_all(buffer.as_slice()) {
111      Err(err) => error!("Writing to file to path {} failed! {}", self.file_path, err),
112      Ok(_)    => ()
113    }
114    self.n_packets += 1;
115    let mut newfile = false;
116    if self.pkts_per_file != 0 {
117      if self.n_packets == self.pkts_per_file {
118        newfile = true;
119        self.n_packets = 0;
120      }
121    } else if self.mbytes_per_file != 0 {
122      // multiply by mebibyte
123      if self.file_nbytes_wr >= self.mbytes_per_file * 1_048_576 {
124        newfile = true;
125        self.file_nbytes_wr = 0;
126      }
127    }
128    if newfile {
129        //let filename = self.file_prefix.clone() + "_" + &self.file_id.to_string() + ".tof.gaps";
130        match self.file.sync_all() {
131          Err(err) => {
132            error!("Unable to sync file to disc! {err}");
133          },
134          Ok(_) => ()
135        }
136        self.file = self.get_file();
137        self.file_id += 1;
138      }
139    debug!("TelemetryPacket written!");
140  }
141}
142