crossterm/event/source/unix/
mio.rs

1use std::{collections::VecDeque, io, time::Duration};
2
3use mio::{unix::SourceFd, Events, Interest, Poll, Token};
4use signal_hook_mio::v1_0::Signals;
5
6#[cfg(feature = "event-stream")]
7use crate::event::sys::Waker;
8use crate::event::{
9    source::EventSource, sys::unix::parse::parse_event, timeout::PollTimeout, Event, InternalEvent,
10};
11use crate::terminal::sys::file_descriptor::{tty_fd, FileDesc};
12
13// Tokens to identify file descriptor
14const TTY_TOKEN: Token = Token(0);
15const SIGNAL_TOKEN: Token = Token(1);
16#[cfg(feature = "event-stream")]
17const WAKE_TOKEN: Token = Token(2);
18
19// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
20// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
21// is enough.
22const TTY_BUFFER_SIZE: usize = 1_024;
23
24pub(crate) struct UnixInternalEventSource {
25    poll: Poll,
26    events: Events,
27    parser: Parser,
28    tty_buffer: [u8; TTY_BUFFER_SIZE],
29    tty_fd: FileDesc<'static>,
30    signals: Signals,
31    #[cfg(feature = "event-stream")]
32    waker: Waker,
33}
34
35impl UnixInternalEventSource {
36    pub fn new() -> io::Result<Self> {
37        UnixInternalEventSource::from_file_descriptor(tty_fd()?)
38    }
39
40    pub(crate) fn from_file_descriptor(input_fd: FileDesc<'static>) -> io::Result<Self> {
41        let poll = Poll::new()?;
42        let registry = poll.registry();
43
44        let tty_raw_fd = input_fd.raw_fd();
45        let mut tty_ev = SourceFd(&tty_raw_fd);
46        registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
47
48        let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?;
49        registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
50
51        #[cfg(feature = "event-stream")]
52        let waker = Waker::new(registry, WAKE_TOKEN)?;
53
54        Ok(UnixInternalEventSource {
55            poll,
56            events: Events::with_capacity(3),
57            parser: Parser::default(),
58            tty_buffer: [0u8; TTY_BUFFER_SIZE],
59            tty_fd: input_fd,
60            signals,
61            #[cfg(feature = "event-stream")]
62            waker,
63        })
64    }
65}
66
67impl EventSource for UnixInternalEventSource {
68    fn try_read(&mut self, timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
69        if let Some(event) = self.parser.next() {
70            return Ok(Some(event));
71        }
72
73        let timeout = PollTimeout::new(timeout);
74
75        loop {
76            if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
77                // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
78                // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
79                // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
80                if e.kind() == io::ErrorKind::Interrupted {
81                    continue;
82                } else {
83                    return Err(e);
84                }
85            };
86
87            if self.events.is_empty() {
88                // No readiness events = timeout
89                return Ok(None);
90            }
91
92            for token in self.events.iter().map(|x| x.token()) {
93                match token {
94                    TTY_TOKEN => {
95                        loop {
96                            match self.tty_fd.read(&mut self.tty_buffer) {
97                                Ok(read_count) => {
98                                    if read_count > 0 {
99                                        self.parser.advance(
100                                            &self.tty_buffer[..read_count],
101                                            read_count == TTY_BUFFER_SIZE,
102                                        );
103                                    }
104                                }
105                                Err(e) => {
106                                    // No more data to read at the moment. We will receive another event
107                                    if e.kind() == io::ErrorKind::WouldBlock {
108                                        break;
109                                    }
110                                    // once more data is available to read.
111                                    else if e.kind() == io::ErrorKind::Interrupted {
112                                        continue;
113                                    }
114                                }
115                            };
116
117                            if let Some(event) = self.parser.next() {
118                                return Ok(Some(event));
119                            }
120                        }
121                    }
122                    SIGNAL_TOKEN => {
123                        if self.signals.pending().next() == Some(signal_hook::consts::SIGWINCH) {
124                            // TODO Should we remove tput?
125                            //
126                            // This can take a really long time, because terminal::size can
127                            // launch new process (tput) and then it parses its output. It's
128                            // not a really long time from the absolute time point of view, but
129                            // it's a really long time from the mio, async-std/tokio executor, ...
130                            // point of view.
131                            let new_size = crate::terminal::size()?;
132                            return Ok(Some(InternalEvent::Event(Event::Resize(
133                                new_size.0, new_size.1,
134                            ))));
135                        }
136                    }
137                    #[cfg(feature = "event-stream")]
138                    WAKE_TOKEN => {
139                        return Err(std::io::Error::new(
140                            std::io::ErrorKind::Interrupted,
141                            "Poll operation was woken up by `Waker::wake`",
142                        ));
143                    }
144                    _ => unreachable!("Synchronize Evented handle registration & token handling"),
145                }
146            }
147
148            // Processing above can take some time, check if timeout expired
149            if timeout.elapsed() {
150                return Ok(None);
151            }
152        }
153    }
154
155    #[cfg(feature = "event-stream")]
156    fn waker(&self) -> Waker {
157        self.waker.clone()
158    }
159}
160
161//
162// Following `Parser` structure exists for two reasons:
163//
164//  * mimic anes Parser interface
165//  * move the advancing, parsing, ... stuff out of the `try_read` method
166//
167#[derive(Debug)]
168struct Parser {
169    buffer: Vec<u8>,
170    internal_events: VecDeque<InternalEvent>,
171}
172
173impl Default for Parser {
174    fn default() -> Self {
175        Parser {
176            // This buffer is used for -> 1 <- ANSI escape sequence. Are we
177            // aware of any ANSI escape sequence that is bigger? Can we make
178            // it smaller?
179            //
180            // Probably not worth spending more time on this as "there's a plan"
181            // to use the anes crate parser.
182            buffer: Vec::with_capacity(256),
183            // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
184            // fit? What is an average sequence length? Let's guess here
185            // and say that the average ANSI escape sequence length is 8 bytes. Thus
186            // the buffer size should be 1024/8=128 to avoid additional allocations
187            // when processing large amounts of data.
188            //
189            // There's no need to make it bigger, because when you look at the `try_read`
190            // method implementation, all events are consumed before the next TTY_BUFFER
191            // is processed -> events pushed.
192            internal_events: VecDeque::with_capacity(128),
193        }
194    }
195}
196
197impl Parser {
198    fn advance(&mut self, buffer: &[u8], more: bool) {
199        for (idx, byte) in buffer.iter().enumerate() {
200            let more = idx + 1 < buffer.len() || more;
201
202            self.buffer.push(*byte);
203
204            match parse_event(&self.buffer, more) {
205                Ok(Some(ie)) => {
206                    self.internal_events.push_back(ie);
207                    self.buffer.clear();
208                }
209                Ok(None) => {
210                    // Event can't be parsed, because we don't have enough bytes for
211                    // the current sequence. Keep the buffer and process next bytes.
212                }
213                Err(_) => {
214                    // Event can't be parsed (not enough parameters, parameter is not a number, ...).
215                    // Clear the buffer and continue with another sequence.
216                    self.buffer.clear();
217                }
218            }
219        }
220    }
221}
222
223impl Iterator for Parser {
224    type Item = InternalEvent;
225
226    fn next(&mut self) -> Option<Self::Item> {
227        self.internal_events.pop_front()
228    }
229}