gondola_core/io/
tof_reader.rs1use crate::prelude::*;
9
10#[derive(Debug)]
16#[cfg_attr(feature="pybindings", pyclass)]
17pub struct TofPacketReader {
18 pub filenames : Vec<String>,
20 file_reader : BufReader<File>,
21 cursor : usize,
23 pub filter : TofPacketType,
25 n_packs_read : usize,
27 n_packs_skipped : usize,
29 pub skip_ahead : usize,
31 pub stop_after : usize,
33 pub file_idx : usize,
35 pub tof_paddles : Arc<HashMap<u8,TofPaddle>>,
38}
39
40impl TofPacketReader {
41
42 pub fn new(filename_or_directory : &str) -> Self {
46 let firstfile : String;
47 #[cfg(feature="database")]
48 let mut paddles = HashMap::<u8, TofPaddle>::new();
49 #[cfg(not(feature="database"))]
50 let paddles = HashMap::<u8, TofPaddle>::new();
51 #[cfg(feature="database")]
52 match TofPaddle::all_as_dict() {
53 Err(err) => {
54 error!("Unable to retrieve paddle information from DB! {err}");
55 }
56 Ok(pdls) => {
57 paddles = pdls;
58 }
59 }
60 match list_path_contents_sorted(&filename_or_directory, None) {
61 Err(err) => {
62 error!("{} does not seem to be either a valid directory or an existing file! {err}", filename_or_directory);
63 panic!("Unable to open files!");
64 }
65 Ok(files) => {
66 firstfile = files[0].clone();
67 match OpenOptions::new().create(false).append(false).read(true).open(&firstfile) {
68 Err(err) => {
69 error!("Unable to open file {firstfile}! {err}");
70 panic!("Unable to create reader from {filename_or_directory}!");
71 }
72 Ok(file) => {
73 let packet_reader = Self {
74 filenames : files,
75 file_reader : BufReader::new(file),
76 cursor : 0,
77 filter : TofPacketType::Unknown,
78 n_packs_read : 0,
79 skip_ahead : 0,
80 stop_after : 0,
81 n_packs_skipped : 0,
82 file_idx : 0,
83 tof_paddles : Arc::new(paddles),
84 };
85 packet_reader
86 }
87 }
88 }
89 }
90 }
91
92
93 pub fn read_next_item(&mut self) -> Option<TofPacket> {
103 let mut buffer = [0];
105 loop {
106 match self.file_reader.read_exact(&mut buffer) {
107 Err(err) => {
108 debug!("Unable to read from file! {err}");
109 self.prime_next_file()?;
110 return self.read_next_item();
111 }
112 Ok(_) => {
113 self.cursor += 1;
114 }
115 }
116 if buffer[0] != 0xAA {
117 continue;
118 } else {
119 match self.file_reader.read_exact(&mut buffer) {
120 Err(err) => {
121 debug!("Unable to read from file! {err}");
122 self.prime_next_file()?;
123 return self.read_next_item();
124 }
125 Ok(_) => {
126 self.cursor += 1;
127 }
128 }
129
130 if buffer[0] != 0xAA {
131 continue;
132 } else {
133 match self.file_reader.read_exact(&mut buffer) {
135 Err(err) => {
136 debug!("Unable to read from file! {err}");
137 self.prime_next_file()?;
138 return self.read_next_item();
139 }
140 Ok(_) => {
141 self.cursor += 1;
142 }
143 }
144 let ptype = TofPacketType::from(buffer[0]);
145 let mut buffer_psize = [0,0,0,0];
147 match self.file_reader.read_exact(&mut buffer_psize) {
148 Err(err) => {
149 debug!("Unable to read from file! {err}");
150 self.prime_next_file()?;
151 return self.read_next_item();
152 }
153 Ok(_) => {
154 self.cursor += 4;
155 }
156 }
157 let vec_data = buffer_psize.to_vec();
158 let size = parse_u32(&vec_data, &mut 0);
159 if ptype != self.filter && self.filter != TofPacketType::Unknown {
160 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
161 Err(err) => {
162 debug!("Unable to read more data! {err}");
163 self.prime_next_file()?;
164 return self.read_next_item();
165 }
166 Ok(_) => {
167 self.cursor += size as usize;
168 }
169 }
170 continue; }
172 if self.skip_ahead > 0 && self.n_packs_skipped < self.skip_ahead {
175 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
177 Err(err) => {
178 debug!("Unable to read more data! {err}");
179 self.prime_next_file()?;
180 return self.read_next_item();
181 }
182 Ok(_) => {
183 self.n_packs_skipped += 1;
184 self.cursor += size as usize;
185 }
186 }
187 continue; }
189 if self.stop_after > 0 && self.n_packs_read >= self.stop_after {
190 match self.file_reader.seek(SeekFrom::Current(size as i64)) {
192 Err(err) => {
193 debug!("Unable to read more data! {err}");
194 self.prime_next_file()?;
195 return self.read_next_item();
196 }
197 Ok(_) => {
198 self.cursor += size as usize;
199 }
200 }
201 continue; }
203
204 let mut tp = TofPacket::new();
205 tp.packet_type = ptype;
206 let mut payload = vec![0u8;size as usize];
207
208 match self.file_reader.read_exact(&mut payload) {
209 Err(err) => {
210 debug!("Unable to read from file! {err}");
211 self.prime_next_file()?;
212 return self.read_next_item();
213 }
214 Ok(_) => {
215 self.cursor += size as usize;
216 }
217 }
218 tp.payload = payload;
219 let mut tail = vec![0u8; 2];
221 match self.file_reader.read_exact(&mut tail) {
222 Err(err) => {
223 debug!("Unable to read from file! {err}");
224 self.prime_next_file()?;
225 return self.read_next_item();
226 }
227 Ok(_) => {
228 self.cursor += 2;
229 }
230 }
231 let tail = parse_u16(&tail,&mut 0);
232 if tail != TofPacket::TAIL {
233 debug!("TofPacket TAIL signature wrong!");
234 return None;
235 }
236 if tp.packet_type == TofPacketType::TofEvent {
237 tp.tof_paddles = self.tof_paddles.clone();
238 }
239 self.n_packs_read += 1;
240 return Some(tp);
241 }
242 } } } pub fn get_current_filename(&self) -> Option<String> {
249 if self.filenames.len() <= self.file_idx {
251 return None;
252 }
253 Some(self.filenames[self.file_idx].clone())
254 }
255
256 pub fn count_packets(&mut self) -> usize {
265 let _ = self.rewind();
266 let mut nframes = 0usize;
267 let mut buffer = [0];
268 let bar_template : &str = "[{elapsed_precise}] {prefix} {msg} {spinner} {bar:60.blue/grey} {pos:>7}/{len:7}";
269 let bar_style = ProgressStyle::with_template(bar_template).expect("Unable to set progressbar style!");
270 let bar = ProgressBar::new(self.filenames.len() as u64);
271 bar.set_position(0);
272 bar.set_message (String::from("Counting packets.."));
273 bar.set_prefix ("\u{2728}");
274 bar.set_style (bar_style);
275 bar.set_position(self.file_idx as u64);
276 loop {
277 match self.file_reader.read_exact(&mut buffer) {
278 Err(err) => {
279 debug!("Unable to read from file! {err}");
280 match self.prime_next_file() {
281 None => break,
282 Some(_) => {
283 bar.set_position(self.file_idx as u64);
284 continue;
285 }
286 };
287 }
288 Ok(_) => {
289 self.cursor += 1;
290 }
291 }
292 if buffer[0] != 0xAA {
293 continue;
294 } else {
295 match self.file_reader.read_exact(&mut buffer) {
296 Err(err) => {
297 debug!("Unable to read from file! {err}");
298 match self.prime_next_file() {
299 None => break,
300 Some(_) => {
301 bar.set_position(self.file_idx as u64);
302 continue;
303 }
304 };
305 }
306 Ok(_) => {
307 self.cursor += 1;
308 }
309 }
310 if buffer[0] != 0xAA {
312 continue;
313 } else {
314 match self.file_reader.read_exact(&mut buffer) {
317 Err(err) => {
318 debug!("Unable to read from file! {err}");
319 match self.prime_next_file() {
320 None => break,
321 Some(_) => {
322 bar.set_position(self.file_idx as u64);
323 continue;
324 }
325 };
326 }
327 Ok(_) => {
328 self.cursor += 1;
329 }
330 }
331 let mut buffer_psize = [0,0,0,0];
332 match self.file_reader.read_exact(&mut buffer_psize) {
333 Err(_err) => {
334 match self.prime_next_file() {
335 None => break,
336 Some(_) => {
337 bar.set_position(self.file_idx as u64);
338 continue;
339 }
340 }
341 }
342 Ok(_) => {
343 self.cursor += 4;
344 }
345 }
346 let vec_data = buffer_psize.to_vec();
347 let size = parse_u32(&vec_data, &mut 0);
348 let mut temp_buffer = vec![0; size as usize];
349 match self.file_reader.read_exact(&mut temp_buffer) {
350 Err(err) => {
353 error!("Unable to read {size} bytes from {}! {err}", self.get_current_filename().unwrap());
354 match self.prime_next_file() {
355 None => break,
356 Some(_) => {
357 bar.set_position(self.file_idx as u64);
358 continue;
359 }
360 }
361 }
362 Ok(_) => {
363 self.cursor += size as usize;
364 nframes += 1;
365 }
366 }
367 }
368 } } bar.finish_with_message("Done!");
371 let _ = self.rewind();
372 nframes
373 } pub fn first_packet(&mut self) -> Option<TofPacket> {
377 match self.rewind() {
378 Err(err) => {
379 error!("Error when rewinding files! {err}");
380 }
381 Ok(_) => ()
382 }
383 let pack = self.read_next_item();
384 match self.rewind() {
385 Err(err) => {
386 error!("Error when rewinding files! {err}");
387 }
388 Ok(_) => ()
389 }
390 return pack;
391 }
392
393 pub fn last_packet(&mut self) -> Option<TofPacket> {
395 self.file_idx = self.filenames.len() - 1;
396 let lastfilename = self.filenames[self.file_idx].clone();
397 let lastfile = OpenOptions::new().create(false).append(false).read(true).open(lastfilename).expect("Unable to open file {nextfilename}");
398 self.file_reader = BufReader::new(lastfile);
399 self.cursor = 0;
400 let mut tp = TofPacket::new();
401 let mut idx = 0;
402 loop {
403 match self.read_next_item() {
404 None => {
405 match self.rewind() {
406 Err(err) => {
407 error!("Error when rewinding files! {err}");
408 }
409 Ok(_) => ()
410 }
411 if idx == 0 {
412 return None;
413 } else {
414 return Some(tp);
415 }
416 }
417 Some(pack) => {
418 idx += 1;
419 tp = pack;
420 continue;
421 }
422 }
423 }
424 }
425
426
427}
428
429impl fmt::Display for TofPacketReader {
430 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
431 let mut range_repr = String::from("");
432 if self.skip_ahead > 0 {
433 range_repr += &(format!("({}", self.skip_ahead));
434 } else {
435 range_repr += "(";
436 }
437 if self.stop_after > 0 {
438 range_repr += &(format!("..{})", self.stop_after));
439 } else {
440 range_repr += "..)";
441 }
442 let repr = format!("<TofPacketReader :read {} packets, filter {}, range {}\n files {:?}>", self.n_packs_read, self.filter, range_repr, self.filenames);
443 write!(f, "{}", repr)
444 }
445}
446
447reader!(TofPacketReader,TofPacket);
448
449#[cfg(feature="pybindings")]
450#[pymethods]
451impl TofPacketReader {
452
453 #[new]
454 #[pyo3(signature = (filename_or_directory, filter = None))]
455 fn new_py(filename_or_directory : &Bound<'_,PyAny>, filter : Option<TofPacketType>) -> PyResult<Self> {
456 let mut string_value = String::from("foo");
457 if let Ok(s) = filename_or_directory.extract::<String>() {
458 string_value = s;
459 } if let Ok(fspath_method) = filename_or_directory.getattr("__fspath__") {
461 if let Ok(fspath_result) = fspath_method.call0() {
462 if let Ok(py_string) = fspath_result.extract::<String>() {
463 string_value = py_string;
464 }
465 }
466 }
467 let mut reader = Self::new(&string_value);
468 match filter {
469 None => (),
470 Some(ftr) => {
471 reader.filter = ftr;
472 }
473 }
474 Ok(reader)
475 }
484
485 #[getter]
486 fn first(&mut self) -> Option<TofPacket> {
487 self.first_packet()
488 }
489
490 #[getter]
491 fn last(&mut self) -> Option<TofPacket> {
492 self.last_packet()
493 }
494
495 #[getter]
496 fn filenames(&self) -> Vec<String> {
497 self.filenames.clone()
498 }
499 #[getter]
512 #[pyo3(name="current_filename")]
513 fn get_current_filename_py(&self) -> Option<String> {
514 self.get_current_filename()
515 }
516
517 #[pyo3(name="rewind")]
521 fn rewind_py(&mut self) -> PyResult<()> {
522 match self.rewind() {
523 Err(err) => {
524 return Err(PyValueError::new_err(err.to_string()));
525 }
526 Ok(_) => Ok(())
527 }
528 }
529
530 #[pyo3(name="count_packets")]
539 fn count_packets_py(&mut self) -> usize {
540 self.count_packets()
541 }
542
543 fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
544 slf
545 }
546
547 fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<TofPacket> {
548 slf.next()
549 }
558}
559
560#[cfg(feature="pybindings")]
561pythonize_display!(TofPacketReader);
562
563