1use std::fmt;
17
18use std::fs::{
19 self,
20 File,
21 OpenOptions
22};
23
24use std::path::Path;
25use std::io::{
26 self,
27 BufReader,
28 Seek,
29 SeekFrom,
30 Read,
31 ErrorKind
32};
33use regex::Regex;
34
35use indicatif::{
36 ProgressBar,
37 ProgressStyle
38};
39
40use crate::frame::CRFrame;
41use crate::serialization::CRSerializeable;
42use crate::parsers::*;
43
/// Sequential reader for frames stored across one or more files.
///
/// The reader walks through `filenames` in order, keeping exactly one file
/// open at a time, and hands out the frames it finds in them.
#[derive(Debug)]
pub struct CRReader {
    /// Paths of all files this reader goes through, in reading order.
    pub filenames: Vec<String>,
    /// Index into `filenames` of the file that is currently open.
    pub file_index: usize,
    /// Buffered handle on the file that is currently open.
    file_reader: BufReader<File>,
    /// Byte position bookkeeping within the current file; set back to 0
    /// whenever another file gets opened.
    cursor: usize,
    /// Number of frames handed out so far.
    n_packs_read: usize,
    /// Number of frames passed over because of `skip_ahead`.
    n_packs_skipped: usize,
    /// Error counter, reported by the `Display` output.
    pub n_errors: usize,
    /// Number of frames to pass over before the first one is handed out.
    /// A value of 0 disables skipping.
    pub skip_ahead: usize,
    /// Maximum number of frames to hand out. A value of 0 means no limit.
    pub stop_after: usize,
}
77
78impl fmt::Display for CRReader {
79 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80 let mut range_repr = String::from("");
81 if self.skip_ahead > 0 {
82 range_repr += &(format!("({}", self.skip_ahead));
83 } else {
84 range_repr += "(";
85 }
86 if self.stop_after > 0 {
87 range_repr += &(format!("..{})", self.stop_after));
88 } else {
89 range_repr += "..)";
90 }
91 let mut repr = String::from("<CRReader :");
92 repr += "\n -- files:";
93 for k in &self.filenames {
94 repr += &format!("\n -- {k}");
95 }
96 if self.filenames.len() > 0 {
97 repr += &format!("\n current : {}", self.get_current_filename().unwrap());
98 }
99 repr += &String::from("\n -- -- -- -- -- -- -- -- -- -- -- --");
100 repr += &format!("\n read {} packets, {} errors, range {}>", self.n_packs_read, self.n_errors, range_repr);
101 write!(f, "{}", repr)
102 }
103}
104
105impl CRReader {
106
107 pub fn new(filename_or_directory : String) -> Result<Self, io::Error> {
114 let infiles = Self::list_path_contents_sorted(&filename_or_directory, None)?;
137 if infiles.len() == 0 {
138 error!("Unable to read files from {filename_or_directory}. Is this a valid path?");
139 return Err(io::Error::new(ErrorKind::NotFound, "Unable to find given path!"))
140 }
141 let firstfile = infiles[0].clone();
142 let file = OpenOptions::new().create(false).append(false).read(true).open(&firstfile).expect("Unable to open file {filename}");
143 let packet_reader = Self {
144 filenames : infiles,
145 file_index : 0,
146 file_reader : BufReader::new(file),
147 cursor : 0,
148 n_packs_read : 0,
149 n_errors : 0,
150 skip_ahead : 0,
151 stop_after : 0,
152 n_packs_skipped : 0,
153 };
156 Ok(packet_reader)
157 }
158
159 pub fn get_current_filename(&self) -> Option<String> {
162 if self.filenames.len() <= self.file_index {
164 return None;
165 }
166 Some(self.filenames[self.file_index].clone())
167 }
168
169 fn list_path_contents_sorted(input: &str, pattern: Option<Regex>) -> Result<Vec<String>, io::Error> {
180 let path = Path::new(input);
181 match fs::metadata(path) {
182 Ok(metadata) => {
183 if metadata.is_file() {
184 let fname = String::from(input);
185 return Ok(vec![fname]);
186 }
187 if metadata.is_dir() {
188 let re : Regex;
189 match pattern {
190 None => {
191 re = Regex::new(r"Run\d+_\d+\.(\d{6})_(\d{6})UTC(\.tof)?\.gaps$").unwrap();
192 }
193 Some(_re) => {
194 re = _re;
195 }
196 }
197 let mut entries: Vec<(u32, u32, String)> = fs::read_dir(path)?
198 .filter_map(Result::ok) .filter_map(|entry| {
200 let filename = format!("{}/{}", path.display(), entry.file_name().into_string().ok()?);
201 re.captures(&filename.clone()).map(|caps| {
202 let date = caps.get(1)?.as_str().parse::<u32>().ok()?;
203 let time = caps.get(2)?.as_str().parse::<u32>().ok()?;
204 Some((date, time, filename))
205 })?
206 })
207 .collect();
208
209 entries.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
211 return Ok(entries.into_iter().map(|(_, _, name)| name).collect());
213 }
214 Err(io::Error::new(ErrorKind::Other, "Path exists but is neither a file nor a directory"))
215 }
216 Err(e) => Err(e),
217 }
218 }
219
220 pub fn first_frame(&mut self) -> Option<CRFrame> {
227 match self.rewind() {
228 Err(err) => {
229 error!("Error when rewinding files! {err}");
230 }
231 Ok(_) => ()
232 }
233 let frame = self.get_next_frame();
234 match self.rewind() {
235 Err(err) => {
236 error!("Error when rewinding files! {err}");
237 }
238 Ok(_) => ()
239 }
240 return frame;
241 }
242
243 pub fn last_frame(&mut self) -> Option<CRFrame> {
245 self.file_index = self.filenames.len() - 1;
246 let lastfilename = self.filenames[self.file_index].clone();
247 let lastfile = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
248 self.file_reader = BufReader::new(lastfile);
249 self.cursor = 0;
250 let mut frame = CRFrame::new();
251 let mut idx = 0;
252 loop {
253 match self.get_next_frame() {
254 None => {
255 match self.rewind() {
256 Err(err) => {
257 error!("Error when rewinding files! {err}");
258 }
259 Ok(_) => ()
260 }
261 if idx == 0 {
262 return None;
263 } else {
264 return Some(frame);
265 }
266 }
267 Some(_fr) => {
268 idx += 1;
269 frame = _fr;
270 continue;
271 }
272 }
273 }
274 }
275
276 pub fn get_n_frames(&mut self) -> usize {
278 let _ = self.rewind();
279 let mut nframes = 0usize;
280 let mut buffer = [0];
281 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
282 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
283 let bar = ProgressBar::new(self.filenames.len() as u64);
284 bar.set_position(0);
285 bar.set_message (String::from("Counting frames.."));
286 bar.set_prefix ("\u{2728}");
287 bar.set_style (bar_style);
288 bar.set_position(self.file_index as u64);
289 loop {
290 match self.file_reader.read_exact(&mut buffer) {
291 Err(err) => {
292 debug!("Unable to read from file! {err}");
293 match self.progress_file() {
294 None => break,
295 Some(_) => {
296 bar.set_position(self.file_index as u64);
297 continue;
298 }
299 };
300 }
301 Ok(_) => {
302 self.cursor += 1;
303 }
304 }
305 if buffer[0] != 0xAA {
306 continue;
307 } else {
308 match self.file_reader.read_exact(&mut buffer) {
309 Err(err) => {
310 debug!("Unable to read from file! {err}");
311 match self.progress_file() {
312 None => break,
313 Some(_) => {
314 bar.set_position(self.file_index as u64);
315 continue;
316 }
317 };
318 }
319 Ok(_) => {
320 self.cursor += 1;
321 }
322 }
323 if buffer[0] != 0xAA {
325 continue;
326 } else {
327 let mut buffer_psize = [0,0,0,0,0,0,0,0];
329 match self.file_reader.read_exact(&mut buffer_psize) {
330 Err(_err) => {
331 match self.progress_file() {
332 None => break,
333 Some(_) => {
334 bar.set_position(self.file_index as u64);
335 continue;
336 }
337 }
338 }
339 Ok(_) => {
340 self.cursor += 8;
341 }
342 }
343 let vec_data = buffer_psize.to_vec();
344 let size = parse_u64(&vec_data, &mut 0);
345 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
346 Err(err) => {
347 error!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
348 match self.progress_file() {
349 None => break,
350 Some(_) => {
351 bar.set_position(self.file_index as u64);
352 continue;
353 }
354 }
355 }
356 Ok(_) => {
357 self.cursor += size as usize;
358 nframes += 1;
359 }
360 }
361 }
362 } } bar.finish_with_message("Done!");
365 let _ = self.rewind();
366 nframes
367 } fn progress_file(&mut self) -> Option<()> {
374 if self.file_index == self.filenames.len() -1 {
375 return None;
376 } else {
377 self.file_index += 1;
378 let nextfilename = self.filenames[self.file_index].clone();
379 let nextfile = OpenOptions::new().create(false).append(false).read(true).open(nextfilename).expect("Unable to open file {nextfilename}");
380 self.file_reader = BufReader::new(nextfile);
381 self.cursor = 0;
382 return Some(());
383 }
384 }
385
386
387 pub fn rewind(&mut self) -> io::Result<()> {
390 let firstfile = &self.filenames[0];
391 let file = OpenOptions::new().create(false).append(false).read(true).open(&firstfile)?;
392 self.file_reader = BufReader::new(file);
393 self.file_index = 0;
394 self.cursor = 0;
395 Ok(())
396 }
397
398 pub fn get_next_frame(&mut self) -> Option<CRFrame> {
404 let mut buffer = [0];
407 loop {
408 match self.file_reader.read_exact(&mut buffer) {
409 Err(err) => {
410 debug!("Unable to read from file! {err}");
411 self.progress_file()?;
413 return self.get_next_frame();
414 }
415 Ok(_) => {
416 self.cursor += 1;
417 }
418 }
419 if buffer[0] != 0xAA {
420 continue;
421 } else {
422 match self.file_reader.read_exact(&mut buffer) {
423 Err(err) => {
424 debug!("Unable to read from file! {err}");
425 self.progress_file()?;
426 return self.get_next_frame();
427 }
428 Ok(_) => {
429 self.cursor += 1;
430 }
431 }
432
433 if buffer[0] != 0xAA {
434 continue;
435 } else {
436 let mut buffer_psize = [0,0,0,0,0,0,0,0];
438 match self.file_reader.read_exact(&mut buffer_psize) {
439 Err(err) => {
440 debug!("Unable to read from file! {err}");
441 self.progress_file()?;
442 return self.get_next_frame();
443 }
444 Ok(_) => {
445 self.cursor += 8;
446 }
447 }
448
449 let vec_data = buffer_psize.to_vec();
450 let size = parse_u64(&vec_data, &mut 0);
452 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
456 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
458 Err(err) => {
459 debug!("Unable to read more data! {err}");
460 self.progress_file()?;
461 return self.get_next_frame();
462 }
463 Ok(_) => {
464 self.n_packs_skipped += 1;
465 self.cursor += size as usize;
466 }
467 }
468 continue; }
470 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
471 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
473 Err(err) => {
474 debug!("Unable to read more data! {err}");
475 self.progress_file()?;
476 return self.get_next_frame();
477 }
478 Ok(_) => {
479 self.cursor += size as usize;
480 }
481 }
482 continue; }
484
485 let mut frame = CRFrame::new();
486 let mut payload = vec![0u8;size as usize];
487
488 match self.file_reader.read_exact(&mut payload) {
489 Err(err) => {
490 debug!("Unable to read from file! {err}");
491 self.progress_file()?;
492 return self.get_next_frame();
493 }
494 Ok(_) => {
495 self.cursor += size as usize;
496 }
497 }
498 let mut in_frame_pos = 0usize;
499 frame.index = CRFrame::parse_index(&payload, &mut in_frame_pos);
500 frame.bytestorage = payload[in_frame_pos..].to_vec();
501
502 let mut tail = vec![0u8; 2];
505 match self.file_reader.read_exact(&mut tail) {
506 Err(err) => {
507 debug!("Unable to read from file! {err}");
508 self.progress_file()?;
509 return self.get_next_frame();
510 }
511 Ok(_) => {
512 self.cursor += 2;
513 }
514 }
515 let tail = parse_u16(&tail,&mut 0);
516 if tail != CRFrame::CRTAIL {
517 debug!("CRFrame TAIL signature wrong!");
518 return None;
519 }
520 self.n_packs_read += 1;
521 return Some(frame);
522 }
523 } } } }
527
528impl Iterator for CRReader {
529 type Item = CRFrame;
530
531 fn next(&mut self) -> Option<Self::Item> {
532 self.get_next_frame()
533 }
534}
535