1use crate::prelude::*;
10
11use crossbeam_channel::Sender;
12
13pub fn socket_wrap_tofstream(address : &str,
16 tp_sender : Sender<TofPacket>) {
17 let ctx = zmq::Context::new();
20 let socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
22 socket.connect(address).expect("Unable to connect to data (PUB) socket {adress}");
23 socket.set_subscribe(b"").expect("Can't subscribe to any message on 0MQ socket!");
24 info!("0MQ SUB socket connected to address {address}");
26 loop {
30 match socket.recv_bytes(0) {
31 Err(err) => error!("Can't receive TofPacket! {err}"),
32 Ok(payload) => {
33 match TofPacket::from_bytestream(&payload, &mut 0) {
34 Ok(tp) => {
35 match tp_sender.send(tp) {
36 Ok(_) => (),
37 Err(err) => error!("Can't send TofPacket over channel! {err}")
38 }
39 }
40 Err(err) => {
41 debug!("Can't decode payload! {err}");
42 match TofPacket::from_bytestream(&payload, &mut 4) {
45 Err(err) => {
46 error!("Don't understand bytestream! {err}");
47 },
48 Ok(tp) => {
49 match tp_sender.send(tp) {
50 Ok(_) => (),
51 Err(err) => error!("Can't send TofPacket over channel! {err}")
52 }
53 }
54 }
55 }
56 }
57 }
58 }
59 }
60}
61
62pub fn socket_wrap_telemetry(address : &str,
77 cachesize : usize,
78 tele_sender : Sender<TelemetryPacket>) {
79 let ctx = zmq::Context::new();
80 let socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
85 match socket.connect(&address) {
86 Err(err) => {
87 error!("Unable to connect to data (PUB) socket {address}! {err}");
88 panic!("Can not connect to zmq PUB socket!");
89 }
90 Ok(_) => ()
91 }
92 let mut cache = VecDeque::<u16>::with_capacity(cachesize);
93 socket.set_subscribe(b"") .expect("Can't subscribe to any message on 0MQ socket! {err}");
94 loop {
95 match socket.recv_bytes(0) {
96 Err(err) => error!("Can't receive TofPacket! {err}"),
97 Ok(mut payload) => {
98 match TelemetryPacketHeader::from_bytestream(&payload, &mut 0) {
99 Err(err) => {
100 error!("Can not decode telemtry header! {err}");
101 }
105 Ok(header) => {
106 let mut packet = TelemetryPacket::new();
107 if payload.len() > TelemetryPacketHeader::SIZE {
108 payload.drain(0..TelemetryPacketHeader::SIZE);
109 }
110 if cache.contains(&header.counter) {
111 continue;
113 } else {
114 cache.push_back(header.counter);
115 }
116 if cache.len() == cachesize {
117 cache.pop_front();
118 }
119
120 packet.header = header;
121 packet.payload = payload;
122 match tele_sender.send(packet) {
123 Err(err) => error!("Can not send telemetry packet to downstream! {err}"),
124 Ok(_) => ()
125 }
126 }
127 }
128 }
129 }
130 }
131}
132
133
134
135
136use crc::Crc;
138
/// Selects the word order used when the parser assembles fields which are
/// spread over several 16-bit words (event id, timestamp, per-channel
/// CRC32). With `true`, the first word read ends up in the upper 16 bits
/// of a 32-bit value (`word0 << 16 | word1`), with `false` in the lower.
const REVERSE_WORDS : bool = true;
/// Parameters of the CRC32 which is calculated over the ADC words of each
/// channel when `calc_crc32` is set.
///
/// NOTE(review): init/poly/refin/refout/xorout look like the widely used
/// CRC-32 (ISO-HDLC, "zlib") parametrization; `check` and `residue` are
/// left at 0 here instead of the catalogued values -- presumably they are
/// informational only for the `crc` crate, confirm against its docs.
const ALGO : crc::Algorithm<u32> = crc::Algorithm {
  width : 32u8,
  init : 0xFFFFFFFF,
  poly : 0x04C11DB7,
  refin : true,
  refout : true,
  xorout : 0xFFFFFFFF,
  check : 0,
  residue : 0,
};
154
/// Buffers a raw byte stream and extracts events (`RBEvent`) or raw
/// packets (`TofPacket`) from it. Packets are located by searching for the
/// marker 0xaaaa (head) and, for raw packets, 0x5555 (tail).
pub struct RBEventMemoryStreamer {
  /// Raw bytes waiting to be parsed. `add` and `consume` append to it.
  pub stream : Vec<u8>,
  /// When set, the two uppermost bits of every ADC word (0x8000, 0x4000)
  /// are evaluated as channel/cell error bits during event extraction.
  pub check_channel_errors : bool,
  /// Channel ids listed here are removed from the channel list of the
  /// event header before the channel data is read.
  pub mask : Vec<u8>,

  /// Current read position within `stream`
  pos : usize,
  /// Set by `seek_next_header` whenever it found the requested marker
  pos_at_head : bool,
  /// Optional channel used by `send_all` to pass on the extracted events
  pub tp_sender : Option<Sender<TofPacket>>,
  /// Number of events/packets extracted so far
  n_events_ext : usize,
  /// Set when the stream is empty, has no further header or is too short
  /// for the requested event. Reset by `add` and `consume`.
  pub is_depleted : bool,
  /// When set, a CRC32 is calculated for every channel and compared with
  /// the checksum stored in the stream.
  pub calc_crc32 : bool,
  /// CRC engine, set up with the parameters in `ALGO`
  crc32_sum : Crc::<u32>,
  /// Not referenced by the methods of this type visible in this file
  pub request_mode : bool,
  /// Not referenced by the methods of this type visible in this file
  pub request_cache : VecDeque<(u32,u8)>,
  /// Index filled by `create_event_index`:
  /// event id -> (position of the packet head in `stream`, packet length in bytes)
  pub event_map : HashMap<u32,(usize,usize)>,
  /// First event id seen by `create_event_index`. 0 is used as "not set yet".
  pub first_evid : u32,
  /// Event id which was parsed last by `create_event_index`
  pub last_evid : u32,
  /// Set by `create_event_index` when it ran out of headers
  pub last_event_complete : bool,
  /// Not referenced by the methods of this type visible in this file
  pub last_event_pos : usize,
  /// Not referenced by the methods of this type visible in this file
  pub is_behind_by : usize,
  /// Not referenced by the methods of this type visible in this file
  pub is_ahead_by : usize,
}
206
207impl RBEventMemoryStreamer {
208
209 pub fn new() -> Self {
210 Self {
211 stream : Vec::<u8>::new(),
212 check_channel_errors : false,
213 mask : Vec::<u8>::new(),
214 pos : 0,
215 pos_at_head : false,
216 tp_sender : None,
217 n_events_ext : 0,
218 is_depleted : false,
219 calc_crc32 : false,
220 crc32_sum : Crc::<u32>::new(&ALGO),
221 request_mode : false,
222 request_cache : VecDeque::<(u32,u8)>::new(),
223 event_map : HashMap::<u32,(usize,usize)>::new(),
224 first_evid : 0,
225 last_evid : 0,
226 last_event_complete : false,
227 last_event_pos : 0,
228 is_behind_by : 0,
229 is_ahead_by : 0,
230 }
231 }
232
233 pub fn create_event_index(&mut self) { let begin_pos = self.pos;
238 let mut event_id = 0u32;
239 loop {
242 let mut result = (0usize, 0usize);
243 if !self.seek_next_header(0xaaaa) {
244 debug!("Could not find another header...");
245 self.pos = begin_pos;
246 self.last_evid = event_id;
247 if result.0 + result.1 > self.stream.len() - 1 {
248 self.last_event_complete = false;
249 } else {
250 self.last_event_complete = true;
251 }
252 info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
253 return;
254 }
255 result.0 = self.pos;
256 self.pos += 4;let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
258 if self.stream.len() < self.pos -6 + packet_len {
259 self.pos = begin_pos;
261 self.last_evid = event_id;
262 info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
263 return;
264 }
266 result.1 = packet_len;
267 if packet_len < 6 {
268 self.pos = begin_pos;
269 self.last_evid = event_id;
270 info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
271 return;
272 }
274 self.pos -= 6;
276 self.pos += 22;
278 let event_id0 = parse_u16(&self.stream, &mut self.pos) as u32;
279 let event_id1 = parse_u16(&self.stream, &mut self.pos) as u32;
280 if REVERSE_WORDS {
281 event_id = event_id0 << 16 | event_id1;
282 } else {
283 event_id = event_id1 << 16 | event_id0;
284 }
285 if self.first_evid == 0 {
286 self.first_evid = event_id;
287 }
288 self.pos += packet_len - 26;
289 self.event_map.insert(event_id,result);
290 }
291 }
292
293 pub fn print_event_map(&self) {
294 for k in self.event_map.keys() {
295 let pos = self.event_map[&k];
296 println!("-- --> {} -> {},{}", k, pos.0, pos.1);
297 }
298 }
299
300 pub fn init_sender(&mut self, tp_sender : Sender<TofPacket>) {
302 self.tp_sender = Some(tp_sender);
303 }
304
305 pub fn send_all(&mut self) {
307 loop {
308 match self.next() {
309 None => {
310 info!("Streamer drained!");
311 break;
312 },
313 Some(ev) => {
314 let tp = ev.pack();
315 match self.tp_sender.as_ref().expect("Sender needs to be initialized first!").send(tp) {
316 Ok(_) => (),
317 Err(err) => {
318 error!("Unable to send TofPacket! {err}");
319 }
320 }
321 }
322 }
323 }
324 }
325
326
327 pub fn add(&mut self, stream : &Vec<u8>, nbytes : usize) {
331 self.is_depleted = false;
335 self.stream.extend_from_slice(&stream[0..nbytes]);
336 }
339
340 pub fn consume(&mut self, stream : &mut Vec<u8>) {
343 self.is_depleted = false;
344 self.stream.append(stream);
348 }
351
352 pub fn seek_next_header(&mut self, header : u16) -> bool{
362 match seek_marker(&self.stream, header, self.pos) {
363 Err(_) => {
365 return false;
366 }
367 Ok(head_pos) => {
368 self.pos = head_pos;
369 self.pos_at_head = true;
370 return true;
371 }
372 }
373 }
374
375 pub fn next_tofpacket(&mut self) -> Option<TofPacket> {
376 let begin_pos = self.pos; let foot_pos : usize;
379 let head_pos : usize;
380 if self.stream.len() == 0 {
381 trace!("Stream empty!");
382 return None;
383 }
384 if !self.pos_at_head {
385 if !self.seek_next_header(0xaaaa) {
386 debug!("Could not find another header...");
387 self.pos = begin_pos;
388 return None;
389 }
390 }
391 head_pos = self.pos;
392 if !self.seek_next_header(0x5555) {
395 debug!("Could not find another footer...");
396 self.pos = begin_pos;
397 return None;
398 }
399 foot_pos = self.pos;
401 self.n_events_ext += 1;
402 let mut tp = TofPacket::new();
403 tp.packet_type = TofPacketType::RBEventMemoryView;
404 tp.payload.extend_from_slice(&self.stream[head_pos..foot_pos+2]);
406 self.pos_at_head = false;
409 if self.n_events_ext % 200 == 0 {
412 self.stream.drain(0..foot_pos+3);
413 self.pos = 0;
414 }
415 Some(tp)
416 }
417
418
419 pub fn get_event_at_pos_unchecked(&mut self,
421 replace_channel_mask : Option<u16>)
422 -> Option<RBEvent> {
423 let mut header = RBEventHeader::new();
424 let mut event = RBEvent::new();
425 let mut event_status = EventStatus::Unknown;
426 if self.calc_crc32 && self.check_channel_errors {
428 event_status = EventStatus::Perfect;
429 }
430 if !self.calc_crc32 && !self.check_channel_errors {
431 event_status = EventStatus::GoodNoCRCOrErrBitCheck;
432 }
433 if !self.calc_crc32 && self.check_channel_errors {
434 event_status = EventStatus::GoodNoCRCCheck;
435 }
436 if self.calc_crc32 && !self.check_channel_errors {
437 event_status = EventStatus::GoodNoErrBitCheck;
438 }
439 let head = parse_u16(&self.stream, &mut self.pos);
442 if head != RBEventHeader::HEAD {
443 error!("Event does not start with {}", RBEventHeader::HEAD);
444 return None;
445 }
446
447 let status = parse_u16(&self.stream, &mut self.pos);
448 header.parse_status(status);
451 let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
452 let nwords = parse_u16(&self.stream, &mut self.pos) as usize + 1; if self.pos - 8 + packet_len > self.stream.len() { error!("Stream is too short! Stream len is {}, packet len is {}. We are at pos {}", self.stream.len(), packet_len, self.pos);
455 self.is_depleted = true;
456 self.pos -= 8;
457 return None;
458 }
459 self.pos += 10;
462 self.pos += 1; header.rb_id = parse_u8(&self.stream, &mut self.pos);
464 header.set_channel_mask(parse_u16(&self.stream, &mut self.pos));
465 match replace_channel_mask {
466 None => (),
467 Some(mask) => {
468 println!("==> Replacing ch mask {} with {}", header.get_channel_mask(), mask);
469 header.set_channel_mask(mask);
470 }
471 }
472 let event_id0 = parse_u16(&self.stream, &mut self.pos) as u32;
473 let event_id1 = parse_u16(&self.stream, &mut self.pos) as u32;
474 let event_id : u32;
475 if REVERSE_WORDS {
476 event_id = event_id0 << 16 | event_id1;
477 } else {
478 event_id = event_id1 << 16 | event_id0;
479 }
480
481 header.event_id = event_id;
482 self.pos += 4;
486 let timestamp0 = parse_u16(&self.stream, &mut self.pos);
487 let timestamp1 = parse_u16(&self.stream, &mut self.pos) as u32;
488 let timestamp2 = parse_u16(&self.stream, &mut self.pos);
489 let timestamp16 : u16;
491 let timestamp32 : u32;
492 if REVERSE_WORDS {
493 timestamp16 = timestamp0;
494 timestamp32 = timestamp1 << 16 | timestamp2 as u32;
495 } else {
496 timestamp16 = timestamp2;
497 timestamp32 = (timestamp0 as u32) << 16 | timestamp1;
498 }
499 header.timestamp16 = timestamp16;
500 header.timestamp32 = timestamp32;
501 if header.drs_lost_trigger() {
505 event.status = EventStatus::IncompleteReadout;
506 event.header = header;
507 return Some(event);
509 }
510 let mut any_cell_error = false;
515 let mut header_channels = header.get_channels().clone();
516 for k in &self.mask {
517 header_channels.retain(|x| x != k);
518 }
519
520 for ch in header_channels.iter() {
521 let ch_id = parse_u16(&self.stream, &mut self.pos);
522 if ch_id != *ch as u16 {
523 let search_pos = self.pos;
525 match seek_marker(&self.stream, TofPacket::HEAD, search_pos) {
526 Err(_) => (),
528 Ok(result) => {
529 info!("The channel data is corrupt, but we found a header at {} for remaining stream len {}", result, self.stream.len());
530 }
531 }
532 let mut stream_view = Vec::<u8>::new();
533 let foo_pos = self.pos;
534 for k in foo_pos -3..foo_pos + 3 {
535 stream_view.push(self.stream[k]);
536 }
537 error!("We got {ch_id} but expected {ch} for event {}. The parsed ch id is not in the channel mask! We will fill this channel with u16::MAX .... Stream view +- 3 around the ch id {:?}", header.event_id, stream_view);
538 event_status = EventStatus::ChannelIDWrong;
539 event.adc[*ch as usize] = vec![u16::MAX;NWORDS];
541 self.pos += 2*nwords + 4;
542 continue;
543 } else {
544 let mut dig = self.crc32_sum.digest();
551 if self.calc_crc32 {
552 let mut this_ch_adc = Vec::<u16>::with_capacity(nwords);
553 for _ in 0..nwords {
554 let this_field = parse_u16(&self.stream, &mut self.pos);
555 dig.update(&this_field.to_le_bytes());
556 if self.check_channel_errors {
557 if ((0x8000 & this_field) >> 15) == 0x1 {
558 error!("Ch error bit set for ch {}!", ch);
559 event_status = EventStatus::ChnSyncErrors;
560 }
561 if ((0x4000 & this_field) >> 14) == 0x1 {
562 error!("Cell error bit set for ch {}!", ch);
563 event_status = EventStatus::CellSyncErrors;
564 any_cell_error = true;
565 }
566 }
567 this_ch_adc.push(0x3fff & this_field)
568 }
569 event.adc[*ch as usize] = this_ch_adc;
570 } else {
571 if self.check_channel_errors {
572 let adc_w_errs = u8_to_u16_err_check(&self.stream[self.pos..self.pos + 2*nwords]);
573 if adc_w_errs.1 {
574 error!("Ch error bit set for ch {}!", ch);
575 event_status = EventStatus::ChnSyncErrors;
576 any_cell_error = true;
577 } else if adc_w_errs.2 {
578 error!("Cell error bit set for ch {}!", ch);
579 event_status = EventStatus::CellSyncErrors;
580 }
581 event.adc[*ch as usize] = adc_w_errs.0;
582 } else {
583 event.adc[*ch as usize] = u8_to_u16_14bit(&self.stream[self.pos..self.pos + 2*nwords]);
584 }
585 self.pos += 2*nwords;
586 }
587 let crc320 = parse_u16(&self.stream, &mut self.pos) as u32;
590 let crc321 = parse_u16(&self.stream, &mut self.pos) as u32;
591 if self.calc_crc32 {
593 let crc32 : u32;
594 if REVERSE_WORDS {
595 crc32 = crc320 << 16 | crc321;
596 } else {
597 crc32 = crc321 << 16 | crc320;
598 }
599 let checksum = dig.finalize();
600 if checksum != crc32 {
601 event_status = EventStatus::CRC32Wrong;
602 }
603 println!("== ==> Checksum {}, channel checksum {}!", checksum, crc32);
604 }
605 }
606 }
607 if any_cell_error {
608 if event_status == EventStatus::ChnSyncErrors {
609 event_status = EventStatus::CellAndChnSyncErrors;
610 }
611 }
612
613 if !header.drs_lost_trigger() {
614 header.stop_cell = parse_u16(&self.stream, &mut self.pos);
615 }
616 self.pos += 4;
620
621 let tail = parse_u16(&self.stream, &mut self.pos);
640 if tail != 0x5555 {
641 error!("Tail signature {} for event {} is invalid!", tail, header.event_id);
642 event_status = EventStatus::TailWrong;
643 }
644 self.pos_at_head = false;
646 event.header = header;
647 event.status = event_status;
648 if event_status == EventStatus::TailWrong {
649 info!("{}", event);
650 }
651 Some(event)
652 }
653
654 pub fn get_event_at_id(&mut self, event_id : u32, replace_channel_mask : Option<u16>) -> Option<RBEvent> {
655 let begin_pos = self.pos; let pos = self.event_map.remove(&event_id)?;
667 if self.stream.len() < pos.0 + pos.1 {
668 trace!("Stream is too short!");
669 self.is_depleted = true;
670 self.pos = begin_pos;
671 return None;
672 }
673 self.pos = pos.0;
674 self.get_event_at_pos_unchecked(replace_channel_mask)
675 }
676}
677
678impl Iterator for RBEventMemoryStreamer {
679 type Item = RBEvent;
680
681 fn next(&mut self) -> Option<Self::Item> {
682 let begin_pos : usize; self.pos_at_head = false;
689 begin_pos = self.pos; if self.stream.len() == 0 {
692 trace!("Stream empty!");
693 self.is_depleted = true;
694 self.pos = 0;
695 return None;
696 }
697 if !self.pos_at_head {
698 if !self.seek_next_header(0xaaaa) {
699 debug!("Could not find another header...");
700 self.pos = begin_pos;
701 self.is_depleted = true;
702 return None;
703 }
704 }
705
706 let event = self.get_event_at_pos_unchecked(None)?;
707 self.n_events_ext += 1;
708 self.stream.drain(0..self.pos);
709 self.pos = 0;
710 self.pos_at_head = false;
711 Some(event)
712 }
713}
714
715