crossterm/event/
read.rs

1use std::{collections::vec_deque::VecDeque, io, time::Duration};
2
3#[cfg(unix)]
4use crate::event::source::unix::UnixInternalEventSource;
5#[cfg(windows)]
6use crate::event::source::windows::WindowsEventSource;
7#[cfg(feature = "event-stream")]
8use crate::event::sys::Waker;
9use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
10
11/// Can be used to read `InternalEvent`s.
12pub(crate) struct InternalEventReader {
13    events: VecDeque<InternalEvent>,
14    source: Option<Box<dyn EventSource>>,
15    skipped_events: Vec<InternalEvent>,
16}
17
18impl Default for InternalEventReader {
19    fn default() -> Self {
20        #[cfg(windows)]
21        let source = WindowsEventSource::new();
22        #[cfg(unix)]
23        let source = UnixInternalEventSource::new();
24
25        let source = source.ok().map(|x| Box::new(x) as Box<dyn EventSource>);
26
27        InternalEventReader {
28            source,
29            events: VecDeque::with_capacity(32),
30            skipped_events: Vec::with_capacity(32),
31        }
32    }
33}
34
35impl InternalEventReader {
36    /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
37    #[cfg(feature = "event-stream")]
38    pub(crate) fn waker(&self) -> Waker {
39        self.source.as_ref().expect("reader source not set").waker()
40    }
41
42    pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
43    where
44        F: Filter,
45    {
46        for event in &self.events {
47            if filter.eval(event) {
48                return Ok(true);
49            }
50        }
51
52        let event_source = match self.source.as_mut() {
53            Some(source) => source,
54            None => {
55                return Err(std::io::Error::new(
56                    std::io::ErrorKind::Other,
57                    "Failed to initialize input reader",
58                ))
59            }
60        };
61
62        let poll_timeout = PollTimeout::new(timeout);
63
64        loop {
65            let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
66                Ok(None) => None,
67                Ok(Some(event)) => {
68                    if filter.eval(&event) {
69                        Some(event)
70                    } else {
71                        self.skipped_events.push(event);
72                        None
73                    }
74                }
75                Err(e) => {
76                    if e.kind() == io::ErrorKind::Interrupted {
77                        return Ok(false);
78                    }
79
80                    return Err(e);
81                }
82            };
83
84            if poll_timeout.elapsed() || maybe_event.is_some() {
85                self.events.extend(self.skipped_events.drain(..));
86
87                if let Some(event) = maybe_event {
88                    self.events.push_front(event);
89                    return Ok(true);
90                }
91
92                return Ok(false);
93            }
94        }
95    }
96
97    pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
98    where
99        F: Filter,
100    {
101        let mut skipped_events = VecDeque::new();
102
103        loop {
104            while let Some(event) = self.events.pop_front() {
105                if filter.eval(&event) {
106                    while let Some(event) = skipped_events.pop_front() {
107                        self.events.push_back(event);
108                    }
109
110                    return Ok(event);
111                } else {
112                    // We can not directly write events back to `self.events`.
113                    // If we did, we would put our self's into an endless loop
114                    // that would enqueue -> dequeue -> enqueue etc.
115                    // This happens because `poll` in this function will always return true if there are events in it's.
116                    // And because we just put the non-fulfilling event there this is going to be the case.
117                    // Instead we can store them into the temporary buffer,
118                    // and then when the filter is fulfilled write all events back in order.
119                    skipped_events.push_back(event);
120                }
121            }
122
123            let _ = self.poll(None, filter)?;
124        }
125    }
126}
127
#[cfg(test)]
mod tests {
    use std::io;
    use std::{collections::VecDeque, time::Duration};

    #[cfg(unix)]
    use super::super::filter::CursorPositionFilter;
    use super::{
        super::{filter::InternalEventFilter, Event},
        EventSource, InternalEvent, InternalEventReader,
    };

    /// Event accepted by `InternalEventFilter` in the tests below.
    const RESIZE: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
    /// Event accepted by `CursorPositionFilter` in the tests below.
    #[cfg(unix)]
    const CURSOR: InternalEvent = InternalEvent::CursorPosition(10, 20);
    /// Zero-length poll timeout.
    const NO_WAIT: Option<Duration> = Some(Duration::from_secs(0));

    /// Reader without an event source whose queue already holds `events`.
    fn queued_reader(events: Vec<InternalEvent>) -> InternalEventReader {
        InternalEventReader {
            events: events.into(),
            source: None,
            skipped_events: Vec::with_capacity(32),
        }
    }

    /// Reader with an empty queue that is fed by `source`.
    fn sourced_reader(source: FakeSource) -> InternalEventReader {
        InternalEventReader {
            events: VecDeque::new(),
            source: Some(Box::new(source)),
            skipped_events: Vec::with_capacity(32),
        }
    }

    #[test]
    fn test_poll_fails_without_event_source() {
        let mut reader = queued_reader(Vec::new());
        let timeouts = [None, NO_WAIT, Some(Duration::from_secs(10))];

        for timeout in timeouts.iter() {
            assert!(reader.poll(*timeout, &InternalEventFilter).is_err());
        }
    }

    #[test]
    fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
        let mut reader = queued_reader(vec![RESIZE]);

        assert!(reader.poll(None, &InternalEventFilter).unwrap());
    }

    #[test]
    #[cfg(unix)]
    fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
        let mut reader = queued_reader(vec![RESIZE, CURSOR]);

        assert!(reader.poll(None, &CursorPositionFilter).unwrap());
    }

    #[test]
    fn test_read_returns_matching_event_in_queue_at_front() {
        let mut reader = queued_reader(vec![RESIZE]);

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    #[test]
    #[cfg(unix)]
    fn test_read_returns_matching_event_in_queue_at_back() {
        let mut reader = queued_reader(vec![RESIZE, CURSOR]);

        assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR);
    }

    #[test]
    #[cfg(unix)]
    fn test_read_does_not_consume_skipped_event() {
        let mut reader = queued_reader(vec![RESIZE, CURSOR]);

        // The resize event is skipped by the cursor filter but must survive.
        assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR);
        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    #[test]
    fn test_poll_timeouts_if_source_has_no_events() {
        let mut reader = sourced_reader(FakeSource::default());

        assert!(!reader.poll(NO_WAIT, &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_poll_returns_true_if_source_has_at_least_one_event() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE]));

        assert!(reader.poll(None, &InternalEventFilter).unwrap());
        assert!(reader.poll(NO_WAIT, &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_reads_returns_event_if_source_has_at_least_one_event() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    #[test]
    fn test_read_returns_events_if_source_has_events() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE, RESIZE, RESIZE]));

        for _ in 0..3 {
            assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        }
    }

    #[test]
    fn test_poll_returns_false_after_all_source_events_are_consumed() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE, RESIZE, RESIZE]));

        for _ in 0..3 {
            assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        }
        assert!(!reader.poll(NO_WAIT, &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_poll_propagates_error() {
        let mut reader = sourced_reader(FakeSource::new(&[]));

        let error = reader.poll(NO_WAIT, &InternalEventFilter).unwrap_err();
        assert_eq!(error.kind(), io::ErrorKind::Other);
    }

    #[test]
    fn test_read_propagates_error() {
        let mut reader = sourced_reader(FakeSource::new(&[]));

        let error = reader.read(&InternalEventFilter).unwrap_err();
        assert_eq!(error.kind(), io::ErrorKind::Other);
    }

    #[test]
    fn test_poll_continues_after_error() {
        let mut reader = sourced_reader(FakeSource::new(&[RESIZE, RESIZE]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        assert!(reader.read(&InternalEventFilter).is_err());
        assert!(reader.poll(NO_WAIT, &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_read_continues_after_error() {
        let mut reader = sourced_reader(FakeSource::new(&[RESIZE, RESIZE]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        assert!(reader.read(&InternalEventFilter).is_err());
        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    /// In-memory `EventSource` that replays a fixed list of events and can
    /// fail exactly once.
    #[derive(Default)]
    struct FakeSource {
        events: VecDeque<InternalEvent>,
        error: Option<io::Error>,
    }

    impl FakeSource {
        /// Source that yields `events` and injects a single error.
        fn new(events: &[InternalEvent]) -> FakeSource {
            FakeSource {
                error: Some(io::Error::new(io::ErrorKind::Other, "")),
                ..FakeSource::with_events(events)
            }
        }

        /// Source that yields `events` and never fails.
        fn with_events(events: &[InternalEvent]) -> FakeSource {
            FakeSource {
                events: events.iter().cloned().collect(),
                error: None,
            }
        }
    }

    impl EventSource for FakeSource {
        fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
            // The pending error (if any) fires once, right before the last
            // event would be returned, or when there are no events at all.
            if self.events.len() <= 1 {
                if let Some(error) = self.error.take() {
                    return Err(error);
                }
            }

            // Next queued event, or `None` (timeout) once the queue is empty.
            Ok(self.events.pop_front())
        }

        #[cfg(feature = "event-stream")]
        fn waker(&self) -> super::super::sys::Waker {
            unimplemented!();
        }
    }
}