1#![allow(trivial_numeric_casts)]
4
5use bitflags::bitflags;
6use libc::{c_int, c_long, c_short};
7
8use std::ffi;
9use std::fmt;
10use std::marker::PhantomData;
11use std::os::raw::c_void;
12#[cfg(unix)]
13use std::os::unix::io::{AsRawFd, RawFd as UnixRawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsRawSocket, RawSocket};
16use std::result;
17use std::string::FromUtf8Error;
18use std::sync::Arc;
19use std::{mem, ptr, str};
20
21use zmq_sys::{errno, RawFd};
22
// Evaluates the wrapped expression exactly once and compares the result with -1.
//
// When the result is -1 the macro makes the *enclosing function* return
// `Err(crate::errno_to_error())`; otherwise the macro itself evaluates to the
// original return code so callers can keep using it.
macro_rules! zmq_try {
    ($($tt:tt)*) => {{
        let rc = $($tt)*;
        if rc == -1 {
            // Early return from the function that invoked the macro.
            return Err(crate::errno_to_error());
        }
        rc
    }}
}
32
33mod message;
34mod sockopt;
35
36use crate::message::msg_ptr;
37pub use crate::message::Message;
38pub use crate::SocketType::*;
39
/// Result alias used throughout this file; the error type is this crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
42
/// Socket kinds that can be requested from `Context::socket`.
///
/// Each variant corresponds to the `zmq_sys::ZMQ_*` constant of the same name
/// (see `SocketType::to_raw` / `SocketType::from_raw`).
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SocketType {
    PAIR,
    PUB,
    SUB,
    REQ,
    REP,
    DEALER,
    ROUTER,
    PULL,
    PUSH,
    XPUB,
    XSUB,
    STREAM,
}
60
61impl SocketType {
62 fn to_raw(self) -> c_int {
63 let raw = match self {
64 PAIR => zmq_sys::ZMQ_PAIR,
65 PUB => zmq_sys::ZMQ_PUB,
66 SUB => zmq_sys::ZMQ_SUB,
67 REQ => zmq_sys::ZMQ_REQ,
68 REP => zmq_sys::ZMQ_REP,
69 DEALER => zmq_sys::ZMQ_DEALER,
70 ROUTER => zmq_sys::ZMQ_ROUTER,
71 PULL => zmq_sys::ZMQ_PULL,
72 PUSH => zmq_sys::ZMQ_PUSH,
73 XPUB => zmq_sys::ZMQ_XPUB,
74 XSUB => zmq_sys::ZMQ_XSUB,
75 STREAM => zmq_sys::ZMQ_STREAM,
76 };
77 raw as c_int
78 }
79 fn from_raw(raw: c_int) -> SocketType {
80 match raw as u32 {
81 zmq_sys::ZMQ_PAIR => PAIR,
82 zmq_sys::ZMQ_PUB => PUB,
83 zmq_sys::ZMQ_SUB => SUB,
84 zmq_sys::ZMQ_REQ => REQ,
85 zmq_sys::ZMQ_REP => REP,
86 zmq_sys::ZMQ_DEALER => DEALER,
87 zmq_sys::ZMQ_ROUTER => ROUTER,
88 zmq_sys::ZMQ_PULL => PULL,
89 zmq_sys::ZMQ_PUSH => PUSH,
90 zmq_sys::ZMQ_XPUB => XPUB,
91 zmq_sys::ZMQ_XSUB => XSUB,
92 zmq_sys::ZMQ_STREAM => STREAM,
93 _ => panic!("socket type is out of range!"),
94 }
95 }
96}
97
/// Socket monitor event kinds.
///
/// Every discriminant is the value of the corresponding
/// `zmq_sys::ZMQ_EVENT_*` constant, so a variant can be cast to its raw code.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SocketEvent {
    CONNECTED = zmq_sys::ZMQ_EVENT_CONNECTED as isize,
    CONNECT_DELAYED = zmq_sys::ZMQ_EVENT_CONNECT_DELAYED as isize,
    CONNECT_RETRIED = zmq_sys::ZMQ_EVENT_CONNECT_RETRIED as isize,
    LISTENING = zmq_sys::ZMQ_EVENT_LISTENING as isize,
    BIND_FAILED = zmq_sys::ZMQ_EVENT_BIND_FAILED as isize,
    ACCEPTED = zmq_sys::ZMQ_EVENT_ACCEPTED as isize,
    ACCEPT_FAILED = zmq_sys::ZMQ_EVENT_ACCEPT_FAILED as isize,
    CLOSED = zmq_sys::ZMQ_EVENT_CLOSED as isize,
    CLOSE_FAILED = zmq_sys::ZMQ_EVENT_CLOSE_FAILED as isize,
    DISCONNECTED = zmq_sys::ZMQ_EVENT_DISCONNECTED as isize,
    MONITOR_STOPPED = zmq_sys::ZMQ_EVENT_MONITOR_STOPPED as isize,
    HANDSHAKE_FAILED_NO_DETAIL = zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL as isize,
    HANDSHAKE_SUCCEEDED = zmq_sys::ZMQ_EVENT_HANDSHAKE_SUCCEEDED as isize,
    HANDSHAKE_FAILED_PROTOCOL = zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL as isize,
    HANDSHAKE_FAILED_AUTH = zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_AUTH as isize,
    // Discriminant taken from ZMQ_EVENT_ALL — presumably a mask of every event; confirm against libzmq.
    ALL = zmq_sys::ZMQ_EVENT_ALL as isize,
}
120
121impl SocketEvent {
122 pub fn to_raw(self) -> u16 {
123 self as u16
124 }
125
126 pub fn from_raw(raw: u16) -> SocketEvent {
128 use SocketEvent::*;
129 match u32::from(raw) {
130 zmq_sys::ZMQ_EVENT_CONNECTED => CONNECTED,
131 zmq_sys::ZMQ_EVENT_CONNECT_DELAYED => CONNECT_DELAYED,
132 zmq_sys::ZMQ_EVENT_CONNECT_RETRIED => CONNECT_RETRIED,
133 zmq_sys::ZMQ_EVENT_LISTENING => LISTENING,
134 zmq_sys::ZMQ_EVENT_BIND_FAILED => BIND_FAILED,
135 zmq_sys::ZMQ_EVENT_ACCEPTED => ACCEPTED,
136 zmq_sys::ZMQ_EVENT_ACCEPT_FAILED => ACCEPT_FAILED,
137 zmq_sys::ZMQ_EVENT_CLOSED => CLOSED,
138 zmq_sys::ZMQ_EVENT_CLOSE_FAILED => CLOSE_FAILED,
139 zmq_sys::ZMQ_EVENT_DISCONNECTED => DISCONNECTED,
140 zmq_sys::ZMQ_EVENT_MONITOR_STOPPED => MONITOR_STOPPED,
141 zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL => HANDSHAKE_FAILED_NO_DETAIL,
142 zmq_sys::ZMQ_EVENT_HANDSHAKE_SUCCEEDED => HANDSHAKE_SUCCEEDED,
143 zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL => HANDSHAKE_FAILED_PROTOCOL,
144 zmq_sys::ZMQ_EVENT_HANDSHAKE_FAILED_AUTH => HANDSHAKE_FAILED_AUTH,
145 zmq_sys::ZMQ_EVENT_ALL => ALL,
146 x => panic!("unknown event type {}", x),
147 }
148 }
149}
150
/// Raw value of `zmq_sys::ZMQ_DONTWAIT` as an `i32`, usable in the `flags` arguments of this file.
pub static DONTWAIT: i32 = zmq_sys::ZMQ_DONTWAIT as i32;
/// Raw value of `zmq_sys::ZMQ_SNDMORE` as an `i32`; `Socket::send_multipart` ORs it
/// into the flags of every part except the last one.
pub static SNDMORE: i32 = zmq_sys::ZMQ_SNDMORE as i32;
156
/// Security mechanism reported by `Socket::get_mechanism`.
///
/// Each variant corresponds to the `zmq_sys` constant of the same name.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mechanism {
    ZMQ_NULL,
    ZMQ_PLAIN,
    ZMQ_CURVE,
    ZMQ_GSSAPI,
}
167
/// Error codes surfaced by this crate.
///
/// Each variant maps to the `zmq_sys::errno` constant of the same name
/// (see `Error::to_raw` / `Error::from_raw`). `Debug` is implemented by hand
/// further down rather than derived.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum Error {
    EACCES,
    EADDRINUSE,
    EAGAIN,
    EBUSY,
    ECONNREFUSED,
    EFAULT,
    EINTR,
    EHOSTUNREACH,
    EINPROGRESS,
    EINVAL,
    EMFILE,
    EMSGSIZE,
    ENAMETOOLONG,
    ENODEV,
    ENOENT,
    ENOMEM,
    ENOTCONN,
    ENOTSOCK,
    EPROTO,
    EPROTONOSUPPORT,
    ENOTSUP,
    ENOBUFS,
    ENETDOWN,
    EADDRNOTAVAIL,

    // The remaining codes look ZeroMQ-specific (no `*_ALT` counterpart in `from_raw`) — confirm.
    EFSM,
    ENOCOMPATPROTO,
    ETERM,
    EMTHREAD,
}
202
203impl Error {
204 pub fn to_raw(self) -> i32 {
205 match self {
206 Error::EACCES => errno::EACCES,
207 Error::EADDRINUSE => errno::EADDRINUSE,
208 Error::EAGAIN => errno::EAGAIN,
209 Error::EBUSY => errno::EBUSY,
210 Error::ECONNREFUSED => errno::ECONNREFUSED,
211 Error::EFAULT => errno::EFAULT,
212 Error::EINTR => errno::EINTR,
213 Error::EHOSTUNREACH => errno::EHOSTUNREACH,
214 Error::EINPROGRESS => errno::EINPROGRESS,
215 Error::EINVAL => errno::EINVAL,
216 Error::EMFILE => errno::EMFILE,
217 Error::EMSGSIZE => errno::EMSGSIZE,
218 Error::ENAMETOOLONG => errno::ENAMETOOLONG,
219 Error::ENODEV => errno::ENODEV,
220 Error::ENOENT => errno::ENOENT,
221 Error::ENOMEM => errno::ENOMEM,
222 Error::ENOTCONN => errno::ENOTCONN,
223 Error::ENOTSOCK => errno::ENOTSOCK,
224 Error::EPROTO => errno::EPROTO,
225 Error::EPROTONOSUPPORT => errno::EPROTONOSUPPORT,
226 Error::ENOTSUP => errno::ENOTSUP,
227 Error::ENOBUFS => errno::ENOBUFS,
228 Error::ENETDOWN => errno::ENETDOWN,
229 Error::EADDRNOTAVAIL => errno::EADDRNOTAVAIL,
230
231 Error::EFSM => errno::EFSM,
232 Error::ENOCOMPATPROTO => errno::ENOCOMPATPROTO,
233 Error::ETERM => errno::ETERM,
234 Error::EMTHREAD => errno::EMTHREAD,
235 }
236 }
237
238 pub fn from_raw(raw: i32) -> Error {
239 match raw {
240 errno::EACCES => Error::EACCES,
241 errno::EADDRINUSE => Error::EADDRINUSE,
242 errno::EAGAIN => Error::EAGAIN,
243 errno::EBUSY => Error::EBUSY,
244 errno::ECONNREFUSED => Error::ECONNREFUSED,
245 errno::EFAULT => Error::EFAULT,
246 errno::EHOSTUNREACH => Error::EHOSTUNREACH,
247 errno::EINPROGRESS => Error::EINPROGRESS,
248 errno::EINVAL => Error::EINVAL,
249 errno::EMFILE => Error::EMFILE,
250 errno::EMSGSIZE => Error::EMSGSIZE,
251 errno::ENAMETOOLONG => Error::ENAMETOOLONG,
252 errno::ENODEV => Error::ENODEV,
253 errno::ENOENT => Error::ENOENT,
254 errno::ENOMEM => Error::ENOMEM,
255 errno::ENOTCONN => Error::ENOTCONN,
256 errno::ENOTSOCK => Error::ENOTSOCK,
257 errno::EPROTO => Error::EPROTO,
258 errno::EPROTONOSUPPORT => Error::EPROTONOSUPPORT,
259 errno::ENOTSUP => Error::ENOTSUP,
260 errno::ENOBUFS => Error::ENOBUFS,
261 errno::ENETDOWN => Error::ENETDOWN,
262 errno::EADDRNOTAVAIL => Error::EADDRNOTAVAIL,
263 errno::EINTR => Error::EINTR,
264
265 errno::ENOTSUP_ALT => Error::ENOTSUP,
268 errno::EPROTONOSUPPORT_ALT => Error::EPROTONOSUPPORT,
269 errno::ENOBUFS_ALT => Error::ENOBUFS,
270 errno::ENETDOWN_ALT => Error::ENETDOWN,
271 errno::EADDRINUSE_ALT => Error::EADDRINUSE,
272 errno::EADDRNOTAVAIL_ALT => Error::EADDRNOTAVAIL,
273 errno::ECONNREFUSED_ALT => Error::ECONNREFUSED,
274 errno::EINPROGRESS_ALT => Error::EINPROGRESS,
275 errno::ENOTSOCK_ALT => Error::ENOTSOCK,
276 errno::EMSGSIZE_ALT => Error::EMSGSIZE,
277
278 errno::EFSM => Error::EFSM,
292 errno::ENOCOMPATPROTO => Error::ENOCOMPATPROTO,
293 errno::ETERM => Error::ETERM,
294 errno::EMTHREAD => Error::EMTHREAD,
295
296 x => unsafe {
297 let s = zmq_sys::zmq_strerror(x);
298 panic!(
299 "unknown error [{}]: {}",
300 x,
301 str::from_utf8(ffi::CStr::from_ptr(s).to_bytes()).unwrap()
302 )
303 },
304 }
305 }
306
307 pub fn message(self) -> &'static str {
309 unsafe {
310 let s = zmq_sys::zmq_strerror(self.to_raw());
311 let v: &'static [u8] = mem::transmute(ffi::CStr::from_ptr(s).to_bytes());
312 str::from_utf8(v).unwrap()
313 }
314 }
315}
316
317impl std::error::Error for Error {
318 fn description(&self) -> &str {
319 self.message()
320 }
321}
322
323impl std::fmt::Display for Error {
324 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325 write!(f, "{}", self.message())
326 }
327}
328
329impl fmt::Debug for Error {
330 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
331 write!(f, "{}", self.message())
333 }
334}
335
impl From<Error> for std::io::Error {
    /// Wraps the error in a `std::io::Error`, choosing the closest
    /// `ErrorKind`; codes without a dedicated mapping become `ErrorKind::Other`.
    fn from(error: Error) -> Self {
        use std::io::ErrorKind as Kind;

        std::io::Error::new(
            match error {
                Error::ENOENT => Kind::NotFound,
                Error::EACCES => Kind::PermissionDenied,
                Error::ECONNREFUSED => Kind::ConnectionRefused,
                Error::ENOTCONN => Kind::NotConnected,
                Error::EADDRINUSE => Kind::AddrInUse,
                Error::EADDRNOTAVAIL => Kind::AddrNotAvailable,
                Error::EAGAIN => Kind::WouldBlock,
                Error::EINVAL => Kind::InvalidInput,
                Error::EINTR => Kind::Interrupted,
                _ => Kind::Other,
            },
            // `Error` is `Copy`, so it can still be used as the payload here.
            error,
        )
    }
}
359
360fn errno_to_error() -> Error {
361 Error::from_raw(unsafe { zmq_sys::zmq_errno() })
362}
363
364pub fn version() -> (i32, i32, i32) {
366 let mut major = 0;
367 let mut minor = 0;
368 let mut patch = 0;
369
370 unsafe {
371 zmq_sys::zmq_version(&mut major, &mut minor, &mut patch);
372 }
373
374 (major as i32, minor as i32, patch as i32)
375}
376
/// Owner of the raw context pointer; its `Drop` impl terminates the context.
struct RawContext {
    /// Pointer obtained from `zmq_ctx_new` (see `Context::new`).
    ctx: *mut c_void,
}
380
381impl RawContext {
382 fn term(&self) -> Result<()> {
383 zmq_try!(unsafe { zmq_sys::zmq_ctx_term(self.ctx) });
384 Ok(())
385 }
386}
387
// SAFETY (assumed, not shown in this file): these impls rely on the underlying
// libzmq context being safe to use from several threads — confirm against the
// libzmq documentation.
unsafe impl Send for RawContext {}
unsafe impl Sync for RawContext {}
390
391impl Drop for RawContext {
392 fn drop(&mut self) {
393 let mut e = self.term();
394 while e == Err(Error::EINTR) {
395 e = self.term();
396 }
397 }
398}
399
/// Handle to a context.
///
/// Cloning only clones the inner `Arc`, so all clones share one `RawContext`,
/// which is terminated when the last reference is dropped.
#[derive(Clone)]
pub struct Context {
    /// Shared owner of the raw context pointer.
    raw: Arc<RawContext>,
}
424
425impl Context {
426 pub fn new() -> Context {
428 Context {
429 raw: Arc::new(RawContext {
430 ctx: unsafe { zmq_sys::zmq_ctx_new() },
431 }),
432 }
433 }
434
435 pub fn get_io_threads(&self) -> Result<i32> {
437 let rc =
438 zmq_try!(unsafe { zmq_sys::zmq_ctx_get(self.raw.ctx, zmq_sys::ZMQ_IO_THREADS as _) });
439 Ok(rc as i32)
440 }
441
442 pub fn set_io_threads(&self, value: i32) -> Result<()> {
444 zmq_try!(unsafe {
445 zmq_sys::zmq_ctx_set(self.raw.ctx, zmq_sys::ZMQ_IO_THREADS as _, value as i32)
446 });
447 Ok(())
448 }
449
450 pub fn socket(&self, socket_type: SocketType) -> Result<Socket> {
456 let sock = unsafe { zmq_sys::zmq_socket(self.raw.ctx, socket_type.to_raw()) };
457
458 if sock.is_null() {
459 return Err(errno_to_error());
460 }
461
462 Ok(Socket {
463 sock,
464 context: Some(self.clone()),
465 owned: true,
466 })
467 }
468
469 pub fn destroy(&mut self) -> Result<()> {
472 self.raw.term()
473 }
474}
475
476impl Default for Context {
477 fn default() -> Self {
478 Context::new()
479 }
480}
481
/// A socket handle.
pub struct Socket {
    /// Raw socket pointer passed to every `zmq_sys` call.
    sock: *mut c_void,
    // `Some` for sockets created through `Context::socket`, `None` for sockets
    // built with `Socket::from_raw`. Never read in this file (hence the
    // allow) — presumably held to keep the context alive; confirm.
    #[allow(dead_code)]
    context: Option<Context>,
    /// When `true`, `Drop` closes the socket; cleared by `Socket::into_raw`.
    owned: bool,
}
491
// SAFETY (assumed, not shown in this file): relies on a socket being safe to
// move to another thread. Note that only `Send` is implemented, not `Sync`.
unsafe impl Send for Socket {}
493
494impl Drop for Socket {
495 fn drop(&mut self) {
496 if self.owned && unsafe { zmq_sys::zmq_close(self.sock) } == -1 {
497 panic!("{}", errno_to_error());
498 }
499 }
500}
501
502#[cfg(unix)]
503impl AsRawFd for Socket {
504 fn as_raw_fd(&self) -> UnixRawFd {
505 self.get_fd().unwrap() as UnixRawFd
506 }
507}
508
#[cfg(windows)]
impl AsRawSocket for Socket {
    /// Returns the descriptor reported by `get_fd`.
    ///
    /// # Panics
    /// Panics if `get_fd` fails.
    fn as_raw_socket(&self) -> RawSocket {
        let fd = self.get_fd().unwrap();
        fd as RawSocket
    }
}
515
// Generates a public getter method `$getter(&self) -> Result<$ty>` that reads
// the socket option `zmq_sys::$constant_name` through `sockopt::Getter`.
// Any attributes given before `pub` are forwarded onto the generated method.
macro_rules! sockopt_getter {
    ( $(#[$meta:meta])*
      pub $getter:ident => $constant_name:ident as $ty:ty
    ) => {
        $(#[$meta])*
        pub fn $getter(&self) -> Result<$ty> {
            <$ty as sockopt::Getter>::get(self.sock, zmq_sys::$constant_name as c_int)
        }
    };
}
526
// Generates a public setter method `$setter(&self, value: $ty) -> Result<()>`
// that writes the socket option `zmq_sys::$constant_name` through
// `sockopt::Setter`. Attributes given before `pub` are forwarded.
macro_rules! sockopt_setter {
    ( $(#[$meta:meta])*
      pub $setter:ident => $constant_name:ident as $ty:ty
    ) => {
        $(#[$meta])*
        pub fn $setter(&self, value: $ty) -> Result<()> {
            <$ty as sockopt::Setter>::set(self.sock, zmq_sys::$constant_name as c_int, value)
        }
    };
}
537
// Recursively expands a comma-terminated list of socket option declarations.
//
// `META { ... }` carries attributes applied to every generated method; each
// item may add its own attributes. Three item shapes are recognised:
//   (_, setter)       => CONST as Ty,   -- setter only
//   (getter)          => CONST as Ty,   -- getter only
//   (getter, setter)  => CONST as Ty,   -- both
// After handling the first item the macro recurses on the remaining tokens.
macro_rules! sockopt_seq {
    // Base case: nothing left to expand.
    ( META { $($meta:meta)* }, ) => ();
    // Setter only.
    ( META { $($meta:meta)* }, $(#[$item_meta:meta])* (_, $setter:ident) => $constant_name:ident as $ty:ty,
      $($rest:tt)*
    ) => {
        sockopt_setter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $setter => $constant_name as $ty
        }
        sockopt_seq!(META { $($meta)* }, $($rest)*);
    };
    // Getter only.
    ( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident) => $constant_name:ident as $ty:ty,
      $($rest:tt)*
    ) => {
        sockopt_getter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $getter => $constant_name as $ty
        }
        sockopt_seq!(META { $($meta)* }, $($rest)*);
    };
    // Getter and setter.
    ( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident, $setter:ident) => $constant_name:ident as $ty:ty,
      $($rest:tt)*
    ) => {
        sockopt_getter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $getter => $constant_name as $ty
        }
        sockopt_setter! {
            $(#[$meta])* $(#[$item_meta])*
            pub $setter => $constant_name as $ty
        }
        sockopt_seq!(META { $($meta)* }, $($rest)*);
    };
}
572
// Entry point for declaring socket option accessors: forwards the whole list
// to `sockopt_seq!` with an empty set of shared attributes. An empty
// invocation expands to nothing.
macro_rules! sockopts {
    () => ();
    ( $($rest:tt)* ) => {
        sockopt_seq!(META {}, $($rest)*);
    };
}
579
/// Values that can be sent over a [`Socket`].
///
/// A blanket implementation below covers every type that is `Into<Message>`.
pub trait Sendable {
    /// Consumes `self` and sends it on `socket` with the given raw `flags`.
    fn send(self, socket: &Socket, flags: i32) -> Result<()>;
}
592
593impl<T> Sendable for T
594where
595 T: Into<Message>,
596{
597 fn send(self, socket: &Socket, flags: i32) -> Result<()> {
598 let mut msg = self.into();
599 zmq_try!(unsafe { zmq_sys::zmq_msg_send(msg_ptr(&mut msg), socket.sock, flags as c_int) });
600 Ok(())
601 }
602}
603
604impl Socket {
605 pub fn into_raw(mut self) -> *mut c_void {
611 self.owned = false;
612 self.sock
613 }
614
615 pub unsafe fn from_raw(sock: *mut c_void) -> Socket {
627 Socket {
628 sock,
629 context: None,
630 owned: true,
631 }
632 }
633
634 pub fn as_mut_ptr(&mut self) -> *mut c_void {
640 self.sock
641 }
642
643 pub fn bind(&self, endpoint: &str) -> Result<()> {
645 let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
646 zmq_try!(unsafe { zmq_sys::zmq_bind(self.sock, c_str.as_ptr()) });
647 Ok(())
648 }
649
650 pub fn unbind(&self, endpoint: &str) -> Result<()> {
652 let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
653 zmq_try!(unsafe { zmq_sys::zmq_unbind(self.sock, c_str.as_ptr()) });
654 Ok(())
655 }
656
657 pub fn connect(&self, endpoint: &str) -> Result<()> {
659 let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
660 zmq_try!(unsafe { zmq_sys::zmq_connect(self.sock, c_str.as_ptr()) });
661 Ok(())
662 }
663
664 pub fn disconnect(&self, endpoint: &str) -> Result<()> {
666 let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
667 zmq_try!(unsafe { zmq_sys::zmq_disconnect(self.sock, c_str.as_ptr()) });
668 Ok(())
669 }
670
671 pub fn monitor(&self, monitor_endpoint: &str, events: i32) -> Result<()> {
673 let c_str = ffi::CString::new(monitor_endpoint.as_bytes()).unwrap();
674 zmq_try!(unsafe {
675 zmq_sys::zmq_socket_monitor(self.sock, c_str.as_ptr(), events as c_int)
676 });
677 Ok(())
678 }
679
680 pub fn send<T>(&self, data: T, flags: i32) -> Result<()>
685 where
686 T: Sendable,
687 {
688 data.send(self, flags)
689 }
690
691 #[deprecated(since = "0.9.0", note = "Use `send` instead")]
693 pub fn send_msg(&self, msg: Message, flags: i32) -> Result<()> {
694 self.send(msg, flags)
695 }
696
697 #[deprecated(since = "0.9.0", note = "Use `send` instead")]
698 pub fn send_str(&self, data: &str, flags: i32) -> Result<()> {
699 self.send(data, flags)
700 }
701
702 pub fn send_multipart<I, T>(&self, iter: I, flags: i32) -> Result<()>
703 where
704 I: IntoIterator<Item = T>,
705 T: Into<Message>,
706 {
707 let mut last_part: Option<T> = None;
708 for part in iter {
709 let maybe_last = last_part.take();
710 if let Some(last) = maybe_last {
711 self.send(last.into(), flags | SNDMORE)?;
712 }
713 last_part = Some(part);
714 }
715 if let Some(last) = last_part {
716 self.send(last.into(), flags)
717 } else {
718 Ok(())
719 }
720 }
721
722 pub fn recv(&self, msg: &mut Message, flags: i32) -> Result<()> {
725 zmq_try!(unsafe { zmq_sys::zmq_msg_recv(msg_ptr(msg), self.sock, flags as c_int) });
726 Ok(())
727 }
728
729 pub fn recv_into(&self, bytes: &mut [u8], flags: i32) -> Result<usize> {
733 let bytes_ptr = bytes.as_mut_ptr() as *mut c_void;
734 let rc = zmq_try!(unsafe {
735 zmq_sys::zmq_recv(self.sock, bytes_ptr, bytes.len(), flags as c_int)
736 });
737 Ok(rc as usize)
738 }
739
740 pub fn recv_msg(&self, flags: i32) -> Result<Message> {
742 let mut msg = Message::new();
743 self.recv(&mut msg, flags).map(|_| msg)
744 }
745
746 pub fn recv_bytes(&self, flags: i32) -> Result<Vec<u8>> {
748 self.recv_msg(flags).map(|msg| msg.to_vec())
749 }
750
751 pub fn recv_string(&self, flags: i32) -> Result<result::Result<String, Vec<u8>>> {
756 self.recv_bytes(flags)
757 .map(|bytes| String::from_utf8(bytes).map_err(FromUtf8Error::into_bytes))
758 }
759
760 pub fn recv_multipart(&self, flags: i32) -> Result<Vec<Vec<u8>>> {
766 let mut parts: Vec<Vec<u8>> = vec![];
767 loop {
768 let part = self.recv_bytes(flags)?;
769 parts.push(part);
770
771 let more_parts = self.get_rcvmore()?;
772 if !more_parts {
773 break;
774 }
775 }
776 Ok(parts)
777 }
778
    // Boolean socket options. Each `(getter, setter)` line generates both
    // accessors for the named `zmq_sys` constant; `(_, setter)` lines generate
    // only the setter.
    sockopts! {
        (is_ipv6, set_ipv6) => ZMQ_IPV6 as bool,
        (is_immediate, set_immediate) => ZMQ_IMMEDIATE as bool,
        (is_plain_server, set_plain_server) => ZMQ_PLAIN_SERVER as bool,
        (is_conflate, set_conflate) => ZMQ_CONFLATE as bool,
        (is_probe_router, set_probe_router) => ZMQ_PROBE_ROUTER as bool,
        (is_router_mandatory, set_router_mandatory) => ZMQ_ROUTER_MANDATORY as bool,
        (is_router_handover, set_router_handover) => ZMQ_ROUTER_HANDOVER as bool,
        (is_curve_server, set_curve_server) => ZMQ_CURVE_SERVER as bool,
        (is_gssapi_server, set_gssapi_server) => ZMQ_GSSAPI_SERVER as bool,
        (is_gssapi_plaintext, set_gssapi_plaintext) => ZMQ_GSSAPI_PLAINTEXT as bool,
        // Setter-only options.
        (_, set_req_relaxed) => ZMQ_REQ_RELAXED as bool,
        (_, set_req_correlate) => ZMQ_REQ_CORRELATE as bool,
    }
797
798 pub fn get_socket_type(&self) -> Result<SocketType> {
800 sockopt::get(self.sock, zmq_sys::ZMQ_TYPE as c_int).map(SocketType::from_raw)
801 }
802
803 pub fn get_rcvmore(&self) -> Result<bool> {
805 sockopt::get(self.sock, zmq_sys::ZMQ_RCVMORE as c_int).map(|o: i64| o == 1i64)
806 }
807
    // Numeric, descriptor and byte-slice socket options. `(getter, setter)`
    // lines generate both accessors, `(getter)` lines a getter only and
    // `(_, setter)` lines a setter only, each for the named `zmq_sys` constant.
    sockopts! {
        (get_maxmsgsize, set_maxmsgsize) => ZMQ_MAXMSGSIZE as i64,
        (get_sndhwm, set_sndhwm) => ZMQ_SNDHWM as i32,
        (get_rcvhwm, set_rcvhwm) => ZMQ_RCVHWM as i32,
        (get_affinity, set_affinity) => ZMQ_AFFINITY as u64,
        (get_rate, set_rate) => ZMQ_RATE as i32,
        (get_recovery_ivl, set_recovery_ivl) => ZMQ_RECOVERY_IVL as i32,
        (get_sndbuf, set_sndbuf) => ZMQ_SNDBUF as i32,
        (get_rcvbuf, set_rcvbuf) => ZMQ_RCVBUF as i32,
        (get_tos, set_tos) => ZMQ_TOS as i32,
        (get_linger, set_linger) => ZMQ_LINGER as i32,
        (get_reconnect_ivl, set_reconnect_ivl) => ZMQ_RECONNECT_IVL as i32,
        (get_reconnect_ivl_max, set_reconnect_ivl_max) => ZMQ_RECONNECT_IVL_MAX as i32,
        (get_backlog, set_backlog) => ZMQ_BACKLOG as i32,

        // Read-only: descriptor used by the `AsRawFd` / `AsRawSocket` impls.
        (get_fd) => ZMQ_FD as RawFd,

        // Read-only: current event state as a `PollEvents` bit set.
        (get_events) => ZMQ_EVENTS as PollEvents,

        (get_multicast_hops, set_multicast_hops) => ZMQ_MULTICAST_HOPS as i32,
        (get_rcvtimeo, set_rcvtimeo) => ZMQ_RCVTIMEO as i32,
        (get_sndtimeo, set_sndtimeo) => ZMQ_SNDTIMEO as i32,
        (get_tcp_keepalive, set_tcp_keepalive) => ZMQ_TCP_KEEPALIVE as i32,
        (get_tcp_keepalive_cnt, set_tcp_keepalive_cnt) => ZMQ_TCP_KEEPALIVE_CNT as i32,
        (get_tcp_keepalive_idle, set_tcp_keepalive_idle) => ZMQ_TCP_KEEPALIVE_IDLE as i32,
        (get_tcp_keepalive_intvl, set_tcp_keepalive_intvl) => ZMQ_TCP_KEEPALIVE_INTVL as i32,
        (get_handshake_ivl, set_handshake_ivl) => ZMQ_HANDSHAKE_IVL as i32,
        // The identity setter writes ZMQ_ROUTING_ID; the matching getter
        // (`get_identity`) is written by hand below.
        (_, set_identity) => ZMQ_ROUTING_ID as &[u8],
        (_, set_subscribe) => ZMQ_SUBSCRIBE as &[u8],
        (_, set_unsubscribe) => ZMQ_UNSUBSCRIBE as &[u8],
        (get_heartbeat_ivl, set_heartbeat_ivl) => ZMQ_HEARTBEAT_IVL as i32,
        (get_heartbeat_ttl, set_heartbeat_ttl) => ZMQ_HEARTBEAT_TTL as i32,
        (get_heartbeat_timeout, set_heartbeat_timeout) => ZMQ_HEARTBEAT_TIMEOUT as i32,
        (get_connect_timeout, set_connect_timeout) => ZMQ_CONNECT_TIMEOUT as i32,
    }
881
882 pub fn get_identity(&self) -> Result<Vec<u8>> {
884 sockopt::get_bytes(self.sock, zmq_sys::ZMQ_ROUTING_ID as c_int, 255)
886 }
887
888 pub fn get_socks_proxy(&self) -> Result<result::Result<String, Vec<u8>>> {
889 sockopt::get_string(self.sock, zmq_sys::ZMQ_SOCKS_PROXY as c_int, 255, true)
892 }
893
894 pub fn get_mechanism(&self) -> Result<Mechanism> {
895 sockopt::get(self.sock, zmq_sys::ZMQ_MECHANISM as c_int).map(|mech| match mech {
896 zmq_sys::ZMQ_NULL => Mechanism::ZMQ_NULL,
897 zmq_sys::ZMQ_PLAIN => Mechanism::ZMQ_PLAIN,
898 zmq_sys::ZMQ_CURVE => Mechanism::ZMQ_CURVE,
899 zmq_sys::ZMQ_GSSAPI => Mechanism::ZMQ_GSSAPI,
900 _ => panic!("Mechanism is out of range!"),
901 })
902 }
903
904 pub fn get_plain_username(&self) -> Result<result::Result<String, Vec<u8>>> {
905 sockopt::get_string(self.sock, zmq_sys::ZMQ_PLAIN_USERNAME as c_int, 255, true)
907 }
908
909 pub fn get_plain_password(&self) -> Result<result::Result<String, Vec<u8>>> {
910 sockopt::get_string(self.sock, zmq_sys::ZMQ_PLAIN_PASSWORD as c_int, 256, true)
912 }
913
914 pub fn get_zap_domain(&self) -> Result<result::Result<String, Vec<u8>>> {
915 sockopt::get_string(self.sock, zmq_sys::ZMQ_ZAP_DOMAIN as c_int, 255, true)
917 }
918
919 pub fn get_last_endpoint(&self) -> Result<result::Result<String, Vec<u8>>> {
928 sockopt::get_string(
930 self.sock,
931 zmq_sys::ZMQ_LAST_ENDPOINT as c_int,
932 256 + 9 + 1,
933 true,
934 )
935 }
936
937 pub fn get_curve_publickey(&self) -> Result<Vec<u8>> {
943 sockopt::get_bytes(self.sock, zmq_sys::ZMQ_CURVE_PUBLICKEY as c_int, 32)
944 }
945
946 pub fn get_curve_secretkey(&self) -> Result<Vec<u8>> {
952 sockopt::get_bytes(self.sock, zmq_sys::ZMQ_CURVE_SECRETKEY as c_int, 32)
953 }
954
955 pub fn get_curve_serverkey(&self) -> Result<Vec<u8>> {
961 sockopt::get_bytes(self.sock, zmq_sys::ZMQ_CURVE_SERVERKEY as c_int, 32)
963 }
964
965 pub fn get_gssapi_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
966 sockopt::get_string(self.sock, zmq_sys::ZMQ_GSSAPI_PRINCIPAL as c_int, 260, true)
968 }
969
970 pub fn get_gssapi_service_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
971 sockopt::get_string(
973 self.sock,
974 zmq_sys::ZMQ_GSSAPI_SERVICE_PRINCIPAL as c_int,
975 260,
976 true,
977 )
978 }
979
    // Setter-only options. The getters for several of these constants are
    // written by hand above because they need an explicit buffer size.
    sockopts! {
        (_, set_socks_proxy) => ZMQ_SOCKS_PROXY as Option<&str>,
        (_, set_plain_username) => ZMQ_PLAIN_USERNAME as Option<&str>,
        (_, set_plain_password) => ZMQ_PLAIN_PASSWORD as Option<&str>,
        (_, set_zap_domain) => ZMQ_ZAP_DOMAIN as &str,
        (_, set_xpub_welcome_msg) => ZMQ_XPUB_WELCOME_MSG as Option<&str>,
        (_, set_xpub_verbose) => ZMQ_XPUB_VERBOSE as bool,

        // Key material and GSSAPI principals.
        (_, set_curve_publickey) => ZMQ_CURVE_PUBLICKEY as &[u8],
        (_, set_curve_secretkey) => ZMQ_CURVE_SECRETKEY as &[u8],
        (_, set_curve_serverkey) => ZMQ_CURVE_SERVERKEY as &[u8],
        (_, set_gssapi_principal) => ZMQ_GSSAPI_PRINCIPAL as &str,
        (_, set_gssapi_service_principal) => ZMQ_GSSAPI_SERVICE_PRINCIPAL as &str,
    }
994
995 pub fn as_poll_item(&self, events: PollEvents) -> PollItem {
997 PollItem {
998 socket: self.sock,
999 fd: 0,
1000 events: events.bits(),
1001 revents: 0,
1002 marker: PhantomData,
1003 }
1004 }
1005
1006 pub fn poll(&self, events: PollEvents, timeout_ms: i64) -> Result<i32> {
1011 poll(&mut [self.as_poll_item(events)], timeout_ms)
1012 }
1013}
1014
// Bit set of poll event flags stored in an `i16`; each flag takes its value
// from the `zmq_sys::ZMQ_POLL*` constant of the same name.
bitflags! {
    pub struct PollEvents: i16 {
        const POLLIN = zmq_sys::ZMQ_POLLIN as i16;
        const POLLOUT = zmq_sys::ZMQ_POLLOUT as i16;
        const POLLERR = zmq_sys::ZMQ_POLLERR as i16;
    }
}
1032
/// Crate-level alias for `PollEvents::POLLIN`.
pub const POLLIN: PollEvents = PollEvents::POLLIN;

/// Crate-level alias for `PollEvents::POLLOUT`.
pub const POLLOUT: PollEvents = PollEvents::POLLOUT;

/// Crate-level alias for `PollEvents::POLLERR`.
pub const POLLERR: PollEvents = PollEvents::POLLERR;
1044
/// One entry of a poll set: either a socket or a raw descriptor.
///
/// The struct is `#[repr(C)]` because the free function `poll` casts a slice
/// of these to `*mut zmq_sys::zmq_pollitem_t`.
#[repr(C)]
pub struct PollItem<'a> {
    /// Socket pointer; null for items created with `PollItem::from_fd`.
    socket: *mut c_void,
    /// Raw descriptor; set to 0 for items created from a socket.
    fd: RawFd,
    /// Requested events as raw `PollEvents` bits.
    events: c_short,
    /// Reported events; initialised to 0 by the constructors in this file.
    revents: c_short,
    /// Carries the `'a` borrow of a `Socket` without storing a reference.
    marker: PhantomData<&'a Socket>,
}
1058
1059impl<'a> PollItem<'a> {
1060 pub fn from_fd(fd: RawFd, events: PollEvents) -> PollItem<'a> {
1063 PollItem {
1064 socket: ptr::null_mut(),
1065 fd,
1066 events: events.bits(),
1067 revents: 0,
1068 marker: PhantomData,
1069 }
1070 }
1071
1072 pub fn set_events(&mut self, events: PollEvents) {
1074 self.events = events.bits();
1075 }
1076
1077 pub fn get_revents(&self) -> PollEvents {
1079 PollEvents::from_bits_truncate(self.revents)
1080 }
1081
1082 pub fn is_readable(&self) -> bool {
1084 (self.revents & POLLIN.bits()) != 0
1085 }
1086
1087 pub fn is_writable(&self) -> bool {
1090 (self.revents & POLLOUT.bits()) != 0
1091 }
1092
1093 pub fn is_error(&self) -> bool {
1095 (self.revents & POLLERR.bits()) != 0
1096 }
1097
1098 pub fn has_socket(&self, socket: &Socket) -> bool {
1100 self.socket == socket.sock
1101 }
1102
1103 pub fn has_fd(&self, fd: RawFd) -> bool {
1105 self.socket.is_null() && self.fd == fd
1106 }
1107}
1108
1109pub fn poll(items: &mut [PollItem], timeout: i64) -> Result<i32> {
1121 let rc = zmq_try!(unsafe {
1122 zmq_sys::zmq_poll(
1123 items.as_mut_ptr() as *mut zmq_sys::zmq_pollitem_t,
1124 items.len() as c_int,
1125 timeout as c_long,
1126 )
1127 });
1128 Ok(rc as i32)
1129}
1130
1131pub fn proxy(frontend: &Socket, backend: &Socket) -> Result<()> {
1139 zmq_try!(unsafe { zmq_sys::zmq_proxy(frontend.sock, backend.sock, ptr::null_mut()) });
1140 Ok(())
1141}
1142
1143pub fn proxy_with_capture(
1148 frontend: &mut Socket,
1149 backend: &mut Socket,
1150 capture: &mut Socket,
1151) -> Result<()> {
1152 zmq_try!(unsafe { zmq_sys::zmq_proxy(frontend.sock, backend.sock, capture.sock) });
1153 Ok(())
1154}
1155
1156pub fn proxy_steerable(
1162 frontend: &mut Socket,
1163 backend: &mut Socket,
1164 control: &mut Socket,
1165) -> Result<()> {
1166 zmq_try!(unsafe {
1167 zmq_sys::zmq_proxy_steerable(frontend.sock, backend.sock, ptr::null_mut(), control.sock)
1168 });
1169 Ok(())
1170}
1171
1172pub fn proxy_steerable_with_capture(
1176 frontend: &mut Socket,
1177 backend: &mut Socket,
1178 capture: &mut Socket,
1179 control: &mut Socket,
1180) -> Result<()> {
1181 zmq_try!(unsafe {
1182 zmq_sys::zmq_proxy_steerable(frontend.sock, backend.sock, capture.sock, control.sock)
1183 });
1184 Ok(())
1185}
1186
1187pub fn has(capability: &str) -> Option<bool> {
1203 let c_str = ffi::CString::new(capability).unwrap();
1204 unsafe { Some(zmq_sys::zmq_has(c_str.as_ptr()) == 1) }
1205}
1206
/// A key pair produced by `CurveKeyPair::new`, held as two 32-byte arrays.
#[derive(Debug)]
pub struct CurveKeyPair {
    /// Public key bytes (Z85-decoded).
    pub public_key: [u8; 32],
    /// Secret key bytes (Z85-decoded).
    pub secret_key: [u8; 32],
}
1217
1218impl CurveKeyPair {
1219 pub fn new() -> Result<CurveKeyPair> {
1221 let mut ffi_public_key = [0u8; 41];
1223 let mut ffi_secret_key = [0u8; 41];
1224
1225 zmq_try!(unsafe {
1226 zmq_sys::zmq_curve_keypair(
1227 ffi_public_key.as_mut_ptr() as *mut libc::c_char,
1228 ffi_secret_key.as_mut_ptr() as *mut libc::c_char,
1229 )
1230 });
1231
1232 let mut pair = CurveKeyPair {
1233 public_key: [0; 32],
1234 secret_key: [0; 32],
1235 };
1236 unsafe {
1237 zmq_sys::zmq_z85_decode(
1240 pair.public_key.as_mut_ptr(),
1241 ffi_public_key.as_ptr() as *mut libc::c_char,
1242 );
1243 zmq_sys::zmq_z85_decode(
1244 pair.secret_key.as_mut_ptr(),
1245 ffi_secret_key.as_ptr() as *mut libc::c_char,
1246 );
1247 }
1248
1249 Ok(pair)
1250 }
1251}
1252
/// Errors returned by `z85_encode`.
#[derive(Debug)]
pub enum EncodeError {
    /// The input length is not a multiple of 4.
    BadLength,
    /// The encoded bytes could not be converted into a `String`.
    FromUtf8Error(FromUtf8Error),
}
1259
1260impl From<FromUtf8Error> for EncodeError {
1261 fn from(err: FromUtf8Error) -> Self {
1262 EncodeError::FromUtf8Error(err)
1263 }
1264}
1265
1266impl fmt::Display for EncodeError {
1267 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1268 match *self {
1269 EncodeError::BadLength => write!(f, "Invalid data length. Should be multiple of 4."),
1270 EncodeError::FromUtf8Error(ref e) => write!(f, "UTF8 conversion error: {}", e),
1271 }
1272 }
1273}
1274
// Marker impl: relies entirely on the trait's default methods together with
// the `Debug` and `Display` impls above.
impl std::error::Error for EncodeError {}
1276
1277pub fn z85_encode(data: &[u8]) -> result::Result<String, EncodeError> {
1284 if data.len() % 4 != 0 {
1285 return Err(EncodeError::BadLength);
1286 }
1287
1288 let len = data.len() * 5 / 4 + 1;
1289 let mut dest = vec![0u8; len];
1290
1291 unsafe {
1292 zmq_sys::zmq_z85_encode(
1293 dest.as_mut_ptr() as *mut libc::c_char,
1294 data.as_ptr(),
1295 data.len(),
1296 );
1297 }
1298
1299 dest.truncate(len - 1);
1300 String::from_utf8(dest).map_err(EncodeError::FromUtf8Error)
1301}
1302
/// Errors returned by `z85_decode`.
#[derive(Debug)]
pub enum DecodeError {
    /// The input length is not a multiple of 5.
    BadLength,
    /// The input contains a NUL byte and cannot be turned into a C string.
    NulError(ffi::NulError),
}
1311
1312impl From<ffi::NulError> for DecodeError {
1313 fn from(err: ffi::NulError) -> Self {
1314 DecodeError::NulError(err)
1315 }
1316}
1317
1318impl fmt::Display for DecodeError {
1319 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1320 match *self {
1321 DecodeError::BadLength => write!(f, "Invalid data length. Should be multiple of 5."),
1322 DecodeError::NulError(ref e) => write!(f, "Nul byte error: {}", e),
1323 }
1324 }
1325}
1326
// Marker impl: relies entirely on the trait's default methods together with
// the `Debug` and `Display` impls above.
impl std::error::Error for DecodeError {}
1328
1329pub fn z85_decode(data: &str) -> result::Result<Vec<u8>, DecodeError> {
1336 if data.len() % 5 != 0 {
1337 return Err(DecodeError::BadLength);
1338 }
1339
1340 let len = data.len() * 4 / 5;
1341 let mut dest = vec![0u8; len];
1342
1343 let c_str = ffi::CString::new(data)?;
1344
1345 unsafe {
1346 zmq_sys::zmq_z85_decode(dest.as_mut_ptr(), c_str.into_raw());
1347 }
1348
1349 Ok(dest)
1350}