Skip to main content

gondola_core/io/caraspace/
writer.rs

1// This file is part of gaps-online-software and published 
2// under the GPLv3 license
3
4use crate::prelude::*;
5
/// Sequential writer which stores CRFrames in files on disk.
///
/// Frames are handed over one at a time and appended to
/// the currently open file. Whenever a configured limit
/// is reached, the current file is synced to disk and a
/// new one is started.
#[cfg_attr(feature="pybindings", pyclass)]
pub struct CRWriter {
  /// Handle of the file which is currently written to
  pub file: File,
  /// Location (directory) to store the files under
  pub file_path: String,
  /// The maximum number of packets for a single file.
  /// After this number is reached, a new file is started.
  /// A value of 0 switches this limit off.
  pub pkts_per_file: usize,
  /// The maximum number of (Mega)bytes per file. After
  /// this a new file is started. A value of 0 switches
  /// this limit off. Only looked at when `pkts_per_file`
  /// is 0.
  pub mbytes_per_file: usize,
  /// Name of the output file, composed of `file_path`
  /// and the generated run file name
  pub file_name: String,
  /// Run ID, this goes into the generated file names
  pub run_id: u32,
  /// Running number which is used in place of the subrun
  /// id when the name of a follow-up file is generated
  file_id: usize,
  /// Internal packet counter, number of packets which
  /// went through the writer
  n_packets: usize,
  /// Internal counter for the bytes written to the
  /// current file
  file_nbytes_wr: usize,
  /// A timestamp can be given here, in case the current
  /// time should not be used when the next file is
  /// written
  pub file_timestamp: Option<String>,
  /// When a new file is written every gcu seconds, this
  /// keeps the first gcu timestamp of the current file
  pub first_gcu_timestamp: Option<u64>,
  /// Write a new file every gcu seconds (from telemetry
  /// packet header)
  pub file_len_gcu_sec: Option<u32>,
}
45
46impl CRWriter {
47
48  /// Instantiate a new PacketWriter 
49  ///
50  /// # Arguments
51  ///
52  /// * file_path        : Path to store the file under
53  /// * run_id           : Run ID for this file (will be written in filename)
54  /// * subrun_id        : Sub-Run ID for this file (will be written in filename. 
55  ///                      If None, a generic "0" will be used
56  /// * timestamp        : The writer will add an automatic timestamp to the current file
57  ///                      based on the current time. This option allows to overwrite 
58  ///                      that behaviour
59  /// * file_len_gcu_sec : If not None, this will use the gcu time from added packets 
60  ///                      as the timestamp in the filename, and will write a new file 
61  ///                      every given number of seconds
62  pub fn new(mut file_path : String, run_id : u32,
63             subrun_id : Option<u64>, timestamp : Option<String>, file_len_gcu_sec : Option<u32>) -> Self {
64    if file_len_gcu_sec.is_some() {
65      if timestamp.is_none() {
66        panic!("If the writer should operate in the mode where it writes a new file every x seconds depending on the delta of gcu times in the packets, it needs the first timestamp given. This needs to be the first timestamp of the packets to be written");
67      }
68    }
69    let file : File;
70    let file_name : String;
71    if !file_path.ends_with("/") {
72      file_path += "/";
73    }
74    let filename : String;
75    let first_timestamp = timestamp.clone();
76    if let Some(subrun) = subrun_id {
77      filename = format!("{}{}", file_path, get_runfilename(run_id, subrun, None, timestamp, false));
78    } else {
79      filename = format!("{}{}", file_path, get_runfilename(run_id, 0, None, timestamp, false));
80    }
81    let path     = Path::new(&filename); 
82    //println!("Writing to file {filename}");
83    file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
84    file_name = filename;
85    let mut first_gcu_timestamp = None;
86    if first_timestamp.is_some() {
87      first_gcu_timestamp = get_unix_timestamp(&first_timestamp.unwrap(), None);
88    }
89    Self {
90      file,
91      file_path           : file_path,
92      pkts_per_file       : 0,
93      mbytes_per_file     : 420,
94      run_id              : run_id,
95      file_nbytes_wr      : 0,    
96      file_id             : 1,
97      n_packets           : 0,
98      file_name           : file_name,
99      file_timestamp      : None,
100      first_gcu_timestamp : first_gcu_timestamp,
101      file_len_gcu_sec    : file_len_gcu_sec
102    }
103  }
104
105  pub fn get_file(&self, timestamp : Option<String>) -> File { 
106    let file : File;
107    let filename = format!("{}{}", self.file_path, get_runfilename(self.run_id, self.file_id as u64, None, timestamp, false));
108    //let filename = self.file_path.clone() + &get_runfilename(runid,self.file_id as u64, None);
109    let path     = Path::new(&filename); 
110    info!("Writing to file {filename}");
111    file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
112    file
113  }
114
115  /// Induce serialization to disk for a CRFrame
116  ///
117  ///
118  pub fn add_frame(&mut self, frame : &CRFrame) {
119    let buffer = frame.to_bytestream();
120    self.file_nbytes_wr += buffer.len();
121    match self.file.write_all(buffer.as_slice()) {
122      Err(err) => error!("Writing to file to path {} failed! {}", self.file_path, err),
123      Ok(_)    => ()
124    }
125    self.n_packets += 1;
126    let mut newfile = false;
127    if self.pkts_per_file != 0 {
128      if self.n_packets == self.pkts_per_file {
129        newfile = true;
130        self.n_packets = 0;
131      }
132    } else if self.mbytes_per_file != 0 {
133      // multiply by mebibyte
134      if self.file_nbytes_wr >= self.mbytes_per_file * 1_048_576 {
135        newfile = true;
136        self.file_nbytes_wr = 0;
137      }
138    } else if self.file_len_gcu_sec.is_some() {
139      if frame.timestamp.clone().unwrap() - self.first_gcu_timestamp.unwrap() as f64 > self.file_len_gcu_sec.unwrap() as f64 {
140        newfile = true; 
141        self.file_timestamp = get_utc_timestamp_from_unix(frame.timestamp.clone().unwrap()); 
142        //println!("starting new file with {}", self.file_timestamp.clone().unwrap());
143        self.first_gcu_timestamp = Some(frame.timestamp.unwrap() as u64);
144      }
145    }
146    if newfile {
147        //let filename = self.file_prefix.clone() + "_" + &self.file_id.to_string() + ".tof.gaps";
148        match self.file.sync_all() {
149          Err(err) => {
150            error!("Unable to sync file to disc! {err}");
151          },
152          Ok(_) => ()
153        }
154        self.file = self.get_file(self.file_timestamp.clone());
155        self.file_id += 1;
156        //let path  = Path::new(&filename);
157        //println!("==> [TOFPACKETWRITER] Will start a new file {}", path.display());
158        //self.file = OpenOptions::new().create(true).append(true).open(path).expect("Unable to open file {filename}");
159        //self.n_packets = 0;
160        //self.file_id += 1;
161      }
162  debug!("CRFrame written!");
163  }
164}
165
166impl Default for CRWriter {
167  fn default() -> Self {
168    CRWriter::new(String::from(""),0,None, None, None)
169  }
170}
171
172impl fmt::Display for CRWriter {
173  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
174    let mut repr = String::from("<CRWriter:");
175    repr += &(format!("\n  path: {}", self.file_path)); 
176    repr += ">";
177    write!(f, "{}", repr)
178  }
179}
180
#[cfg(feature="pybindings")]
#[pymethods]
impl CRWriter {

  /// Create a new writer, all arguments are forwarded
  /// to `CRWriter::new`. `filename` is used as the
  /// `file_path` argument there.
  #[new]
  #[pyo3(signature = (filename, run_id, subrun_id = None, timestamp = None, file_len_gcu_sec = None))]
  fn new_py(filename : String, run_id : u32, subrun_id : Option<u64>, timestamp : Option<String>, file_len_gcu_sec : Option<u32>) -> Self {
    CRWriter::new(filename, run_id, subrun_id, timestamp, file_len_gcu_sec)
  }

  /// Set the maximum number of (Mega)bytes per file
  fn set_mbytes_per_file(&mut self, fsize : usize) {
    self.mbytes_per_file = fsize;
  }

  /// Set the timestamp which is used for the name
  /// of the next file
  fn set_file_timestamp(&mut self, timestamp : String) {
    self.file_timestamp = timestamp.into();
  }

  /// Write a single frame, see `CRWriter::add_frame`
  #[pyo3(name="add_frame")]
  fn add_frame_py(&mut self, frame : CRFrame) {
    CRWriter::add_frame(self, &frame);
  }
}
204
// Only compiled when the "pybindings" feature is enabled.
// NOTE(review): `pythonize_display!` is defined outside of this file -
// presumably it makes the `fmt::Display` output of CRWriter available
// as the string representation on the Python side; confirm.
#[cfg(feature="pybindings")]
pythonize_display!(CRWriter);
207