zmq/
lib.rs

1//! Module: zmq
2
3#![allow(trivial_numeric_casts)]
4
5use bitflags::bitflags;
6use libc::{c_int, c_long, c_short};
7
8use std::ffi;
9use std::fmt;
10use std::marker::PhantomData;
11use std::os::raw::c_void;
12#[cfg(unix)]
13use std::os::unix::io::{AsRawFd, RawFd as UnixRawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsRawSocket, RawSocket};
16use std::result;
17use std::string::FromUtf8Error;
18use std::sync::Arc;
19use std::{mem, ptr, str};
20
21use zmq_sys::{errno, RawFd};
22
/// Evaluates a raw 0MQ call and yields its return code.
///
/// When the call evaluates to `-1`, the enclosing function returns early
/// with `Err(crate::errno_to_error())`; any other value becomes the value
/// of the macro expression.
macro_rules! zmq_try {
    ($($call:tt)*) => {{
        let code = $($call)*;
        match code {
            -1 => return Err(crate::errno_to_error()),
            _ => code,
        }
    }}
}
32
33mod message;
34mod sockopt;
35
36use crate::message::msg_ptr;
37pub use crate::message::Message;
38pub use crate::SocketType::*;
39
/// `zmq`-specific Result type.
///
/// This is `std::result::Result` with the error type fixed to this
/// crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
42
43/// Socket types
44#[allow(non_camel_case_types)]
45#[derive(Clone, Copy, Debug, PartialEq, Eq)]
46pub enum SocketType {
47    PAIR,
48    PUB,
49    SUB,
50    REQ,
51    REP,
52    DEALER,
53    ROUTER,
54    PULL,
55    PUSH,
56    XPUB,
57    XSUB,
58    STREAM,
59}
60
61impl SocketType {
62    fn to_raw(self) -> c_int {
63        let raw = match self {
64            PAIR => zmq_sys::ZMQ_PAIR,
65            PUB => zmq_sys::ZMQ_PUB,
66            SUB => zmq_sys::ZMQ_SUB,
67            REQ => zmq_sys::ZMQ_REQ,
68            REP => zmq_sys::ZMQ_REP,
69            DEALER => zmq_sys::ZMQ_DEALER,
70            ROUTER => zmq_sys::ZMQ_ROUTER,
71            PULL => zmq_sys::ZMQ_PULL,
72            PUSH => zmq_sys::ZMQ_PUSH,
73            XPUB => zmq_sys::ZMQ_XPUB,
74            XSUB => zmq_sys::ZMQ_XSUB,
75            STREAM => zmq_sys::ZMQ_STREAM,
76        };
77        raw as c_int
78    }
79    fn from_raw(raw: c_int) -> SocketType {
80        match raw as u32 {
81            zmq_sys::ZMQ_PAIR => PAIR,
82            zmq_sys::ZMQ_PUB => PUB,
83            zmq_sys::ZMQ_SUB => SUB,
84            zmq_sys::ZMQ_REQ => REQ,
85            zmq_sys::ZMQ_REP => REP,
86            zmq_sys::ZMQ_DEALER => DEALER,
87            zmq_sys::ZMQ_ROUTER => ROUTER,
88            zmq_sys::ZMQ_PULL => PULL,
89            zmq_sys::ZMQ_PUSH => PUSH,
90            zmq_sys::ZMQ_XPUB => XPUB,
91            zmq_sys::ZMQ_XSUB => XSUB,
92            zmq_sys::ZMQ_STREAM => STREAM,
93            _ => panic!("socket type is out of range!"),
94        }
95    }
96}
97
98/// Socket Events
99#[allow(non_camel_case_types)]
100#[derive(Clone, Copy, Debug, PartialEq, Eq)]
101pub enum SocketEvent {
102    // TODO: This should become a proper enum, including the data.
103    CONNECTED = zmq_sys::ZMQ_EVENT_CONNECTED as isize,
104    CONNECT_DELAYED = zmq_sys::ZMQ_EVENT_CONNECT_DELAYED as isize,
105    CONNECT_RETRIED = zmq_sys::ZMQ_EVENT_CONNECT_RETRIED as isize,
106    LISTENING = zmq_sys::ZMQ_EVENT_LISTENING as isize,
107    BIND_FAILED = zmq_sys::ZMQ_EVENT_BIND_FAILED as isize,
108    ACCEPTED = zmq_sys::ZMQ_EVENT_ACCEPTED as isize,
109    ACCEPT_FAILED = zmq_sys::ZMQ_EVENT_ACCEPT_FAILED as isize,
110    CLOSED = zmq_sys::ZMQ_EVENT_CLOSED as isize,
111    CLOSE_FAILED = zmq_sys::ZMQ_EVENT_CLOSE_FAILED as isize,
112    DISCONNECTED = zmq_sys::ZMQ_EVENT_DISCONNECTED as isize,
113    MONITOR_STOPPED = zmq_sys::ZMQ_EVENT_MONITOR_STOPPED as isize,
114    HANDSHAKE_FAILED_NO_DETAIL = zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL as isize,
115    HANDSHAKE_SUCCEEDED = zmq_sys::ZMQ_EVENT_HANDSHAKE_SUCCEEDED as isize,
116    HANDSHAKE_FAILED_PROTOCOL = zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL as isize,
117    HANDSHAKE_FAILED_AUTH = zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_AUTH as isize,
118    ALL = zmq_sys::ZMQ_EVENT_ALL as isize,
119}
120
121impl SocketEvent {
122    pub fn to_raw(self) -> u16 {
123        self as u16
124    }
125
126    // TODO: this should not need to be public
127    pub fn from_raw(raw: u16) -> SocketEvent {
128        use SocketEvent::*;
129        match u32::from(raw) {
130            zmq_sys::ZMQ_EVENT_CONNECTED => CONNECTED,
131            zmq_sys::ZMQ_EVENT_CONNECT_DELAYED => CONNECT_DELAYED,
132            zmq_sys::ZMQ_EVENT_CONNECT_RETRIED => CONNECT_RETRIED,
133            zmq_sys::ZMQ_EVENT_LISTENING => LISTENING,
134            zmq_sys::ZMQ_EVENT_BIND_FAILED => BIND_FAILED,
135            zmq_sys::ZMQ_EVENT_ACCEPTED => ACCEPTED,
136            zmq_sys::ZMQ_EVENT_ACCEPT_FAILED => ACCEPT_FAILED,
137            zmq_sys::ZMQ_EVENT_CLOSED => CLOSED,
138            zmq_sys::ZMQ_EVENT_CLOSE_FAILED => CLOSE_FAILED,
139            zmq_sys::ZMQ_EVENT_DISCONNECTED => DISCONNECTED,
140            zmq_sys::ZMQ_EVENT_MONITOR_STOPPED => MONITOR_STOPPED,
141            zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL => HANDSHAKE_FAILED_NO_DETAIL,
142            zmq_sys::ZMQ_EVENT_HANDSHAKE_SUCCEEDED => HANDSHAKE_SUCCEEDED,
143            zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL => HANDSHAKE_FAILED_PROTOCOL,
144            zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_AUTH => HANDSHAKE_FAILED_AUTH,
145            zmq_sys::ZMQ_EVENT_ALL => ALL,
146            x => panic!("unknown event type {}", x),
147        }
148    }
149}
150
/// Flag for socket `send` methods that specifies non-blocking mode.
///
/// Holds `zmq_sys::ZMQ_DONTWAIT` converted to `i32`, the type the `flags`
/// parameters in this module use.
pub static DONTWAIT: i32 = zmq_sys::ZMQ_DONTWAIT as i32;
/// Flag for socket `send` methods that specifies that more frames of a
/// multipart message will follow.
///
/// Holds `zmq_sys::ZMQ_SNDMORE` converted to `i32`.
pub static SNDMORE: i32 = zmq_sys::ZMQ_SNDMORE as i32;
156
/// Security Mechanism
///
/// Returned by `Socket::get_mechanism`, which maps each `zmq_sys`
/// constant of the same name to the corresponding variant.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mechanism {
    // TODO: Fix the naming
    /// Counterpart of `zmq_sys::ZMQ_NULL`.
    ZMQ_NULL,
    /// Counterpart of `zmq_sys::ZMQ_PLAIN`.
    ZMQ_PLAIN,
    /// Counterpart of `zmq_sys::ZMQ_CURVE`.
    ZMQ_CURVE,
    /// Counterpart of `zmq_sys::ZMQ_GSSAPI`.
    ZMQ_GSSAPI,
}
167
/// An error returned by a 0MQ API function.
///
/// Each variant corresponds to the `errno` constant of the same name in
/// `zmq_sys::errno`.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum Error {
    EACCES,
    EADDRINUSE,
    EAGAIN,
    EBUSY,
    ECONNREFUSED,
    EFAULT,
    EINTR,
    EHOSTUNREACH,
    EINPROGRESS,
    EINVAL,
    EMFILE,
    EMSGSIZE,
    ENAMETOOLONG,
    ENODEV,
    ENOENT,
    ENOMEM,
    ENOTCONN,
    ENOTSOCK,
    EPROTO,
    EPROTONOSUPPORT,
    ENOTSUP,
    ENOBUFS,
    ENETDOWN,
    EADDRNOTAVAIL,

    // native zmq error codes
    EFSM,
    ENOCOMPATPROTO,
    ETERM,
    EMTHREAD,
}

impl Error {
    /// Returns the raw `errno` value for this error.
    pub fn to_raw(self) -> i32 {
        match self {
            Error::EACCES => errno::EACCES,
            Error::EADDRINUSE => errno::EADDRINUSE,
            Error::EAGAIN => errno::EAGAIN,
            Error::EBUSY => errno::EBUSY,
            Error::ECONNREFUSED => errno::ECONNREFUSED,
            Error::EFAULT => errno::EFAULT,
            Error::EINTR => errno::EINTR,
            Error::EHOSTUNREACH => errno::EHOSTUNREACH,
            Error::EINPROGRESS => errno::EINPROGRESS,
            Error::EINVAL => errno::EINVAL,
            Error::EMFILE => errno::EMFILE,
            Error::EMSGSIZE => errno::EMSGSIZE,
            Error::ENAMETOOLONG => errno::ENAMETOOLONG,
            Error::ENODEV => errno::ENODEV,
            Error::ENOENT => errno::ENOENT,
            Error::ENOMEM => errno::ENOMEM,
            Error::ENOTCONN => errno::ENOTCONN,
            Error::ENOTSOCK => errno::ENOTSOCK,
            Error::EPROTO => errno::EPROTO,
            Error::EPROTONOSUPPORT => errno::EPROTONOSUPPORT,
            Error::ENOTSUP => errno::ENOTSUP,
            Error::ENOBUFS => errno::ENOBUFS,
            Error::ENETDOWN => errno::ENETDOWN,
            Error::EADDRNOTAVAIL => errno::EADDRNOTAVAIL,

            Error::EFSM => errno::EFSM,
            Error::ENOCOMPATPROTO => errno::ENOCOMPATPROTO,
            Error::ETERM => errno::ETERM,
            Error::EMTHREAD => errno::EMTHREAD,
        }
    }

    /// Converts a raw `errno` value into an `Error`.
    ///
    /// The `*_ALT` codes from `zmq_sys::errno` are folded into the variant
    /// of the corresponding native code.
    ///
    /// # Panics
    ///
    /// Panics if `raw` is none of the codes handled below; the panic
    /// message includes the text `zmq_strerror` reports for the code.
    pub fn from_raw(raw: i32) -> Error {
        match raw {
            errno::EACCES => Error::EACCES,
            errno::EADDRINUSE => Error::EADDRINUSE,
            errno::EAGAIN => Error::EAGAIN,
            errno::EBUSY => Error::EBUSY,
            errno::ECONNREFUSED => Error::ECONNREFUSED,
            errno::EFAULT => Error::EFAULT,
            errno::EHOSTUNREACH => Error::EHOSTUNREACH,
            errno::EINPROGRESS => Error::EINPROGRESS,
            errno::EINVAL => Error::EINVAL,
            errno::EMFILE => Error::EMFILE,
            errno::EMSGSIZE => Error::EMSGSIZE,
            errno::ENAMETOOLONG => Error::ENAMETOOLONG,
            errno::ENODEV => Error::ENODEV,
            errno::ENOENT => Error::ENOENT,
            errno::ENOMEM => Error::ENOMEM,
            errno::ENOTCONN => Error::ENOTCONN,
            errno::ENOTSOCK => Error::ENOTSOCK,
            errno::EPROTO => Error::EPROTO,
            errno::EPROTONOSUPPORT => Error::EPROTONOSUPPORT,
            errno::ENOTSUP => Error::ENOTSUP,
            errno::ENOBUFS => Error::ENOBUFS,
            errno::ENETDOWN => Error::ENETDOWN,
            errno::EADDRNOTAVAIL => Error::EADDRNOTAVAIL,
            errno::EINTR => Error::EINTR,

            // These may turn up on platforms that don't support these
            // errno codes natively (Windows)
            errno::ENOTSUP_ALT => Error::ENOTSUP,
            errno::EPROTONOSUPPORT_ALT => Error::EPROTONOSUPPORT,
            errno::ENOBUFS_ALT => Error::ENOBUFS,
            errno::ENETDOWN_ALT => Error::ENETDOWN,
            errno::EADDRINUSE_ALT => Error::EADDRINUSE,
            errno::EADDRNOTAVAIL_ALT => Error::EADDRNOTAVAIL,
            errno::ECONNREFUSED_ALT => Error::ECONNREFUSED,
            errno::EINPROGRESS_ALT => Error::EINPROGRESS,
            errno::ENOTSOCK_ALT => Error::ENOTSOCK,
            errno::EMSGSIZE_ALT => Error::EMSGSIZE,

            // TODO: these are present in `zmq-sys`, but not handled, as that
            // would break backwards-compatibility for the `Error` enum.

            // errno::EAFNOSUPPORT_ALT => Error::EAFNOSUPPORT,
            // errno::ENETUNREACH_ALT => Error::ENETUNREACH,
            // errno::ECONNABORTED_ALT => Error::ECONNABORTED,
            // errno::ECONNRESET_ALT => Error::ECONNRESET,
            // errno::ENOTCONN_ALT => Error::ENOTCONN,
            // errno::ETIMEDOUT_ALT => Error::ETIMEDOUT,
            // errno::EHOSTUNREACH_ALT => Error::EHOSTUNREACH,
            // errno::ENETRESET_ALT => Error::ENETRESET,

            // 0MQ native error codes
            errno::EFSM => Error::EFSM,
            errno::ENOCOMPATPROTO => Error::ENOCOMPATPROTO,
            errno::ETERM => Error::ETERM,
            errno::EMTHREAD => Error::EMTHREAD,

            unknown => {
                // SAFETY: relies on `zmq_strerror` returning a valid pointer
                // to a NUL-terminated string for any input (assumption
                // carried over from the original code).
                let description = unsafe { ffi::CStr::from_ptr(zmq_sys::zmq_strerror(unknown)) };
                panic!(
                    "unknown error [{}]: {}",
                    unknown,
                    description.to_str().unwrap()
                )
            }
        }
    }

    /// Returns the error message provided by 0MQ.
    ///
    /// # Panics
    ///
    /// Panics if the message reported by `zmq_strerror` is not valid UTF-8.
    pub fn message(self) -> &'static str {
        // SAFETY: relies on `zmq_strerror` returning a valid pointer to a
        // NUL-terminated string that is never freed, which is what makes the
        // `'static` lifetime sound (assumption carried over from the original
        // code). `CStr::from_ptr` yields an unbounded lifetime, so no
        // `transmute` is needed to name it `'static`.
        let text: &'static ffi::CStr =
            unsafe { ffi::CStr::from_ptr(zmq_sys::zmq_strerror(self.to_raw())) };
        text.to_str().unwrap()
    }
}
316
impl std::error::Error for Error {
    /// Returns the same text as `Error::message`.
    fn description(&self) -> &str {
        self.message()
    }
}
322
323impl std::fmt::Display for Error {
324    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325        write!(f, "{}", self.message())
326    }
327}
328
329impl fmt::Debug for Error {
330    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
331        // FIXME: An unquoted string is not a good `Debug` output.
332        write!(f, "{}", self.message())
333    }
334}
335
impl From<Error> for std::io::Error {
    /// Wraps the 0MQ error in a `std::io::Error`, choosing the closest
    /// `ErrorKind` and falling back to `ErrorKind::Other`.
    fn from(error: Error) -> Self {
        use std::io::ErrorKind;

        // TODO: With rust 1.14 and up there is an optimization
        // opportunity using `std::io::Error: From<ErrorKind>` when
        // `kind != Other`. We should do that once 1.14 has been
        // stable for a bit.
        std::io::Error::new(
            match error {
                Error::ENOENT => ErrorKind::NotFound,
                Error::EACCES => ErrorKind::PermissionDenied,
                Error::ECONNREFUSED => ErrorKind::ConnectionRefused,
                Error::ENOTCONN => ErrorKind::NotConnected,
                Error::EADDRINUSE => ErrorKind::AddrInUse,
                Error::EADDRNOTAVAIL => ErrorKind::AddrNotAvailable,
                Error::EAGAIN => ErrorKind::WouldBlock,
                Error::EINVAL => ErrorKind::InvalidInput,
                Error::EINTR => ErrorKind::Interrupted,
                _ => ErrorKind::Other,
            },
            error,
        )
    }
}
359
360fn errno_to_error() -> Error {
361    Error::from_raw(unsafe { zmq_sys::zmq_errno() })
362}
363
364/// Return the current zeromq version, as `(major, minor, patch)`.
365pub fn version() -> (i32, i32, i32) {
366    let mut major = 0;
367    let mut minor = 0;
368    let mut patch = 0;
369
370    unsafe {
371        zmq_sys::zmq_version(&mut major, &mut minor, &mut patch);
372    }
373
374    (major as i32, minor as i32, patch as i32)
375}
376
377struct RawContext {
378    ctx: *mut c_void,
379}
380
381impl RawContext {
382    fn term(&self) -> Result<()> {
383        zmq_try!(unsafe { zmq_sys::zmq_ctx_term(self.ctx) });
384        Ok(())
385    }
386}
387
388unsafe impl Send for RawContext {}
389unsafe impl Sync for RawContext {}
390
391impl Drop for RawContext {
392    fn drop(&mut self) {
393        let mut e = self.term();
394        while e == Err(Error::EINTR) {
395            e = self.term();
396        }
397    }
398}
399
400/// Handle for a 0MQ context, used to create sockets.
401///
402/// It is thread safe, and can be safely cloned and shared. Each clone
403/// references the same underlying C context. Internally, an `Arc` is
404/// used to implement this in a threadsafe way.
405///
406/// Also note that this binding deviates from the C API in that each
407/// socket created from a context initially owns a clone of that
408/// context. This reference is kept to avoid a potential deadlock
409/// situation that would otherwise occur:
410///
411/// Destroying the underlying C context is an operation which
412/// blocks waiting for all sockets created from it to be closed
413/// first. If one of the sockets belongs to thread issuing the
414/// destroy operation, you have established a deadlock.
415///
416/// You can still deadlock yourself (or intentionally close sockets in
417/// other threads, see `zmq_ctx_destroy`(3)) by explicitly calling
418/// `Context::destroy`.
419///
420#[derive(Clone)]
421pub struct Context {
422    raw: Arc<RawContext>,
423}
424
425impl Context {
426    /// Create a new reference-counted context handle.
427    pub fn new() -> Context {
428        Context {
429            raw: Arc::new(RawContext {
430                ctx: unsafe { zmq_sys::zmq_ctx_new() },
431            }),
432        }
433    }
434
435    /// Get the size of the ØMQ thread pool to handle I/O operations.
436    pub fn get_io_threads(&self) -> Result<i32> {
437        let rc =
438            zmq_try!(unsafe { zmq_sys::zmq_ctx_get(self.raw.ctx, zmq_sys::ZMQ_IO_THREADS as _) });
439        Ok(rc as i32)
440    }
441
442    /// Set the size of the ØMQ thread pool to handle I/O operations.
443    pub fn set_io_threads(&self, value: i32) -> Result<()> {
444        zmq_try!(unsafe {
445            zmq_sys::zmq_ctx_set(self.raw.ctx, zmq_sys::ZMQ_IO_THREADS as _, value as i32)
446        });
447        Ok(())
448    }
449
450    /// Create a new socket.
451    ///
452    /// Note that the returned socket keeps a an `Arc` reference to
453    /// the context it was created from, and will keep that context
454    /// from being dropped while being live.
455    pub fn socket(&self, socket_type: SocketType) -> Result<Socket> {
456        let sock = unsafe { zmq_sys::zmq_socket(self.raw.ctx, socket_type.to_raw()) };
457
458        if sock.is_null() {
459            return Err(errno_to_error());
460        }
461
462        Ok(Socket {
463            sock,
464            context: Some(self.clone()),
465            owned: true,
466        })
467    }
468
469    /// Try to destroy the context. This is different than the destructor; the
470    /// destructor will loop when zmq_ctx_term returns EINTR.
471    pub fn destroy(&mut self) -> Result<()> {
472        self.raw.term()
473    }
474}
475
476impl Default for Context {
477    fn default() -> Self {
478        Context::new()
479    }
480}
481
/// A socket, the central object in 0MQ.
pub struct Socket {
    // Raw socket pointer passed to the `zmq_sys` functions.
    sock: *mut c_void,
    // The `context` field is never accessed, but implicitly does
    // reference counting via the `Drop` trait.
    #[allow(dead_code)]
    context: Option<Context>,
    // When `false`, the `Drop` implementation does not call `zmq_close`.
    owned: bool,
}

// Only `Send` is implemented here; no `Sync` impl is visible in this part
// of the file.
unsafe impl Send for Socket {}
493
494impl Drop for Socket {
495    fn drop(&mut self) {
496        if self.owned && unsafe { zmq_sys::zmq_close(self.sock) } == -1 {
497            panic!("{}", errno_to_error());
498        }
499    }
500}
501
502#[cfg(unix)]
503impl AsRawFd for Socket {
504    fn as_raw_fd(&self) -> UnixRawFd {
505        self.get_fd().unwrap() as UnixRawFd
506    }
507}
508
#[cfg(windows)]
impl AsRawSocket for Socket {
    /// Returns the value of `get_fd()`; panics if that getter fails.
    fn as_raw_socket(&self) -> RawSocket {
        let fd = self.get_fd().unwrap();
        fd as RawSocket
    }
}
515
/// Generates a public getter method that reads the socket option
/// `zmq_sys::<constant>` through the `sockopt::Getter` implementation of
/// the given type. Any attributes passed in are attached to the method.
macro_rules! sockopt_getter {
    ( $(#[$attr:meta])*
      pub $name:ident => $option:ident as $value_ty:ty
    ) => {
        $(#[$attr])*
        pub fn $name(&self) -> Result<$value_ty> {
            let option = zmq_sys::$option as c_int;
            <$value_ty as sockopt::Getter>::get(self.sock, option)
        }
    };
}
526
/// Generates a public setter method that writes the socket option
/// `zmq_sys::<constant>` through the `sockopt::Setter` implementation of
/// the given type. Any attributes passed in are attached to the method.
macro_rules! sockopt_setter {
    ( $(#[$attr:meta])*
      pub $name:ident => $option:ident as $value_ty:ty
    ) => {
        $(#[$attr])*
        pub fn $name(&self, value: $value_ty) -> Result<()> {
            let option = zmq_sys::$option as c_int;
            <$value_ty as sockopt::Setter>::set(self.sock, option, value)
        }
    };
}
537
// Recursively expands a comma-terminated list of socket option entries into
// getter and/or setter methods. The leading `META { ... }` group holds
// attributes that are attached to every generated method, in addition to
// the attributes (e.g. doc comments) written directly on an entry.
macro_rules! sockopt_seq {
    // End of the list: nothing left to expand.
    ( META { $($meta:meta)* }, ) => ();
    // `(_, setter)`: generate only a setter, then recurse on the rest.
    ( META { $($meta:meta)* }, $(#[$item_meta:meta])* (_, $setter:ident) => $constant_name:ident as $ty:ty,
      $($rest:tt)*
    ) => {
        sockopt_setter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $setter => $constant_name as $ty
        }
        sockopt_seq!(META { $($meta)* }, $($rest)*);
    };
    // `(getter)`: generate only a getter, then recurse on the rest.
    ( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident) => $constant_name:ident as $ty:ty,
      $($rest:tt)*
    ) => {
        sockopt_getter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $getter => $constant_name as $ty
        }
        sockopt_seq!(META { $($meta)* }, $($rest)*);
    };
    // `(getter, setter)`: generate both methods with the same attributes,
    // then recurse on the rest.
    ( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident, $setter:ident) => $constant_name:ident as $ty:ty,
      $($rest:tt)*
    ) => {
        sockopt_getter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $getter => $constant_name as $ty
        }
        sockopt_setter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $setter => $constant_name as $ty
        }
        sockopt_seq!(META { $($meta)* }, $($rest)*);
    };
}
572
// Entry point for declaring socket option accessors: forwards the entries
// to `sockopt_seq!` with an empty `META` attribute group.
macro_rules! sockopts {
    // An empty invocation expands to nothing.
    () => ();
    ( $($rest:tt)* ) => {
        sockopt_seq!(META {}, $($rest)*);
    };
}
579
/// Sendable over a `Socket`.
///
/// A type can implement this trait if there is an especially efficient
/// implementation for sending it as a message over a zmq socket.
///
/// If the type needs to be directly passed to `Socket::send()`, but
/// the overhead of allocating a `Message` instance is not an issue,
/// `Into<Message>` should be implemented instead.
///
pub trait Sendable {
    /// Sends `self` over `socket`, passing `flags` on to the send call.
    fn send(self, socket: &Socket, flags: i32) -> Result<()>;
}
592
593impl<T> Sendable for T
594where
595    T: Into<Message>,
596{
597    fn send(self, socket: &Socket, flags: i32) -> Result<()> {
598        let mut msg = self.into();
599        zmq_try!(unsafe { zmq_sys::zmq_msg_send(msg_ptr(&mut msg), socket.sock, flags as c_int) });
600        Ok(())
601    }
602}
603
604impl Socket {
605    /// Consume the Socket and return the raw socket pointer.
606    ///
607    /// Failure to close the raw socket manually or call `from_raw`
608    /// will lead to a memory leak. Also note that is function
609    /// relinquishes the reference on the context is was created from.
610    pub fn into_raw(mut self) -> *mut c_void {
611        self.owned = false;
612        self.sock
613    }
614
615    /// Create a Socket from a raw socket pointer.
616    ///
617    /// The Socket assumes ownership of the pointer and will close the socket
618    /// when it is dropped. The returned socket will not reference any context.
619    ///
620    /// # Safety
621    ///
622    /// The socket pointer must be a socket created via the `into_raw`
623    /// method. The ownership of the socket is transferred the returned Socket,
624    /// so any other pointers to the same socket may only be used until it is
625    /// dropped.
626    pub unsafe fn from_raw(sock: *mut c_void) -> Socket {
627        Socket {
628            sock,
629            context: None,
630            owned: true,
631        }
632    }
633
    /// Return the inner pointer to this Socket.
    ///
    /// The `Socket` keeps ownership: the pointer is only copied out, and
    /// the socket is still closed when this value is dropped (if owned).
    ///
    /// **WARNING**:
    /// It is your responsibility to make sure that the underlying
    /// memory is not freed too early.
    pub fn as_mut_ptr(&mut self) -> *mut c_void {
        self.sock
    }
642
643    /// Accept connections on a socket.
644    pub fn bind(&self, endpoint: &str) -> Result<()> {
645        let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
646        zmq_try!(unsafe { zmq_sys::zmq_bind(self.sock, c_str.as_ptr()) });
647        Ok(())
648    }
649
650    /// Stop accepting connections on a socket
651    pub fn unbind(&self, endpoint: &str) -> Result<()> {
652        let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
653        zmq_try!(unsafe { zmq_sys::zmq_unbind(self.sock, c_str.as_ptr()) });
654        Ok(())
655    }
656
657    /// Connect a socket.
658    pub fn connect(&self, endpoint: &str) -> Result<()> {
659        let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
660        zmq_try!(unsafe { zmq_sys::zmq_connect(self.sock, c_str.as_ptr()) });
661        Ok(())
662    }
663
664    /// Disconnect a previously connected socket
665    pub fn disconnect(&self, endpoint: &str) -> Result<()> {
666        let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
667        zmq_try!(unsafe { zmq_sys::zmq_disconnect(self.sock, c_str.as_ptr()) });
668        Ok(())
669    }
670
671    /// Configure the socket for monitoring
672    pub fn monitor(&self, monitor_endpoint: &str, events: i32) -> Result<()> {
673        let c_str = ffi::CString::new(monitor_endpoint.as_bytes()).unwrap();
674        zmq_try!(unsafe {
675            zmq_sys::zmq_socket_monitor(self.sock, c_str.as_ptr(), events as c_int)
676        });
677        Ok(())
678    }
679
680    /// Send a message.
681    ///
682    /// Due to the provided `From` implementations, this works for
683    /// `&[u8]`, `Vec<u8>` and `&str` `Message` itself.
684    pub fn send<T>(&self, data: T, flags: i32) -> Result<()>
685    where
686        T: Sendable,
687    {
688        data.send(self, flags)
689    }
690
691    /// Send a `Message` message.
692    #[deprecated(since = "0.9.0", note = "Use `send` instead")]
693    pub fn send_msg(&self, msg: Message, flags: i32) -> Result<()> {
694        self.send(msg, flags)
695    }
696
697    #[deprecated(since = "0.9.0", note = "Use `send` instead")]
698    pub fn send_str(&self, data: &str, flags: i32) -> Result<()> {
699        self.send(data, flags)
700    }
701
702    pub fn send_multipart<I, T>(&self, iter: I, flags: i32) -> Result<()>
703    where
704        I: IntoIterator<Item = T>,
705        T: Into<Message>,
706    {
707        let mut last_part: Option<T> = None;
708        for part in iter {
709            let maybe_last = last_part.take();
710            if let Some(last) = maybe_last {
711                self.send(last.into(), flags | SNDMORE)?;
712            }
713            last_part = Some(part);
714        }
715        if let Some(last) = last_part {
716            self.send(last.into(), flags)
717        } else {
718            Ok(())
719        }
720    }
721
722    /// Receive a message into a `Message`. The length passed to zmq_msg_recv
723    /// is the length of the buffer.
724    pub fn recv(&self, msg: &mut Message, flags: i32) -> Result<()> {
725        zmq_try!(unsafe { zmq_sys::zmq_msg_recv(msg_ptr(msg), self.sock, flags as c_int) });
726        Ok(())
727    }
728
729    /// Receive bytes into a slice. The length passed to `zmq_recv` is the length of the slice. The
730    /// return value is the number of bytes in the message, which may be larger than the length of
731    /// the slice, indicating truncation.
732    pub fn recv_into(&self, bytes: &mut [u8], flags: i32) -> Result<usize> {
733        let bytes_ptr = bytes.as_mut_ptr() as *mut c_void;
734        let rc = zmq_try!(unsafe {
735            zmq_sys::zmq_recv(self.sock, bytes_ptr, bytes.len(), flags as c_int)
736        });
737        Ok(rc as usize)
738    }
739
740    /// Receive a message into a fresh `Message`.
741    pub fn recv_msg(&self, flags: i32) -> Result<Message> {
742        let mut msg = Message::new();
743        self.recv(&mut msg, flags).map(|_| msg)
744    }
745
746    /// Receive a message as a byte vector.
747    pub fn recv_bytes(&self, flags: i32) -> Result<Vec<u8>> {
748        self.recv_msg(flags).map(|msg| msg.to_vec())
749    }
750
751    /// Receive a `String` from the socket.
752    ///
753    /// If the received message is not valid UTF-8, it is returned as the original
754    /// Vec in the `Err` part of the inner result.
755    pub fn recv_string(&self, flags: i32) -> Result<result::Result<String, Vec<u8>>> {
756        self.recv_bytes(flags)
757            .map(|bytes| String::from_utf8(bytes).map_err(FromUtf8Error::into_bytes))
758    }
759
760    /// Receive a multipart message from the socket.
761    ///
762    /// Note that this will allocate a new vector for each message part; for
763    /// many applications it will be possible to process the different parts
764    /// sequentially and reuse allocations that way.
765    pub fn recv_multipart(&self, flags: i32) -> Result<Vec<Vec<u8>>> {
766        let mut parts: Vec<Vec<u8>> = vec![];
767        loop {
768            let part = self.recv_bytes(flags)?;
769            parts.push(part);
770
771            let more_parts = self.get_rcvmore()?;
772            if !more_parts {
773                break;
774            }
775        }
776        Ok(parts)
777    }
778
    // Boolean socket options. Each `(getter, setter)` entry generates both
    // methods; a `(_, setter)` entry generates only the setter.
    sockopts! {
        /// Accessor for the `ZMQ_IPV6` option.
        (is_ipv6, set_ipv6) => ZMQ_IPV6 as bool,
        /// Accessor for the `ZMQ_IMMEDIATE` option.
        (is_immediate, set_immediate) => ZMQ_IMMEDIATE as bool,
        /// Accessor for the `ZMQ_PLAIN_SERVER` option.
        (is_plain_server, set_plain_server) => ZMQ_PLAIN_SERVER as bool,
        /// Accessor for the `ZMQ_CONFLATE` option.
        (is_conflate, set_conflate) => ZMQ_CONFLATE as bool,
        /// Accessor for the `ZMQ_PROBE_ROUTER` option.
        (is_probe_router, set_probe_router) => ZMQ_PROBE_ROUTER as bool,
        /// Accessor for the `ZMQ_ROUTER_MANDATORY` option.
        (is_router_mandatory, set_router_mandatory) => ZMQ_ROUTER_MANDATORY as bool,
        /// Accessor for the `ZMQ_ROUTER_HANDOVER` option.
        (is_router_handover, set_router_handover) => ZMQ_ROUTER_HANDOVER as bool,
        /// Accessor for the `ZMQ_CURVE_SERVER` option.
        (is_curve_server, set_curve_server) => ZMQ_CURVE_SERVER as bool,
        /// Accessor for the `ZMQ_GSSAPI_SERVER` option.
        (is_gssapi_server, set_gssapi_server) => ZMQ_GSSAPI_SERVER as bool,
        /// Accessor for the `ZMQ_GSSAPI_PLAINTEXT` option.
        (is_gssapi_plaintext, set_gssapi_plaintext) => ZMQ_GSSAPI_PLAINTEXT as bool,
        /// Setter for the `ZMQ_REQ_RELAXED` option.
        (_, set_req_relaxed) => ZMQ_REQ_RELAXED as bool,
        /// Setter for the `ZMQ_REQ_CORRELATE` option.
        (_, set_req_correlate) => ZMQ_REQ_CORRELATE as bool,
    }
797
798    /// Return the type of this socket.
799    pub fn get_socket_type(&self) -> Result<SocketType> {
800        sockopt::get(self.sock, zmq_sys::ZMQ_TYPE as c_int).map(SocketType::from_raw)
801    }
802
803    /// Return true if there are more frames of a multipart message to receive.
804    pub fn get_rcvmore(&self) -> Result<bool> {
805        sockopt::get(self.sock, zmq_sys::ZMQ_RCVMORE as c_int).map(|o: i64| o == 1i64)
806    }
807
    // Value-typed socket options. `(getter, setter)` entries generate both
    // methods, `(getter)` only a getter and `(_, setter)` only a setter.
    sockopts! {
        /// Accessor for the `ZMQ_MAXMSGSIZE` option.
        (get_maxmsgsize, set_maxmsgsize) => ZMQ_MAXMSGSIZE as i64,
        /// Accessor for the `ZMQ_SNDHWM` option.
        (get_sndhwm, set_sndhwm) => ZMQ_SNDHWM as i32,
        /// Accessor for the `ZMQ_RCVHWM` option.
        (get_rcvhwm, set_rcvhwm) => ZMQ_RCVHWM as i32,
        /// Accessor for the `ZMQ_AFFINITY` option.
        (get_affinity, set_affinity) => ZMQ_AFFINITY as u64,
        /// Accessor for the `ZMQ_RATE` option.
        (get_rate, set_rate) => ZMQ_RATE as i32,
        /// Accessor for the `ZMQ_RECOVERY_IVL` option.
        (get_recovery_ivl, set_recovery_ivl) => ZMQ_RECOVERY_IVL as i32,
        /// Accessor for the `ZMQ_SNDBUF` option.
        (get_sndbuf, set_sndbuf) => ZMQ_SNDBUF as i32,
        /// Accessor for the `ZMQ_RCVBUF` option.
        (get_rcvbuf, set_rcvbuf) => ZMQ_RCVBUF as i32,
        /// Accessor for the `ZMQ_TOS` option.
        (get_tos, set_tos) => ZMQ_TOS as i32,
        /// Accessor for the `ZMQ_LINGER` option.
        (get_linger, set_linger) => ZMQ_LINGER as i32,
        /// Accessor for the `ZMQ_RECONNECT_IVL` option.
        (get_reconnect_ivl, set_reconnect_ivl) => ZMQ_RECONNECT_IVL as i32,
        /// Accessor for the `ZMQ_RECONNECT_IVL_MAX` option.
        (get_reconnect_ivl_max, set_reconnect_ivl_max) => ZMQ_RECONNECT_IVL_MAX as i32,
        /// Accessor for the `ZMQ_BACKLOG` option.
        (get_backlog, set_backlog) => ZMQ_BACKLOG as i32,

        /// Get the event notification file descriptor.
        ///
        /// Getter for the `ZMQ_FD` option. Note that the returned
        /// type is platform-specific; it aliases either
        /// `std::os::unix::io::RawFd` or
        /// `std::os::windows::io::RawSocket`.
        ///
        /// Note that the returned file descriptor has special
        /// semantics: it should only be used with an operating system
        /// facility like Unix `poll()` to check its readability.
        (get_fd) => ZMQ_FD as RawFd,

        /// Get the currently pending events.
        ///
        /// Note that the result of this function can also change due
        /// to receiving or sending a message on the socket, without
        /// the signalling FD (see `Socket::get_fd()`).
        ///
        /// # Examples
        ///
        /// ```
        /// use zmq;
        /// let ctx = zmq::Context::new();
        /// let socket = ctx.socket(zmq::REQ).unwrap();
        /// let events = socket.get_events().unwrap();
        /// if events.contains(zmq::POLLIN) {
        ///   println!("socket readable")
        /// }
        /// drop(socket);
        /// ```
        ///
        /// # Compatibility
        ///
        /// This getter is declared with the `PollEvents` type, so the
        /// bitmask is returned as `PollEvents` rather than as a bare
        /// `i32`.
        (get_events) => ZMQ_EVENTS as PollEvents,

        /// Accessor for the `ZMQ_MULTICAST_HOPS` option.
        (get_multicast_hops, set_multicast_hops) => ZMQ_MULTICAST_HOPS as i32,
        /// Accessor for the `ZMQ_RCVTIMEO` option.
        (get_rcvtimeo, set_rcvtimeo) => ZMQ_RCVTIMEO as i32,
        /// Accessor for the `ZMQ_SNDTIMEO` option.
        (get_sndtimeo, set_sndtimeo) => ZMQ_SNDTIMEO as i32,
        /// Accessor for the `ZMQ_TCP_KEEPALIVE` option.
        (get_tcp_keepalive, set_tcp_keepalive) => ZMQ_TCP_KEEPALIVE as i32,
        /// Accessor for the `ZMQ_TCP_KEEPALIVE_CNT` option.
        (get_tcp_keepalive_cnt, set_tcp_keepalive_cnt) => ZMQ_TCP_KEEPALIVE_CNT as i32,
        /// Accessor for the `ZMQ_TCP_KEEPALIVE_IDLE` option.
        (get_tcp_keepalive_idle, set_tcp_keepalive_idle) => ZMQ_TCP_KEEPALIVE_IDLE as i32,
        /// Accessor for the `ZMQ_TCP_KEEPALIVE_INTVL` option.
        (get_tcp_keepalive_intvl, set_tcp_keepalive_intvl) => ZMQ_TCP_KEEPALIVE_INTVL as i32,
        /// Accessor for the `ZMQ_HANDSHAKE_IVL` option.
        (get_handshake_ivl, set_handshake_ivl) => ZMQ_HANDSHAKE_IVL as i32,
        // TODO: deprecate to align with ZMQ's preferred naming
        /// Setter for the `ZMQ_ROUTING_ID` option.
        (_, set_identity) => ZMQ_ROUTING_ID as &[u8],
        /// Setter for the `ZMQ_SUBSCRIBE` option.
        (_, set_subscribe) => ZMQ_SUBSCRIBE as &[u8],
        /// Setter for the `ZMQ_UNSUBSCRIBE` option.
        (_, set_unsubscribe) => ZMQ_UNSUBSCRIBE as &[u8],
        /// Accessor for the `ZMQ_HEARTBEAT_IVL` option.
        (get_heartbeat_ivl, set_heartbeat_ivl) => ZMQ_HEARTBEAT_IVL as i32,
        /// Accessor for the `ZMQ_HEARTBEAT_TTL` option.
        (get_heartbeat_ttl, set_heartbeat_ttl) => ZMQ_HEARTBEAT_TTL as i32,
        /// Accessor for the `ZMQ_HEARTBEAT_TIMEOUT` option.
        (get_heartbeat_timeout, set_heartbeat_timeout) => ZMQ_HEARTBEAT_TIMEOUT as i32,
        /// Accessor for the `ZMQ_CONNECT_TIMEOUT` option.
        (get_connect_timeout, set_connect_timeout) => ZMQ_CONNECT_TIMEOUT as i32,
    }
881
882    // TODO: deprecate to align with ZMQ's preferred naming
883    pub fn get_identity(&self) -> Result<Vec<u8>> {
884        // 255 = identity max length
885        sockopt::get_bytes(self.sock, zmq_sys::ZMQ_ROUTING_ID as c_int, 255)
886    }
887
888    pub fn get_socks_proxy(&self) -> Result<result::Result<String, Vec<u8>>> {
889        // 255 = longest allowable domain name is 253 so this should
890        // be a reasonable size.
891        sockopt::get_string(self.sock, zmq_sys::ZMQ_SOCKS_PROXY as c_int, 255, true)
892    }
893
894    pub fn get_mechanism(&self) -> Result<Mechanism> {
895        sockopt::get(self.sock, zmq_sys::ZMQ_MECHANISM as c_int).map(|mech| match mech {
896            zmq_sys::ZMQ_NULL => Mechanism::ZMQ_NULL,
897            zmq_sys::ZMQ_PLAIN => Mechanism::ZMQ_PLAIN,
898            zmq_sys::ZMQ_CURVE => Mechanism::ZMQ_CURVE,
899            zmq_sys::ZMQ_GSSAPI => Mechanism::ZMQ_GSSAPI,
900            _ => panic!("Mechanism is out of range!"),
901        })
902    }
903
904    pub fn get_plain_username(&self) -> Result<result::Result<String, Vec<u8>>> {
905        // 255 = arbitrary size
906        sockopt::get_string(self.sock, zmq_sys::ZMQ_PLAIN_USERNAME as c_int, 255, true)
907    }
908
909    pub fn get_plain_password(&self) -> Result<result::Result<String, Vec<u8>>> {
910        // 256 = arbitrary size based on std crypto key size
911        sockopt::get_string(self.sock, zmq_sys::ZMQ_PLAIN_PASSWORD as c_int, 256, true)
912    }
913
914    pub fn get_zap_domain(&self) -> Result<result::Result<String, Vec<u8>>> {
915        // 255 = arbitrary size
916        sockopt::get_string(self.sock, zmq_sys::ZMQ_ZAP_DOMAIN as c_int, 255, true)
917    }
918
919    /// Return the address of the last endpoint this socket was bound to.
920    ///
921    /// Note that the returned address is not guaranteed to be the
922    /// same as the one used with `bind`, and might also not be
923    /// directly usable with `connect`. In particular, when `bind` is
924    /// used with the wildcard address (`"*"`), in the address
925    /// returned, the wildcard will be expanded into the any address
926    /// (i.e. `0.0.0.0` with IPv4).
927    pub fn get_last_endpoint(&self) -> Result<result::Result<String, Vec<u8>>> {
928        // 256 + 9 + 1 = maximum inproc name size (= 256) + "inproc://".len() (= 9), plus null byte
929        sockopt::get_string(
930            self.sock,
931            zmq_sys::ZMQ_LAST_ENDPOINT as c_int,
932            256 + 9 + 1,
933            true,
934        )
935    }
936
937    /// Set the `ZMQ_CURVE_PUBLICKEY` option value.
938    ///
939    /// The key is returned as raw bytes. Use `z85_encode` on the
940    /// resulting data to get the Z85-encoded string representation of
941    /// the key.
942    pub fn get_curve_publickey(&self) -> Result<Vec<u8>> {
943        sockopt::get_bytes(self.sock, zmq_sys::ZMQ_CURVE_PUBLICKEY as c_int, 32)
944    }
945
946    /// Get the `ZMQ_CURVE_SECRETKEY` option value.
947    ///
948    /// The key is returned as raw bytes. Use `z85_encode` on the
949    /// resulting data to get the Z85-encoded string representation of
950    /// the key.
951    pub fn get_curve_secretkey(&self) -> Result<Vec<u8>> {
952        sockopt::get_bytes(self.sock, zmq_sys::ZMQ_CURVE_SECRETKEY as c_int, 32)
953    }
954
955    /// Get `ZMQ_CURVE_SERVERKEY` option value.
956    ///
957    /// Note that the key is returned as raw bytes, as a 32-byte
958    /// vector. Use `z85_encode()` explicitly to obtain the
959    /// Z85-encoded string variant.
960    pub fn get_curve_serverkey(&self) -> Result<Vec<u8>> {
961        // 41 = Z85 encoded keysize + 1 for null byte
962        sockopt::get_bytes(self.sock, zmq_sys::ZMQ_CURVE_SERVERKEY as c_int, 32)
963    }
964
965    pub fn get_gssapi_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
966        // 260 = best guess of max length based on docs.
967        sockopt::get_string(self.sock, zmq_sys::ZMQ_GSSAPI_PRINCIPAL as c_int, 260, true)
968    }
969
970    pub fn get_gssapi_service_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
971        // 260 = best guess of max length based on docs.
972        sockopt::get_string(
973            self.sock,
974            zmq_sys::ZMQ_GSSAPI_SERVICE_PRINCIPAL as c_int,
975            260,
976            true,
977        )
978    }
979
    // Setter-only socket options. Each entry names the setter, the libzmq
    // option constant it targets, and the Rust type of the value it takes.
    // NOTE(review): the `_` in the getter position presumably tells
    // `sockopts!` not to generate a getter -- confirm against the macro
    // definition. Several of these options have hand-written getters
    // elsewhere in this `impl`.
    sockopts! {
        (_, set_socks_proxy) => ZMQ_SOCKS_PROXY as Option<&str>,
        (_, set_plain_username) => ZMQ_PLAIN_USERNAME as Option<&str>,
        (_, set_plain_password) => ZMQ_PLAIN_PASSWORD as Option<&str>,
        (_, set_zap_domain) => ZMQ_ZAP_DOMAIN as &str,
        (_, set_xpub_welcome_msg) => ZMQ_XPUB_WELCOME_MSG as Option<&str>,
        (_, set_xpub_verbose) => ZMQ_XPUB_VERBOSE as bool,

        // CURVE keys are passed as byte slices; GSSAPI principals as strings.
        (_, set_curve_publickey) => ZMQ_CURVE_PUBLICKEY as &[u8],
        (_, set_curve_secretkey) => ZMQ_CURVE_SECRETKEY as &[u8],
        (_, set_curve_serverkey) => ZMQ_CURVE_SERVERKEY as &[u8],
        (_, set_gssapi_principal) => ZMQ_GSSAPI_PRINCIPAL as &str,
        (_, set_gssapi_service_principal) => ZMQ_GSSAPI_SERVICE_PRINCIPAL as &str,
    }
994
995    /// Create a `PollItem` from the socket.
996    pub fn as_poll_item(&self, events: PollEvents) -> PollItem {
997        PollItem {
998            socket: self.sock,
999            fd: 0,
1000            events: events.bits(),
1001            revents: 0,
1002            marker: PhantomData,
1003        }
1004    }
1005
1006    /// Do a call to `zmq_poll` with only this socket.
1007    ///
1008    /// The return value on success will be either zero (no event) or one (some
1009    /// event was signaled).
1010    pub fn poll(&self, events: PollEvents, timeout_ms: i64) -> Result<i32> {
1011        poll(&mut [self.as_poll_item(events)], timeout_ms)
1012    }
1013}
1014
// TODO: Duplicating the values inside the bitflags struct and on the top level
// is unfortunate.
bitflags! {
    /// Type representing pending socket events.
    ///
    /// This is a set of bit flags stored in an `i16`; each flag takes its
    /// value from the corresponding `zmq_sys` constant.
    pub struct PollEvents: i16 {
        /// For `poll()`, specifies to signal when a message/some data
        /// can be read from a socket.
        const POLLIN = zmq_sys::ZMQ_POLLIN as i16;
        /// For `poll()`, specifies to signal when a message/some data
        /// can be written to a socket.
        const POLLOUT = zmq_sys::ZMQ_POLLOUT as i16;
        /// For `poll()`, specifies to signal when an error condition
        /// is present on a socket. This only applies to non-0MQ
        /// sockets.
        const POLLERR = zmq_sys::ZMQ_POLLERR as i16;
    }
}
1032
/// For `poll()`, specifies to signal when a message/some data can be
/// read from a socket.
///
/// Top-level alias of `PollEvents::POLLIN`.
pub const POLLIN: PollEvents = PollEvents::POLLIN;

/// For `poll()`, specifies to signal when a message/some data can be
/// written to a socket.
///
/// Top-level alias of `PollEvents::POLLOUT`.
pub const POLLOUT: PollEvents = PollEvents::POLLOUT;

/// For `poll()`, specifies to signal when an error condition is
/// present on a socket. This only applies to non-0MQ sockets.
///
/// Top-level alias of `PollEvents::POLLERR`.
pub const POLLERR: PollEvents = PollEvents::POLLERR;
1044
1045/// Represents a handle that can be `poll()`ed.
1046///
1047/// This is either a reference to a 0MQ socket, or a standard socket.
1048/// Apart from that it contains the requested event mask, and is updated
1049/// with the occurred events after `poll()` finishes.
1050#[repr(C)]
1051pub struct PollItem<'a> {
1052    socket: *mut c_void,
1053    fd: RawFd,
1054    events: c_short,
1055    revents: c_short,
1056    marker: PhantomData<&'a Socket>,
1057}
1058
1059impl<'a> PollItem<'a> {
1060    /// Construct a PollItem from a non-0MQ socket, given by its file
1061    /// descriptor and the events that should be polled.
1062    pub fn from_fd(fd: RawFd, events: PollEvents) -> PollItem<'a> {
1063        PollItem {
1064            socket: ptr::null_mut(),
1065            fd,
1066            events: events.bits(),
1067            revents: 0,
1068            marker: PhantomData,
1069        }
1070    }
1071
1072    /// Change the events to wait for.
1073    pub fn set_events(&mut self, events: PollEvents) {
1074        self.events = events.bits();
1075    }
1076
1077    /// Retrieve the events that occurred for this handle.
1078    pub fn get_revents(&self) -> PollEvents {
1079        PollEvents::from_bits_truncate(self.revents)
1080    }
1081
1082    /// Returns true if the polled socket has messages ready to receive.
1083    pub fn is_readable(&self) -> bool {
1084        (self.revents & POLLIN.bits()) != 0
1085    }
1086
1087    /// Returns true if the polled socket can accept messages to be sent
1088    /// without blocking.
1089    pub fn is_writable(&self) -> bool {
1090        (self.revents & POLLOUT.bits()) != 0
1091    }
1092
1093    /// Returns true if the polled socket encountered an error condition.
1094    pub fn is_error(&self) -> bool {
1095        (self.revents & POLLERR.bits()) != 0
1096    }
1097
1098    /// Returns true if the polled socket is the given 0MQ socket.
1099    pub fn has_socket(&self, socket: &Socket) -> bool {
1100        self.socket == socket.sock
1101    }
1102
1103    /// Returns true if the polled socket is the given file descriptor.
1104    pub fn has_fd(&self, fd: RawFd) -> bool {
1105        self.socket.is_null() && self.fd == fd
1106    }
1107}
1108
1109/// Poll for events on multiple sockets.
1110///
1111/// For every poll item given, the events given in the `events` bitmask are
1112/// monitored, and signaled in `revents` when they occur. Any number of poll
1113/// items can have events signaled when the function returns.
1114///
1115/// The given timeout is in milliseconds and can be zero. A timeout of `-1`
1116/// indicates to block indefinitely until an event has occurred.
1117///
1118/// The result, if not `Err`, indicates the number of poll items that have
1119/// events signaled.
1120pub fn poll(items: &mut [PollItem], timeout: i64) -> Result<i32> {
1121    let rc = zmq_try!(unsafe {
1122        zmq_sys::zmq_poll(
1123            items.as_mut_ptr() as *mut zmq_sys::zmq_pollitem_t,
1124            items.len() as c_int,
1125            timeout as c_long,
1126        )
1127    });
1128    Ok(rc as i32)
1129}
1130
1131/// Start a 0MQ proxy in the current thread.
1132///
1133/// A proxy connects a frontend socket with a backend socket, where the exact
1134/// behavior depends on the type of both sockets.
1135///
1136/// This function only returns (always with an `Err`) when the sockets' context
1137/// has been closed.
1138pub fn proxy(frontend: &Socket, backend: &Socket) -> Result<()> {
1139    zmq_try!(unsafe { zmq_sys::zmq_proxy(frontend.sock, backend.sock, ptr::null_mut()) });
1140    Ok(())
1141}
1142
1143/// Start a 0MQ proxy in the current thread, with a capture socket.
1144///
1145/// The capture socket is sent all messages received on the frontend and backend
1146/// sockets.
1147pub fn proxy_with_capture(
1148    frontend: &mut Socket,
1149    backend: &mut Socket,
1150    capture: &mut Socket,
1151) -> Result<()> {
1152    zmq_try!(unsafe { zmq_sys::zmq_proxy(frontend.sock, backend.sock, capture.sock) });
1153    Ok(())
1154}
1155
1156/// Start a 0MQ proxy in the current thread, with a control socket.
1157///
1158/// If PAUSE is received on the control socket, the proxy suspends its activities. If RESUME is received,
1159/// it goes on. If TERMINATE is received, it terminates smoothly. At start, the proxy runs normally
1160/// as if `proxy` was used.
1161pub fn proxy_steerable(
1162    frontend: &mut Socket,
1163    backend: &mut Socket,
1164    control: &mut Socket,
1165) -> Result<()> {
1166    zmq_try!(unsafe {
1167        zmq_sys::zmq_proxy_steerable(frontend.sock, backend.sock, ptr::null_mut(), control.sock)
1168    });
1169    Ok(())
1170}
1171
1172/// Start a 0MQ proxy in the current thread, with capture and control sockets.
1173///
1174/// Provides a steerable proxy with a capture socket. See `proxy_with_capture`
1175pub fn proxy_steerable_with_capture(
1176    frontend: &mut Socket,
1177    backend: &mut Socket,
1178    capture: &mut Socket,
1179    control: &mut Socket,
1180) -> Result<()> {
1181    zmq_try!(unsafe {
1182        zmq_sys::zmq_proxy_steerable(frontend.sock, backend.sock, capture.sock, control.sock)
1183    });
1184    Ok(())
1185}
1186
1187/// Return true if the used 0MQ library has the given capability.
1188///
1189/// The return value is always the `Some` variant; it used to return
1190/// `None` for older, now unsupported versions of 0MQ that didn't have
1191/// the wrapped `zmq_has` function. Thus, for code that requires `zmq`
1192/// version 0.9.0 or newer, you can safely call `unwrap` on the return
1193/// value.
1194///
1195/// For a list of capabilities, please consult the `zmq_has` manual
1196/// page.
1197///
1198/// # Compatibility
1199///
1200/// In the `zmq` 0.10.0, this function will simply return `bool`.
1201///
1202pub fn has(capability: &str) -> Option<bool> {
1203    let c_str = ffi::CString::new(capability).unwrap();
1204    unsafe { Some(zmq_sys::zmq_has(c_str.as_ptr()) == 1) }
1205}
1206
1207/// A CURVE key pair generated by 0MQ.
1208///
1209/// Note that for API consistency reasons, since version 0.9, the key
1210/// pair is represented in the binary form. This is in contrast to
1211/// libzmq, which returns the z85-encoded representation.
1212#[derive(Debug)]
1213pub struct CurveKeyPair {
1214    pub public_key: [u8; 32],
1215    pub secret_key: [u8; 32],
1216}
1217
1218impl CurveKeyPair {
1219    /// Create a new key pair.
1220    pub fn new() -> Result<CurveKeyPair> {
1221        // Curve keypairs are currently 40 bytes long, plus terminating NULL.
1222        let mut ffi_public_key = [0u8; 41];
1223        let mut ffi_secret_key = [0u8; 41];
1224
1225        zmq_try!(unsafe {
1226            zmq_sys::zmq_curve_keypair(
1227                ffi_public_key.as_mut_ptr() as *mut libc::c_char,
1228                ffi_secret_key.as_mut_ptr() as *mut libc::c_char,
1229            )
1230        });
1231
1232        let mut pair = CurveKeyPair {
1233            public_key: [0; 32],
1234            secret_key: [0; 32],
1235        };
1236        unsafe {
1237            // No need to check return code here, as zmq_curve_keypair
1238            // is supposed to generate valid z85-encoded keys
1239            zmq_sys::zmq_z85_decode(
1240                pair.public_key.as_mut_ptr(),
1241                ffi_public_key.as_ptr() as *mut libc::c_char,
1242            );
1243            zmq_sys::zmq_z85_decode(
1244                pair.secret_key.as_mut_ptr(),
1245                ffi_secret_key.as_ptr() as *mut libc::c_char,
1246            );
1247        }
1248
1249        Ok(pair)
1250    }
1251}
1252
/// Errors that can occur while encoding Z85.
#[derive(Debug)]
pub enum EncodeError {
    /// The input slice's length was not a multiple of 4.
    BadLength,
    /// The encoded output could not be converted into a `String`.
    FromUtf8Error(FromUtf8Error),
}

impl From<FromUtf8Error> for EncodeError {
    /// Wrap a UTF-8 conversion failure, so that `?` can be used on it.
    fn from(err: FromUtf8Error) -> Self {
        EncodeError::FromUtf8Error(err)
    }
}

impl fmt::Display for EncodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EncodeError::BadLength => f.write_str("Invalid data length. Should be multiple of 4."),
            EncodeError::FromUtf8Error(e) => write!(f, "UTF8 conversion error: {}", e),
        }
    }
}

impl std::error::Error for EncodeError {
    /// Expose the wrapped conversion error, if any, so that callers walking
    /// the error chain can reach the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            EncodeError::BadLength => None,
            EncodeError::FromUtf8Error(e) => Some(e),
        }
    }
}
1276
1277/// Encode a binary key as Z85 printable text.
1278///
1279/// Z85 is an encoding similar to Base64, but operates on 4-byte chunks,
1280/// which are encoded into 5-byte sequences.
1281///
1282/// The input slice *must* have a length divisible by 4.
1283pub fn z85_encode(data: &[u8]) -> result::Result<String, EncodeError> {
1284    if data.len() % 4 != 0 {
1285        return Err(EncodeError::BadLength);
1286    }
1287
1288    let len = data.len() * 5 / 4 + 1;
1289    let mut dest = vec![0u8; len];
1290
1291    unsafe {
1292        zmq_sys::zmq_z85_encode(
1293            dest.as_mut_ptr() as *mut libc::c_char,
1294            data.as_ptr(),
1295            data.len(),
1296        );
1297    }
1298
1299    dest.truncate(len - 1);
1300    String::from_utf8(dest).map_err(EncodeError::FromUtf8Error)
1301}
1302
/// Errors that can occur while decoding Z85.
#[derive(Debug)]
pub enum DecodeError {
    /// The input string slice's length was not a multiple of 5.
    BadLength,
    /// The input string slice had embedded NUL bytes.
    NulError(ffi::NulError),
}

impl From<ffi::NulError> for DecodeError {
    /// Wrap a C-string conversion failure, so that `?` can be used on it.
    fn from(err: ffi::NulError) -> Self {
        DecodeError::NulError(err)
    }
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DecodeError::BadLength => f.write_str("Invalid data length. Should be multiple of 5."),
            DecodeError::NulError(e) => write!(f, "Nul byte error: {}", e),
        }
    }
}

impl std::error::Error for DecodeError {
    /// Expose the wrapped `NulError`, if any, so that callers walking the
    /// error chain can reach the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DecodeError::BadLength => None,
            DecodeError::NulError(e) => Some(e),
        }
    }
}
1328
1329/// Decode a binary key from Z85-encoded text.
1330///
1331/// The input string must have a length divisible by 5.
1332///
1333/// Note that 0MQ silently accepts characters outside the range defined for
1334/// the Z85 encoding.
1335pub fn z85_decode(data: &str) -> result::Result<Vec<u8>, DecodeError> {
1336    if data.len() % 5 != 0 {
1337        return Err(DecodeError::BadLength);
1338    }
1339
1340    let len = data.len() * 4 / 5;
1341    let mut dest = vec![0u8; len];
1342
1343    let c_str = ffi::CString::new(data)?;
1344
1345    unsafe {
1346        zmq_sys::zmq_z85_decode(dest.as_mut_ptr(), c_str.into_raw());
1347    }
1348
1349    Ok(dest)
1350}