crossterm/event/source/unix/mio.rs
1use std::{collections::VecDeque, io, time::Duration};
2
3use mio::{unix::SourceFd, Events, Interest, Poll, Token};
4use signal_hook_mio::v1_0::Signals;
5
6#[cfg(feature = "event-stream")]
7use crate::event::sys::Waker;
8use crate::event::{
9 source::EventSource, sys::unix::parse::parse_event, timeout::PollTimeout, Event, InternalEvent,
10};
11use crate::terminal::sys::file_descriptor::{tty_fd, FileDesc};
12
13// Tokens to identify file descriptor
14const TTY_TOKEN: Token = Token(0);
15const SIGNAL_TOKEN: Token = Token(1);
16#[cfg(feature = "event-stream")]
17const WAKE_TOKEN: Token = Token(2);
18
/// Size, in bytes, of the scratch buffer used for a single TTY read.
///
/// While testing reads on macOS/Linux, @zrzka was never able to read more
/// than 1_022 bytes at once, so a bigger buffer is not needed and 1k of
/// bytes is enough.
const TTY_BUFFER_SIZE: usize = 1024;
23
24pub(crate) struct UnixInternalEventSource {
25 poll: Poll,
26 events: Events,
27 parser: Parser,
28 tty_buffer: [u8; TTY_BUFFER_SIZE],
29 tty_fd: FileDesc<'static>,
30 signals: Signals,
31 #[cfg(feature = "event-stream")]
32 waker: Waker,
33}
34
35impl UnixInternalEventSource {
36 pub fn new() -> io::Result<Self> {
37 UnixInternalEventSource::from_file_descriptor(tty_fd()?)
38 }
39
40 pub(crate) fn from_file_descriptor(input_fd: FileDesc<'static>) -> io::Result<Self> {
41 let poll = Poll::new()?;
42 let registry = poll.registry();
43
44 let tty_raw_fd = input_fd.raw_fd();
45 let mut tty_ev = SourceFd(&tty_raw_fd);
46 registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
47
48 let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?;
49 registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
50
51 #[cfg(feature = "event-stream")]
52 let waker = Waker::new(registry, WAKE_TOKEN)?;
53
54 Ok(UnixInternalEventSource {
55 poll,
56 events: Events::with_capacity(3),
57 parser: Parser::default(),
58 tty_buffer: [0u8; TTY_BUFFER_SIZE],
59 tty_fd: input_fd,
60 signals,
61 #[cfg(feature = "event-stream")]
62 waker,
63 })
64 }
65}
66
67impl EventSource for UnixInternalEventSource {
68 fn try_read(&mut self, timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
69 if let Some(event) = self.parser.next() {
70 return Ok(Some(event));
71 }
72
73 let timeout = PollTimeout::new(timeout);
74
75 loop {
76 if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
77 // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
78 // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
79 // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
80 if e.kind() == io::ErrorKind::Interrupted {
81 continue;
82 } else {
83 return Err(e);
84 }
85 };
86
87 if self.events.is_empty() {
88 // No readiness events = timeout
89 return Ok(None);
90 }
91
92 for token in self.events.iter().map(|x| x.token()) {
93 match token {
94 TTY_TOKEN => {
95 loop {
96 match self.tty_fd.read(&mut self.tty_buffer) {
97 Ok(read_count) => {
98 if read_count > 0 {
99 self.parser.advance(
100 &self.tty_buffer[..read_count],
101 read_count == TTY_BUFFER_SIZE,
102 );
103 }
104 }
105 Err(e) => {
106 // No more data to read at the moment. We will receive another event
107 if e.kind() == io::ErrorKind::WouldBlock {
108 break;
109 }
110 // once more data is available to read.
111 else if e.kind() == io::ErrorKind::Interrupted {
112 continue;
113 }
114 }
115 };
116
117 if let Some(event) = self.parser.next() {
118 return Ok(Some(event));
119 }
120 }
121 }
122 SIGNAL_TOKEN => {
123 if self.signals.pending().next() == Some(signal_hook::consts::SIGWINCH) {
124 // TODO Should we remove tput?
125 //
126 // This can take a really long time, because terminal::size can
127 // launch new process (tput) and then it parses its output. It's
128 // not a really long time from the absolute time point of view, but
129 // it's a really long time from the mio, async-std/tokio executor, ...
130 // point of view.
131 let new_size = crate::terminal::size()?;
132 return Ok(Some(InternalEvent::Event(Event::Resize(
133 new_size.0, new_size.1,
134 ))));
135 }
136 }
137 #[cfg(feature = "event-stream")]
138 WAKE_TOKEN => {
139 return Err(std::io::Error::new(
140 std::io::ErrorKind::Interrupted,
141 "Poll operation was woken up by `Waker::wake`",
142 ));
143 }
144 _ => unreachable!("Synchronize Evented handle registration & token handling"),
145 }
146 }
147
148 // Processing above can take some time, check if timeout expired
149 if timeout.elapsed() {
150 return Ok(None);
151 }
152 }
153 }
154
155 #[cfg(feature = "event-stream")]
156 fn waker(&self) -> Waker {
157 self.waker.clone()
158 }
159}
160
161//
162// Following `Parser` structure exists for two reasons:
163//
164// * mimic anes Parser interface
165// * move the advancing, parsing, ... stuff out of the `try_read` method
166//
167#[derive(Debug)]
168struct Parser {
169 buffer: Vec<u8>,
170 internal_events: VecDeque<InternalEvent>,
171}
172
173impl Default for Parser {
174 fn default() -> Self {
175 Parser {
176 // This buffer is used for -> 1 <- ANSI escape sequence. Are we
177 // aware of any ANSI escape sequence that is bigger? Can we make
178 // it smaller?
179 //
180 // Probably not worth spending more time on this as "there's a plan"
181 // to use the anes crate parser.
182 buffer: Vec::with_capacity(256),
183 // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
184 // fit? What is an average sequence length? Let's guess here
185 // and say that the average ANSI escape sequence length is 8 bytes. Thus
186 // the buffer size should be 1024/8=128 to avoid additional allocations
187 // when processing large amounts of data.
188 //
189 // There's no need to make it bigger, because when you look at the `try_read`
190 // method implementation, all events are consumed before the next TTY_BUFFER
191 // is processed -> events pushed.
192 internal_events: VecDeque::with_capacity(128),
193 }
194 }
195}
196
197impl Parser {
198 fn advance(&mut self, buffer: &[u8], more: bool) {
199 for (idx, byte) in buffer.iter().enumerate() {
200 let more = idx + 1 < buffer.len() || more;
201
202 self.buffer.push(*byte);
203
204 match parse_event(&self.buffer, more) {
205 Ok(Some(ie)) => {
206 self.internal_events.push_back(ie);
207 self.buffer.clear();
208 }
209 Ok(None) => {
210 // Event can't be parsed, because we don't have enough bytes for
211 // the current sequence. Keep the buffer and process next bytes.
212 }
213 Err(_) => {
214 // Event can't be parsed (not enough parameters, parameter is not a number, ...).
215 // Clear the buffer and continue with another sequence.
216 self.buffer.clear();
217 }
218 }
219 }
220 }
221}
222
223impl Iterator for Parser {
224 type Item = InternalEvent;
225
226 fn next(&mut self) -> Option<Self::Item> {
227 self.internal_events.pop_front()
228 }
229}