gondola_core/io/
streamers.rs

1//! 
2//! * RBEventMemoryStreamer: Walk over "raw" RBEvents
3//!   representations ("RBEventMemoryView") and extract
4//!   RBEvents
5//!
6// This file is part of gaps-online-software and published 
7// under the GPLv3 license
8
9use crate::prelude::*;
10
11use crossbeam_channel::Sender;
12
13/// ZMQ socket wrapper for the zmq socket which is 
14/// supposed to receive data from the TOF system.
15pub fn socket_wrap_tofstream(address   : &str,
16                             tp_sender : Sender<TofPacket>) {
17  // FIXME - would be nice to make this generic, but 
18  //         Telemetry and Tofstream are too different 
19  let ctx = zmq::Context::new();
20  // FIXME - don't hardcode this IP
21  let socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
22  socket.connect(address).expect("Unable to connect to data (PUB) socket {adress}");
23  socket.set_subscribe(b"").expect("Can't subscribe to any message on 0MQ socket!");
24  //let mut n_pack = 0usize;
25  info!("0MQ SUB socket connected to address {address}");
26  // per default, we create master trigger packets from TofEventSummary, 
27  // except we have "real" mtb packets
28  //let mut craft_mte_packets = true;
29  loop {
30    match socket.recv_bytes(0) {
31      Err(err) => error!("Can't receive TofPacket! {err}"),
32      Ok(payload)    => {
33        match TofPacket::from_bytestream(&payload, &mut 0) {
34          Ok(tp) => {
35            match tp_sender.send(tp) {
36              Ok(_) => (),
37              Err(err) => error!("Can't send TofPacket over channel! {err}")
38            }
39          }
40          Err(err) => {
41            debug!("Can't decode payload! {err}");
42            // that might have an RB prefix, forward 
43            // it 
44            match TofPacket::from_bytestream(&payload, &mut 4) {
45              Err(err) => {
46                error!("Don't understand bytestream! {err}"); 
47              },
48              Ok(tp) => {
49                match tp_sender.send(tp) {
50                  Ok(_) => (),
51                  Err(err) => error!("Can't send TofPacket over channel! {err}")
52                }
53              }
54            }
55          }  
56        }
57      }
58    }
59  }
60}
61
62/// Get the GAPS merged event telemetry stream and 
63/// broadcast it to the relevant tab
64///
65/// # Arguments
66///
67/// * address     : Address to susbscribe to for telemetry 
68///                 stream (must be zmq.PUB) on the Sender
69///                 side
70/// * cachesize   : Getting the packets from the funneled stream leads
71///                 to duplicates. To eliminate these, we store the 
72///                 packet counter variable in a Dequee of a given 
73///                 size
74/// * tele_sender : Channel to forward the received telemetry
75///                 packets
76pub fn socket_wrap_telemetry(address     : &str,
77                             cachesize   : usize,
78                             tele_sender : Sender<TelemetryPacket>) {
79  let ctx = zmq::Context::new();
80  // FIXME - don't hardcode this IP
81  // typically how it is done is that this program runs either on a gse
82  // or there is a local forwarding of the port thrugh ssh
83  //let address : &str = "tcp://127.0.0.1:55555";
84  let socket = ctx.socket(zmq::SUB).expect("Unable to create 0MQ SUB socket!");
85  match socket.connect(&address) {
86    Err(err) => {
87      error!("Unable to connect to data (PUB) socket {address}! {err}");
88      panic!("Can not connect to zmq PUB socket!");
89    }
90    Ok(_) => ()
91  }
92  let mut cache = VecDeque::<u16>::with_capacity(cachesize);
93  socket.set_subscribe(b"") .expect("Can't subscribe to any message on 0MQ socket! {err}");
94  loop {
95    match socket.recv_bytes(0) {
96      Err(err)    => error!("Can't receive TofPacket! {err}"),
97      Ok(mut payload) => {
98        match TelemetryPacketHeader::from_bytestream(&payload, &mut 0) {
99          Err(err) => {
100            error!("Can not decode telemtry header! {err}");
101            //for k in pos - 5 .. pos + 5 {
102            //  println!("{}",stream[k]);
103            //}
104          }
105          Ok(header) => {
106            let mut packet = TelemetryPacket::new();
107            if payload.len() > TelemetryPacketHeader::SIZE {
108              payload.drain(0..TelemetryPacketHeader::SIZE);
109            }
110            if cache.contains(&header.counter) {
111              // drop this packet
112              continue;
113            } else {
114              cache.push_back(header.counter); 
115            }
116            if cache.len() == cachesize {
117              cache.pop_front();
118            }
119
120            packet.header  = header;
121            packet.payload = payload;
122            match tele_sender.send(packet) {
123              Err(err) => error!("Can not send telemetry packet to downstream! {err}"),
124              Ok(_)    => ()
125            }
126          }
127        }
128      }
129    }
130  }
131}
132
133
134
135
136// only used here
137use crc::Crc;
138
139// change if we switch to a firmware
140// where the byteorder of u32 and larger 
141// is correct.
142const REVERSE_WORDS : bool = true;
143const ALGO : crc::Algorithm<u32> = crc::Algorithm {
144      width   : 32u8,
145      init    : 0xFFFFFFFF,
146      //poly    : 0xEDB88320,
147      poly    : 0x04C11DB7,
148      refin   : true,
149      refout  : true,
150      xorout  : 0xFFFFFFFF,
151      check   : 0,
152      residue : 0,
153    };
154
155/// Emit RBEvents from a stream of bytes
156/// from RBMemory
157///
158/// The layout of the stream has to have
159/// the fpga fw memory layout.
160///
161/// This provides a next() method to act
162/// as a generator for RBEvents
163pub struct RBEventMemoryStreamer {
164  /// Raw stream read out from the RB buffers.
165  pub stream               : Vec<u8>,
166  /// Error checking mode - check error bits for 
167  /// channels/cells
168  pub check_channel_errors : bool,
169  /// Ignore channels in this list
170  pub mask                 : Vec<u8>,
171
172  /// Current position in the stream
173  pos                      : usize,
174  /// The current posion marker points to a header 
175  /// signature in the stream.
176  pos_at_head              : bool,
177  /// An optional crossbeam::channel Sender, which 
178  /// will allow to send TofPackets
179  pub tp_sender            : Option<Sender<TofPacket>>,
180  /// number of extracted events from stream
181  /// this manages how we are draining the stream
182  n_events_ext             : usize,
183  pub is_depleted          : bool,
184  /// Calculate the crc32 checksum for the channels 
185  /// everytime next() is called
186  pub calc_crc32           : bool,
187  /// placeholder for checksum calculator
188  crc32_sum                : Crc::<u32>,
189  pub request_mode         : bool,
190  pub request_cache        : VecDeque<(u32,u8)>,
191  /// an index for the events in the stream
192  /// this links eventid and start position
193  /// in the stream together
194  pub event_map            : HashMap<u32,(usize,usize)>,
195  pub first_evid           : u32,
196  pub last_evid            : u32,
197  pub last_event_complete  : bool,
198  pub last_event_pos       : usize,
199  /// When in request mode, number of events the last event in the stream is behind the
200  /// first request
201  pub is_behind_by         : usize,
202  /// When in request mode, number of events the last event in the stream is ahead the
203  /// last request
204  pub is_ahead_by          : usize,
205}
206
207impl RBEventMemoryStreamer {
208
209  pub fn new() -> Self {
210    Self {
211      stream               : Vec::<u8>::new(),
212      check_channel_errors : false,
213      mask                 : Vec::<u8>::new(),
214      pos                  : 0,
215      pos_at_head          : false,
216      tp_sender            : None,
217      n_events_ext         : 0,
218      is_depleted          : false,
219      calc_crc32           : false,
220      crc32_sum            : Crc::<u32>::new(&ALGO),
221      request_mode         : false,
222      request_cache        : VecDeque::<(u32,u8)>::new(),
223      event_map            : HashMap::<u32,(usize,usize)>::new(),
224      first_evid           : 0,
225      last_evid            : 0,
226      last_event_complete  : false,
227      last_event_pos       : 0,
228      is_behind_by         : 0,
229      is_ahead_by          : 0,
230    }
231  }
232 
233  /// Create the event index, which is
234  /// a map of event ids and position 
235  /// + length in the stream
236  pub fn create_event_index(&mut self) { //-> Result<Ok, SerializationError> {
237    let begin_pos = self.pos;
238    let mut event_id = 0u32;
239    // we are now at head, 
240    // read packet len and event id
241    loop {
242      let mut result = (0usize, 0usize);
243      if !self.seek_next_header(0xaaaa) {
244        debug!("Could not find another header...");
245        self.pos = begin_pos;
246        self.last_evid = event_id;
247        if result.0 + result.1 > self.stream.len() - 1 {
248          self.last_event_complete = false;
249        } else {
250          self.last_event_complete = true;
251        }
252        info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
253        return;
254      }
255      result.0 = self.pos;
256      self.pos += 4;//header, status
257      let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
258      if self.stream.len() < self.pos -6 + packet_len {
259        //self.is_depleted = true;
260        self.pos = begin_pos;
261        self.last_evid = event_id;
262        info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
263        return;
264        //return Err(SerializationError::StreamTooShort);
265      }
266      result.1 = packet_len;
267      if packet_len < 6 {
268        self.pos = begin_pos;
269        self.last_evid = event_id;
270        info!("Indexed {} events from {} to {}", self.event_map.len(), self.first_evid, self.last_evid);
271        return;
272        //return Err(SerializationError::StreamTooShort);
273      }
274      // rewind
275      self.pos -= 6;
276      // event id is at pos 22
277      self.pos += 22;
278      let event_id0    = parse_u16(&self.stream, &mut self.pos) as u32;
279      let event_id1    = parse_u16(&self.stream, &mut self.pos) as u32;
280      if REVERSE_WORDS {
281        event_id = event_id0 << 16 | event_id1;
282      } else {
283        event_id = event_id1 << 16 | event_id0;
284      }
285      if self.first_evid == 0 {
286        self.first_evid = event_id;
287      }
288      self.pos += packet_len - 26;
289      self.event_map.insert(event_id,result);
290    }
291  }
292
293  pub fn print_event_map(&self) {
294    for k in self.event_map.keys() {
295      let pos = self.event_map[&k];
296      println!("-- --> {} -> {},{}", k, pos.0, pos.1);
297    }
298  }
299
300  // EXPERIMENTAL
301  pub fn init_sender(&mut self, tp_sender : Sender<TofPacket>) {
302    self.tp_sender = Some(tp_sender);
303  }
304
305  // EXPERIMENTAL
306  pub fn send_all(&mut self) {
307    loop {
308      match self.next() {
309        None => {
310          info!("Streamer drained!");
311          break;
312        },
313        Some(ev) => {
314          let tp = ev.pack();
315          match self.tp_sender.as_ref().expect("Sender needs to be initialized first!").send(tp) {
316            Ok(_) => (),
317            Err(err) => {
318              error!("Unable to send TofPacket! {err}");
319            }
320          }
321        }
322      }
323    }
324  }
325
326
327  // FIXME - performance. Don't extend it. It would be
328  // better if we'd consume the stream without 
329  // reallocating memory.
330  pub fn add(&mut self, stream : &Vec<u8>, nbytes : usize) {
331    //self.stream.extend(stream.iter().copied());
332    //println!("self.pos {}", self.pos);
333    //println!("Stream before {}",self.stream.len());
334    self.is_depleted = false;
335    self.stream.extend_from_slice(&stream[0..nbytes]);
336    //self.create_event_index();
337    //println!("Stream after {}",self.stream.len());
338  }
339
340  /// Take in a stream by consuming it, that means moving
341  /// This will avoid clones.
342  pub fn consume(&mut self, stream : &mut Vec<u8>) {
343    self.is_depleted = false;
344    // FIXME: append can panic
345    // we use it here, since it does not clone
346    //println!("[io.rs] consuming {} bytes", stream.len());
347    self.stream.append(stream);
348    //println!("[io.rs] stream has now {} bytes", self.stream.len());
349    //self.create_event_index();
350  }
351
352  /// Headers are expected to be a 2byte signature, 
353  /// e.g. 0xaaaa. 
354  ///
355  /// # Arguments:
356  ///   header : 2byte header.
357  ///
358  /// # Returns
359  /// 
360  ///   * success   : header found
361  pub fn seek_next_header(&mut self, header : u16) -> bool{
362    match seek_marker(&self.stream, header, self.pos) { 
363    //match search_for_u16(header, &self.stream, self.pos) {
364      Err(_) => {
365        return false;
366      }
367      Ok(head_pos) => {
368        self.pos = head_pos;
369        self.pos_at_head = true;
370        return true;
371      }
372    }
373  }
374
375  pub fn next_tofpacket(&mut self) -> Option<TofPacket> {
376    let begin_pos = self.pos; // in case we need
377                              // to reset the position
378    let foot_pos : usize;
379    let head_pos : usize;
380    if self.stream.len() == 0 {
381      trace!("Stream empty!");
382      return None;
383    }
384    if !self.pos_at_head {
385      if !self.seek_next_header(0xaaaa) {
386        debug!("Could not find another header...");
387        self.pos = begin_pos;
388        return None;
389      }
390    }
391    head_pos  = self.pos;
392    //let mut foot_pos  = self.pos;
393    //head_pos = self.pos;
394    if !self.seek_next_header(0x5555) {
395      debug!("Could not find another footer...");
396      self.pos = begin_pos;
397      return None;
398    }
399    //println!("{} {} {}", self.stream.len(), head_pos, foot_pos);
400    foot_pos = self.pos;
401    self.n_events_ext += 1;
402    let mut tp = TofPacket::new();
403    tp.packet_type = TofPacketType::RBEventMemoryView;
404    //let mut payload = Vec::<u8>::with_capacity(18530);
405    tp.payload.extend_from_slice(&self.stream[head_pos..foot_pos+2]);
406    //tp.payload = payload;
407    //self.pos += 2;
408    self.pos_at_head = false;
409    //self.stream.drain(0..foot_pos);
410    //self.pos = 0;
411    if self.n_events_ext % 200 == 0 {
412      self.stream.drain(0..foot_pos+3);
413      self.pos = 0;
414    }
415    Some(tp)
416  }
417
418
419  /// Retrive an RBEvent from a certain position
420  pub fn get_event_at_pos_unchecked(&mut self,
421                                    replace_channel_mask : Option<u16>)
422      -> Option<RBEvent> {
423    let mut header       = RBEventHeader::new();
424    let mut event        = RBEvent::new();
425    let mut event_status = EventStatus::Unknown;
426    //let begin_pos = self.pos;
427    if self.calc_crc32 && self.check_channel_errors {
428      event_status = EventStatus::Perfect;
429    }
430    if !self.calc_crc32 && !self.check_channel_errors {
431      event_status = EventStatus::GoodNoCRCOrErrBitCheck;
432    }
433    if !self.calc_crc32 && self.check_channel_errors {
434      event_status = EventStatus::GoodNoCRCCheck;
435    }
436    if self.calc_crc32 && !self.check_channel_errors {
437      event_status = EventStatus::GoodNoErrBitCheck;
438    }
439    // start parsing
440    //let first_pos = self.pos;
441    let head   = parse_u16(&self.stream, &mut self.pos);
442    if head != RBEventHeader::HEAD {
443      error!("Event does not start with {}", RBEventHeader::HEAD);
444      return None;
445    }
446
447    let status = parse_u16(&self.stream, &mut self.pos);
448    // At this state, this can be a header or a full event. Check here and
449    // proceed depending on the options
450    header.parse_status(status);
451    let packet_len = parse_u16(&self.stream, &mut self.pos) as usize * 2;
452    let nwords     = parse_u16(&self.stream, &mut self.pos) as usize + 1; // the field will tell you the 
453    if self.pos - 8 + packet_len > self.stream.len() { // -1?
454      error!("Stream is too short! Stream len is {}, packet len is {}. We are at pos {}", self.stream.len(), packet_len, self.pos);
455      self.is_depleted = true;
456      self.pos -= 8;
457      return None;
458    }
459    // now we skip the next 10 bytes, 
460    // they are dna, rsv, rsv, rsv, fw_hash
461    self.pos += 10;
462    self.pos += 1; // rb id first byte is rsvd
463    header.rb_id        = parse_u8(&self.stream, &mut self.pos);
464    header.set_channel_mask(parse_u16(&self.stream, &mut self.pos)); 
465    match replace_channel_mask {
466      None => (),
467      Some(mask) => {
468        println!("==> Replacing ch mask {} with {}", header.get_channel_mask(), mask);
469        header.set_channel_mask(mask); 
470      }
471    }
472    let event_id0       = parse_u16(&self.stream, &mut self.pos) as u32;
473    let event_id1       = parse_u16(&self.stream, &mut self.pos) as u32;
474    let event_id : u32;
475    if REVERSE_WORDS {
476      event_id = event_id0 << 16 | event_id1;
477    } else {
478      event_id = event_id1 << 16 | event_id0;
479    }
480    
481    header.event_id  = event_id;
482    // we are currently not using these
483    //let _dtap0       = parse_u16(&self.stream, &mut self.pos);
484    //let _drs4_temp   = parse_u16(&self.stream, &mut self.pos);
485    self.pos += 4;
486    let timestamp0   = parse_u16(&self.stream, &mut self.pos);
487    let timestamp1   = parse_u16(&self.stream, &mut self.pos) as u32;
488    let timestamp2   = parse_u16(&self.stream, &mut self.pos);
489    //println!("TIMESTAMPS {} {} {}", timestamp0, timestamp1, timestamp2);
490    let timestamp16 : u16;
491    let timestamp32 : u32;
492    if REVERSE_WORDS {
493      timestamp16 = timestamp0;
494      timestamp32 = timestamp1 << 16 | timestamp2 as u32;
495    } else {
496      timestamp16 = timestamp2;
497      timestamp32 = (timestamp0 as u32) << 16 | timestamp1;
498    }
499    header.timestamp16 = timestamp16;
500    header.timestamp32 = timestamp32;
501    // now the payload
502    //println!("{}", header);
503    //println!("{}", nwords);
504    if header.drs_lost_trigger() {
505      event.status = EventStatus::IncompleteReadout;
506      event.header = header;
507      //self.pos_at_head = false;
508      return Some(event);
509    }
510    // make sure we can read them!
511    //let expected_packet_size =   header.get_channels().len()*nwords*2 
512    //                           + header.get_channels().len()*2 
513    //                           + header.get_channels().len()*4;
514    let mut any_cell_error = false;
515    let mut header_channels = header.get_channels().clone();
516    for k in &self.mask {
517      header_channels.retain(|x| x != k);
518    }
519
520    for ch in header_channels.iter() {
521      let ch_id = parse_u16(&self.stream, &mut self.pos);
522      if ch_id != *ch as u16 {
523        // check where is the next header
524        let search_pos = self.pos;
525        match seek_marker(&self.stream, TofPacket::HEAD, search_pos) { 
526        //match search_for_u16(TofPacket::HEAD, &self.stream, search_pos) {
527          Err(_) => (),
528          Ok(result) => {
529            info!("The channel data is corrupt, but we found a header at {} for remaining stream len {}", result, self.stream.len()); 
530          }
531        }
532        let mut stream_view = Vec::<u8>::new();
533        let foo_pos = self.pos;
534        for k in foo_pos -3..foo_pos + 3 {
535          stream_view.push(self.stream[k]);
536        }
537        error!("We got {ch_id} but expected {ch} for event {}. The parsed ch id is not in the channel mask! We will fill this channel with u16::MAX .... Stream view +- 3 around the ch id {:?}", header.event_id, stream_view);
538        event_status = EventStatus::ChannelIDWrong;
539        // we fill the channel with MAX values:
540        event.adc[*ch as usize] = vec![u16::MAX;NWORDS];
541        self.pos += 2*nwords + 4;
542        continue;
543      } else {
544      //if ch_id == *ch as u16 {
545        //println!("Got ch id {}", ch_id);
546        //let header = parse_u16(&self.stream, &mut self.pos);
547        // noice!!
548        //let data : Vec<u8> = self.stream.iter().skip(self.pos).take(2*nwords).map(|&x| x).collect();
549         
550        let mut dig = self.crc32_sum.digest();
551        if self.calc_crc32 {
552          let mut this_ch_adc = Vec::<u16>::with_capacity(nwords);
553          for _ in 0..nwords {
554            let this_field = parse_u16(&self.stream, &mut self.pos);
555            dig.update(&this_field.to_le_bytes());
556            if self.check_channel_errors {
557              if ((0x8000 & this_field) >> 15) == 0x1 {
558                error!("Ch error bit set for ch {}!", ch);
559                event_status = EventStatus::ChnSyncErrors;
560              }
561              if ((0x4000 & this_field) >> 14) == 0x1 {
562                error!("Cell error bit set for ch {}!", ch);
563                event_status = EventStatus::CellSyncErrors;
564                any_cell_error = true;
565              }
566            }
567            this_ch_adc.push(0x3fff & this_field)
568          }
569          event.adc[*ch as usize] = this_ch_adc;
570        } else {
571          if self.check_channel_errors {
572            let adc_w_errs = u8_to_u16_err_check(&self.stream[self.pos..self.pos + 2*nwords]);
573            if adc_w_errs.1 {
574              error!("Ch error bit set for ch {}!", ch);
575              event_status = EventStatus::ChnSyncErrors;
576              any_cell_error = true;
577            } else if adc_w_errs.2 {
578              error!("Cell error bit set for ch {}!", ch);
579              event_status = EventStatus::CellSyncErrors;
580            }
581            event.adc[*ch as usize] = adc_w_errs.0;
582          } else {
583            event.adc[*ch as usize] = u8_to_u16_14bit(&self.stream[self.pos..self.pos + 2*nwords]);
584          }
585          self.pos += 2*nwords;
586        } 
587        //let data = &self.stream[self.pos..self.pos+2*nwords];
588        //self.pos += 2*nwords;
589        let crc320 = parse_u16(&self.stream, &mut self.pos) as u32;
590        let crc321 = parse_u16(&self.stream, &mut self.pos) as u32;
591        //let checksum = self.crc32_sum.clone().finalize();
592        if self.calc_crc32 {
593          let crc32 : u32;
594          if REVERSE_WORDS {
595            crc32 = crc320 << 16 | crc321;
596          } else {
597            crc32 = crc321 << 16 | crc320;
598          }
599          let checksum = dig.finalize();
600          if checksum != crc32 {
601            event_status = EventStatus::CRC32Wrong;
602          }
603          println!("== ==> Checksum {}, channel checksum {}!", checksum, crc32); 
604        }
605      }
606    }
607    if any_cell_error {
608      if event_status == EventStatus::ChnSyncErrors {
609        event_status = EventStatus::CellAndChnSyncErrors;
610      }
611    }
612    
613    if !header.drs_lost_trigger() {
614      header.stop_cell = parse_u16(&self.stream, &mut self.pos);
615    }
616    // CRC32 checksum - next 4 bytes
617    // FIXME
618    // skip crc32 checksum
619    self.pos += 4;
620
621    // in principle there is a checksum for the whole event, whcih
622    // we are currently not using (it is easy to spot wrong bytes
623    // in the header)
624    //let crc320         = parse_u16(&self.stream, &mut self.pos);
625    //let crc321         = parse_u16(&self.stream, &mut self.pos);
626    //if self.calc_crc32 {
627    //  let crc32 : u32;
628    //  if REVERSE_WORDS {
629    //    crc32 = u32::from(crc320) << 16 | u32::from(crc321);
630    //  } else {
631    //    crc32 = u32::from(crc321) << 16 | u32::from(crc320);
632    //  }
633    //  warn!("Checksum test for the whole event is not yet implemented!");
634    //  //if event.header.crc32 != crc32 {
635    //  //  trace!("Checksum test for the whole event is not yet implemented!");
636    //  //}
637    //}
638    
639    let tail         = parse_u16(&self.stream, &mut self.pos);
640    if tail != 0x5555 {
641      error!("Tail signature {} for event {} is invalid!", tail, header.event_id);
642      event_status = EventStatus::TailWrong;
643    } 
644    //self.stream.drain(0..self.pos);
645    self.pos_at_head = false;
646    event.header = header;
647    event.status = event_status;
648    if event_status == EventStatus::TailWrong {
649      info!("{}", event);
650    }
651    Some(event)
652  }
653
654  pub fn get_event_at_id(&mut self, event_id : u32, replace_channel_mask : Option<u16>) -> Option<RBEvent> {
655    let begin_pos = self.pos; // in case we need
656                              // to reset the position
657    //println!("--> Requested {}", event_id);
658    //if self.event_map.contains_key(&event_id) {
659    //  //println!("-- We have it!");
660    //} else {
661    //  //println!("-- We DON'T have it, event_map len {}", self.event_map.len());
662    //  //self.print_event_map();
663    //  //println!("-- last event id {}", self.last_evid);
664    //  //println!("-- first event id {}", self.first_evid);
665    //}
666    let pos = self.event_map.remove(&event_id)?;
667    if self.stream.len() < pos.0 + pos.1 {
668      trace!("Stream is too short!");
669      self.is_depleted = true;
670      self.pos = begin_pos;
671      return None;
672    }
673    self.pos = pos.0;
674    self.get_event_at_pos_unchecked(replace_channel_mask)
675  }
676}
677
678impl Iterator for RBEventMemoryStreamer {
679  type Item = RBEvent;
680
681  fn next(&mut self) -> Option<Self::Item> {
682    // FIXME - we should init this only once
683    // event id from stream
684    //let event_id  = 0u32;
685    let begin_pos : usize; // in case we need
686                           // to rewind
687     
688    self.pos_at_head = false;
689    begin_pos = self.pos; // in case we need
690                                // to reset the position
691    if self.stream.len() == 0 {
692      trace!("Stream empty!");
693      self.is_depleted = true;
694      self.pos = 0;
695      return None;
696    }
697    if !self.pos_at_head {
698      if !self.seek_next_header(0xaaaa) {
699        debug!("Could not find another header...");
700        self.pos = begin_pos;
701        self.is_depleted = true;
702        return None;
703      }
704    }
705    
706    let event          = self.get_event_at_pos_unchecked(None)?;
707    self.n_events_ext += 1;
708    self.stream.drain(0..self.pos);
709    self.pos           = 0;
710    self.pos_at_head   = false;
711    Some(event)
712  }
713}
714
715