slog_async/
lib.rs

1// {{{ Crate docs
2//! # Async drain for slog-rs
3//!
4//! `slog-rs` is an ecosystem of reusable components for structured, extensible,
5//! composable logging for Rust.
6//!
7//! `slog-async` allows building `Drain`s that offload processing to another
8//! thread.  Typically, serialization and IO operations are slow enough that
9//! they make logging hinder the performance of the main code. Sending log
10//! records to another thread is much faster (ballpark of 100ns).
11//!
12//! Note: Unlike other logging solutions, `slog-rs` does not have a hardcoded
13//! async logging implementation. This crate is just a reasonable reference
14//! implementation. It should have good performance and work well in most use
15//! cases. See the documentation and implementation for more details.
16//!
17//! It's relatively easy to implement your own `slog-rs` async logging. Feel
18//! free to use this one as a starting point.
19//!
20//! ## Beware of `std::process::exit`
21//!
22//! When using `std::process::exit` to terminate a process with an exit code,
23//! it is important to notice that destructors will not be called. This matters
24//! for `slog_async` as it **prevents flushing** of the async drain and
25//! **discards messages** that are not yet written.
26//!
//! A way around this issue is to encapsulate the construction of the logger
//! into its own function that returns before `std::process::exit` is called.
29//!
30//! ```
31//! // ...
32//! fn main() {
33//!     let exit_code = run(); // logger gets flushed as `run()` returns.
34//! #   if false {
35//! #   // this must not run or it'll end the doctest
36//!     std::process::exit(exit_code)
37//! #   }
38//! }
39//!
40//! fn run() -> i32 {
41//!    // initialize the logger
42//!
43//!    // ... do your thing ...
44//!
45//!    1 // exit code to return
46//! }
47//! ```
48// }}}
49
50// {{{ Imports & meta
51#![warn(missing_docs)]
52
53#[macro_use]
54extern crate slog;
55extern crate crossbeam_channel;
56extern crate take_mut;
57extern crate thread_local;
58
59use crossbeam_channel::Sender;
60
61use slog::{BorrowedKV, Level, Record, RecordStatic, SingleKV, KV};
62use slog::{Key, OwnedKVList, Serializer};
63
64use slog::Drain;
65use std::fmt;
66use std::sync;
67use std::{io, thread};
68
69use std::sync::atomic::AtomicUsize;
70use std::sync::atomic::Ordering;
71use std::sync::Mutex;
72use take_mut::take;
73
74use std::panic::{catch_unwind, AssertUnwindSafe};
75// }}}
76
77// {{{ Serializer
/// Serializer that copies every emitted key-value pair into an owned,
/// boxed `KV` chain that is `Send`.
struct ToSendSerializer {
    // Pairs collected so far. Every `emit_*` call replaces this box with a
    // new one holding the previous chain plus one more `SingleKV`.
    kv: Box<dyn KV + Send>,
}
81
82impl ToSendSerializer {
83    fn new() -> Self {
84        ToSendSerializer { kv: Box::new(()) }
85    }
86
87    fn finish(self) -> Box<dyn KV + Send> {
88        self.kv
89    }
90}
91
92impl Serializer for ToSendSerializer {
93    fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
94        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
95        Ok(())
96    }
97    fn emit_unit(&mut self, key: Key) -> slog::Result {
98        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
99        Ok(())
100    }
101    fn emit_none(&mut self, key: Key) -> slog::Result {
102        let val: Option<()> = None;
103        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
104        Ok(())
105    }
106    fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
107        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
108        Ok(())
109    }
110    fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
111        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
112        Ok(())
113    }
114    fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
115        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
116        Ok(())
117    }
118    fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
119        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
120        Ok(())
121    }
122    fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
123        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
124        Ok(())
125    }
126    fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
127        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
128        Ok(())
129    }
130    fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
131        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
132        Ok(())
133    }
134    fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
135        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
136        Ok(())
137    }
138    fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
139        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
140        Ok(())
141    }
142    fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
143        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
144        Ok(())
145    }
146    fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
147        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
148        Ok(())
149    }
150    #[cfg(integer128)]
151    fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
152        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
153        Ok(())
154    }
155    #[cfg(integer128)]
156    fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
157        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
158        Ok(())
159    }
160    fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
161        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
162        Ok(())
163    }
164    fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
165        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
166        Ok(())
167    }
168    fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
169        let val = val.to_owned();
170        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
171        Ok(())
172    }
173    fn emit_arguments(
174        &mut self,
175        key: Key,
176        val: &fmt::Arguments,
177    ) -> slog::Result {
178        let val = fmt::format(*val);
179        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
180        Ok(())
181    }
182
183    #[cfg(feature = "nested-values")]
184    fn emit_serde(
185        &mut self,
186        key: Key,
187        value: &slog::SerdeValue,
188    ) -> slog::Result {
189        let val = value.to_sendable();
190        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
191        Ok(())
192    }
193}
194// }}}
195
196// {{{ Async
197// {{{ AsyncError
/// Errors reported by `Async` and `AsyncCore`
///
/// This is the `Drain::Err` type of both drains.
#[derive(Debug)]
pub enum AsyncError {
    /// Could not send record to worker thread due to full queue
    Full,
    /// Fatal problem - mutex or channel poisoning issue
    ///
    /// The boxed value carries the underlying cause.
    Fatal(Box<dyn std::error::Error>),
}
206
207impl<T> From<crossbeam_channel::TrySendError<T>> for AsyncError {
208    fn from(_: crossbeam_channel::TrySendError<T>) -> AsyncError {
209        AsyncError::Full
210    }
211}
212
213impl<T> From<crossbeam_channel::SendError<T>> for AsyncError {
214    fn from(_: crossbeam_channel::SendError<T>) -> AsyncError {
215        AsyncError::Fatal(Box::new(io::Error::new(
216            io::ErrorKind::BrokenPipe,
217            "The logger thread terminated",
218        )))
219    }
220}
221
222impl<T> From<std::sync::PoisonError<T>> for AsyncError {
223    fn from(err: std::sync::PoisonError<T>) -> AsyncError {
224        AsyncError::Fatal(Box::new(io::Error::new(
225            io::ErrorKind::BrokenPipe,
226            err.to_string(),
227        )))
228    }
229}
230
/// `AsyncResult` alias
///
/// A `Result` whose error type is fixed to `AsyncError`.
pub type AsyncResult<T> = std::result::Result<T, AsyncError>;
233
234// }}}
235
236// {{{ AsyncCore
/// `AsyncCore` builder
///
/// Collects the settings used to spawn the worker thread and to construct
/// the `AsyncCore` that feeds it.
pub struct AsyncCoreBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    // Capacity of the bounded channel between loggers and the worker thread.
    chan_size: usize,
    // `true`: wait for space when the channel is full; `false`: fail the send.
    blocking: bool,
    // The wrapped drain; it is moved into the worker thread on build.
    drain: D,
    // Optional name for the worker thread.
    thread_name: Option<String>,
}
247
248impl<D> AsyncCoreBuilder<D>
249where
250    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
251{
252    fn new(drain: D) -> Self {
253        AsyncCoreBuilder {
254            chan_size: 128,
255            blocking: false,
256            drain,
257            thread_name: None,
258        }
259    }
260
261    /// Configure a name to be used for the background thread.
262    ///
263    /// The name must not contain '\0'.
264    ///
265    /// # Panics
266    ///
267    /// If a name with '\0' is passed.
268    pub fn thread_name(mut self, name: String) -> Self {
269        assert!(name.find('\0').is_none(), "Name with \\'0\\' in it passed");
270        self.thread_name = Some(name);
271        self
272    }
273
274    /// Set channel size used to send logging records to worker thread. When
275    /// buffer is full `AsyncCore` will start returning `AsyncError::Full` or block, depending on
276    /// the `blocking` configuration.
277    pub fn chan_size(mut self, s: usize) -> Self {
278        self.chan_size = s;
279        self
280    }
281
282    /// Should the logging call be blocking if the channel is full?
283    ///
284    /// Default is false, in which case it'll return `AsyncError::Full`.
285    pub fn blocking(mut self, blocking: bool) -> Self {
286        self.blocking = blocking;
287        self
288    }
289
290    fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
291        let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
292        let mut builder = thread::Builder::new();
293        if let Some(thread_name) = self.thread_name {
294            builder = builder.name(thread_name);
295        }
296        let drain = self.drain;
297        let join = builder
298            .spawn(move || {
299                let drain = AssertUnwindSafe(&drain);
300                // catching possible unwinding panics which can occur in used inner Drain implementation
301                if let Err(panic_cause) = catch_unwind(move || loop {
302                    match rx.recv() {
303                        Ok(AsyncMsg::Record(r)) => {
304                            if r.log_to(&*drain).is_err() {
305                                eprintln!("slog-async failed while writing");
306                                return;
307                            }
308                        }
309                        Ok(AsyncMsg::Finish) => return,
310                        Err(recv_error) => {
311                            eprintln!("slog-async failed while receiving: {recv_error}");
312                            return;
313                        }
314                    }
315                }) {
316                    eprintln!("slog-async failed with panic: {panic_cause:?}")
317                }
318            })
319            .unwrap();
320
321        (join, tx)
322    }
323
324    /// Build `AsyncCore`
325    pub fn build(self) -> AsyncCore {
326        self.build_no_guard()
327    }
328
329    /// Build `AsyncCore`
330    pub fn build_no_guard(self) -> AsyncCore {
331        let blocking = self.blocking;
332        let (join, tx) = self.spawn_thread();
333
334        AsyncCore {
335            ref_sender: tx,
336            tl_sender: thread_local::ThreadLocal::new(),
337            join: Mutex::new(Some(join)),
338            blocking,
339        }
340    }
341
342    /// Build `AsyncCore` with `AsyncGuard`
343    ///
344    /// See `AsyncGuard` for more information.
345    pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
346        let blocking = self.blocking;
347        let (join, tx) = self.spawn_thread();
348
349        (
350            AsyncCore {
351                ref_sender: tx.clone(),
352                tl_sender: thread_local::ThreadLocal::new(),
353                join: Mutex::new(None),
354                blocking,
355            },
356            AsyncGuard {
357                join: Some(join),
358                tx,
359            },
360        )
361    }
362}
363
/// Async guard
///
/// All `Drain`s are reference-counted by every `Logger` that uses them.
/// `Async` drain runs a worker thread and sends a termination (and flushing)
/// message only when being `drop`ed. Because of that it's actually
/// quite easy to have a left-over reference to an `Async` drain when
/// terminating, especially on `panic`s or similar unwinding events. Typically
/// it's caused by a leftover reference like a `Logger` in a thread-local
/// variable, a global variable, or a thread that is not being joined on. It
/// might be a program bug, but logging should work reliably especially in
/// case of bugs.
///
/// `AsyncGuard` is a remedy: it will send a flush and termination message to
/// an `Async` worker thread, and wait for it to finish on its own `drop`.
/// Using it is the simplest way to guarantee log flushing when using
/// `slog_async`.
pub struct AsyncGuard {
    // Should always be `Some`. `None` only
    // after `drop`
    join: Option<thread::JoinHandle<()>>,
    // Sender used to deliver `AsyncMsg::Finish` to the worker on drop.
    tx: Sender<AsyncMsg>,
}
384
385impl Drop for AsyncGuard {
386    fn drop(&mut self) {
387        let _err: Result<(), Box<dyn std::error::Error>> = {
388            || {
389                let _ = self.tx.send(AsyncMsg::Finish);
390                let join = self.join.take().unwrap();
391                if join.thread().id() != thread::current().id() {
392                    // See AsyncCore::drop for rationale of this branch.
393                    join.join().map_err(|_| {
394                        io::Error::new(
395                            io::ErrorKind::BrokenPipe,
396                            "Logging thread worker join error",
397                        )
398                    })?;
399                }
400                Ok(())
401            }
402        }();
403    }
404}
405
/// Core of `Async` drain
///
/// See `Async` for documentation.
///
/// Wrapping `AsyncCore` allows implementing custom overflow (and other errors)
/// handling strategy.
///
/// Note: On drop `AsyncCore` waits for its worker-thread to finish (after
/// handling all previous `Record`s sent to it). If you can't tolerate the
/// delay, make sure you drop it eg. in another thread.
pub struct AsyncCore {
    // Original sender; cloned to create each thread's own sender.
    ref_sender: Sender<AsyncMsg>,
    // Per-thread clones of `ref_sender`, created on first use.
    tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
    // Worker join handle. `None` when an `AsyncGuard` owns the handle
    // instead, or after it has been taken during drop.
    join: Mutex<Option<thread::JoinHandle<()>>>,
    // `true`: block on a full channel; `false`: fail with `AsyncError::Full`.
    blocking: bool,
}
422
423impl AsyncCore {
424    /// New `AsyncCore` with default parameters
425    pub fn new<D>(drain: D) -> Self
426    where
427        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
428        D: std::panic::RefUnwindSafe,
429    {
430        AsyncCoreBuilder::new(drain).build()
431    }
432
433    /// Build `AsyncCore` drain with custom parameters
434    pub fn custom<
435        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
436    >(
437        drain: D,
438    ) -> AsyncCoreBuilder<D> {
439        AsyncCoreBuilder::new(drain)
440    }
441    fn get_sender(
442        &self,
443    ) -> Result<
444        &crossbeam_channel::Sender<AsyncMsg>,
445        std::sync::PoisonError<
446            sync::MutexGuard<crossbeam_channel::Sender<AsyncMsg>>,
447        >,
448    > {
449        self.tl_sender.get_or_try(|| Ok(self.ref_sender.clone()))
450    }
451
452    /// Send `AsyncRecord` to a worker thread.
453    fn send(&self, r: AsyncRecord) -> AsyncResult<()> {
454        let sender = self.get_sender()?;
455
456        if self.blocking {
457            sender.send(AsyncMsg::Record(r))?;
458        } else {
459            sender.try_send(AsyncMsg::Record(r))?;
460        }
461
462        Ok(())
463    }
464}
465
466impl Drain for AsyncCore {
467    type Ok = ();
468    type Err = AsyncError;
469
470    fn log(
471        &self,
472        record: &Record,
473        logger_values: &OwnedKVList,
474    ) -> AsyncResult<()> {
475        self.send(AsyncRecord::from(record, logger_values))
476    }
477}
478
/// Serialized record.
///
/// An owned copy of a `Record` together with the logger's values, built by
/// `AsyncRecord::from`.
pub struct AsyncRecord {
    // The formatted log message.
    msg: String,
    // Level of the original record.
    level: Level,
    // Copy of the original record's location.
    location: Box<slog::RecordLocation>,
    // Owned copy of the original record's tag.
    tag: String,
    // Clone of the logger's key-value list.
    logger_values: OwnedKVList,
    // The record's own key-value pairs, converted to owned values.
    kv: Box<dyn KV + Send>,
}
488
489impl AsyncRecord {
490    /// Serializes a `Record` and an `OwnedKVList`.
491    pub fn from(record: &Record, logger_values: &OwnedKVList) -> Self {
492        let mut ser = ToSendSerializer::new();
493        record
494            .kv()
495            .serialize(record, &mut ser)
496            .expect("`ToSendSerializer` can't fail");
497
498        AsyncRecord {
499            msg: fmt::format(*record.msg()),
500            level: record.level(),
501            location: Box::new(*record.location()),
502            tag: String::from(record.tag()),
503            logger_values: logger_values.clone(),
504            kv: ser.finish(),
505        }
506    }
507
508    /// Writes the record to a `Drain`.
509    pub fn log_to<D: Drain>(self, drain: &D) -> Result<D::Ok, D::Err> {
510        let rs = RecordStatic {
511            location: &*self.location,
512            level: self.level,
513            tag: &self.tag,
514        };
515
516        drain.log(
517            &Record::new(
518                &rs,
519                &format_args!("{}", self.msg),
520                BorrowedKV(&self.kv),
521            ),
522            &self.logger_values,
523        )
524    }
525
526    /// Deconstruct this `AsyncRecord` into a record and `OwnedKVList`.
527    pub fn as_record_values(&self, mut f: impl FnMut(&Record, &OwnedKVList)) {
528        let rs = RecordStatic {
529            location: &*self.location,
530            level: self.level,
531            tag: &self.tag,
532        };
533
534        f(
535            &Record::new(
536                &rs,
537                &format_args!("{}", self.msg),
538                BorrowedKV(&self.kv),
539            ),
540            &self.logger_values,
541        )
542    }
543}
544
/// Message sent over the channel to the worker thread.
enum AsyncMsg {
    /// A record for the worker to write to the wrapped drain.
    Record(AsyncRecord),
    /// Tells the worker loop to return.
    Finish,
}
549
550impl Drop for AsyncCore {
551    fn drop(&mut self) {
552        let _err: Result<(), Box<dyn std::error::Error>> = {
553            || {
554                if let Some(join) = self.join.lock()?.take() {
555                    let _ = self.get_sender()?.send(AsyncMsg::Finish);
556                    if join.thread().id() != thread::current().id() {
557                        // A custom Drain::log implementation could dynamically
558                        // swap out the logger which eventually invokes
559                        // AsyncCore::drop in the worker thread.
560                        // If we drop the AsyncCore inside the logger thread,
561                        // this join() either panic or dead-lock.
562                        // TODO: Figure out whether skipping join() instead of
563                        // panicking is desirable.
564                        join.join().map_err(|_| {
565                            io::Error::new(
566                                io::ErrorKind::BrokenPipe,
567                                "Logging thread worker join error",
568                            )
569                        })?;
570                    }
571                }
572                Ok(())
573            }
574        }();
575    }
576}
577// }}}
578
/// Behavior used when the channel is full.
///
/// # Note
///
/// More variants may be added in the future, without considering it a breaking change.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum OverflowStrategy {
    /// The message gets dropped and a message with the number of dropped
    /// messages is produced once there's space.
    ///
    /// This is the default.
    ///
    /// Note that the message with number of dropped messages takes one slot in the channel as
    /// well.
    DropAndReport,
    /// The message gets dropped silently.
    Drop,
    /// The caller is blocked until there's enough space.
    Block,
    // Hidden placeholder variant; `AsyncBuilder::overflow_strategy` panics
    // when it is passed in.
    #[doc(hidden)]
    DoNotMatchAgainstThisAndReadTheDocs,
}
601
/// `Async` builder
///
/// Wraps an `AsyncCoreBuilder` and adds the choice of whether dropped
/// messages are counted.
pub struct AsyncBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    // Builder for the underlying `AsyncCore`.
    core: AsyncCoreBuilder<D>,
    // Increment a counter whenever a message is dropped due to not fitting inside the channel.
    inc_dropped: bool,
}
611
612impl<D> AsyncBuilder<D>
613where
614    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
615{
616    fn new(drain: D) -> AsyncBuilder<D> {
617        AsyncBuilder {
618            core: AsyncCoreBuilder::new(drain),
619            inc_dropped: true,
620        }
621    }
622
623    /// Set channel size used to send logging records to worker thread. When
624    /// buffer is full `AsyncCore` will start returning `AsyncError::Full`.
625    pub fn chan_size(self, s: usize) -> Self {
626        AsyncBuilder {
627            core: self.core.chan_size(s),
628            ..self
629        }
630    }
631
632    /// Sets what will happen if the channel is full.
633    pub fn overflow_strategy(
634        self,
635        overflow_strategy: OverflowStrategy,
636    ) -> Self {
637        let (block, inc) = match overflow_strategy {
638            OverflowStrategy::Block => (true, false),
639            OverflowStrategy::Drop => (false, false),
640            OverflowStrategy::DropAndReport => (false, true),
641            OverflowStrategy::DoNotMatchAgainstThisAndReadTheDocs => {
642                panic!("Invalid variant")
643            }
644        };
645        AsyncBuilder {
646            core: self.core.blocking(block),
647            inc_dropped: inc,
648        }
649    }
650
651    /// Configure a name to be used for the background thread.
652    ///
653    /// The name must not contain '\0'.
654    ///
655    /// # Panics
656    ///
657    /// If a name with '\0' is passed.
658    pub fn thread_name(self, name: String) -> Self {
659        AsyncBuilder {
660            core: self.core.thread_name(name),
661            ..self
662        }
663    }
664
665    /// Complete building `Async`
666    pub fn build(self) -> Async {
667        Async {
668            core: self.core.build_no_guard(),
669            dropped: AtomicUsize::new(0),
670            inc_dropped: self.inc_dropped,
671        }
672    }
673
674    /// Complete building `Async`
675    pub fn build_no_guard(self) -> Async {
676        Async {
677            core: self.core.build_no_guard(),
678            dropped: AtomicUsize::new(0),
679            inc_dropped: self.inc_dropped,
680        }
681    }
682
683    /// Complete building `Async` with `AsyncGuard`
684    ///
685    /// See `AsyncGuard` for more information.
686    pub fn build_with_guard(self) -> (Async, AsyncGuard) {
687        let (core, guard) = self.core.build_with_guard();
688        (
689            Async {
690                core,
691                dropped: AtomicUsize::new(0),
692                inc_dropped: self.inc_dropped,
693            },
694            guard,
695        )
696    }
697}
698
/// Async drain
///
/// `Async` will send all the logging records to a wrapped drain running in
/// another thread.
///
/// `Async` never returns `AsyncError::Full`.
///
/// `Record`s are passed to the worker thread through a channel with a bounded
/// size (see `AsyncBuilder::chan_size`). By default, on channel overflow
/// `Async` will start dropping `Record`s and log a message informing about it
/// after sending more `Record`s is possible again. The exact details of
/// handling overflow are implementation defined, might change and should not
/// be relied on, other than that messages won't be dropped as long as the
/// channel does not overflow.
///
/// Any messages reported by `Async` will contain `slog-async` logging `Record`
/// tag to allow easy custom handling.
///
/// Note: On drop `Async` waits for its worker-thread to finish (after handling
/// all previous `Record`s sent to it). If you can't tolerate the delay, make
/// sure you drop it eg. in another thread.
pub struct Async {
    // The drain that actually forwards records to the worker thread.
    core: AsyncCore,
    // Number of dropped messages that have not been reported yet.
    dropped: AtomicUsize,
    // Increment the dropped counter if dropped?
    inc_dropped: bool,
}
726
727impl Async {
728    /// New `AsyncCore` with default parameters
729    pub fn default<
730        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
731    >(
732        drain: D,
733    ) -> Self {
734        AsyncBuilder::new(drain).build()
735    }
736
737    /// Build `Async` drain with custom parameters
738    ///
739    /// The wrapped drain must handle all results (`Drain<Ok=(),Error=Never>`)
740    /// since there's no way to return it back. See `slog::DrainExt::fuse()` and
741    /// `slog::DrainExt::ignore_res()` for typical error handling strategies.
742    pub fn new<D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static>(
743        drain: D,
744    ) -> AsyncBuilder<D> {
745        AsyncBuilder::new(drain)
746    }
747
748    fn push_dropped(&self, logger_values: &OwnedKVList) -> AsyncResult<()> {
749        let dropped = self.dropped.swap(0, Ordering::Relaxed);
750        if dropped > 0 {
751            match self.core.log(
752                &record!(
753                    slog::Level::Error,
754                    "slog-async",
755                    &format_args!(
756                        "slog-async: logger dropped messages \
757                         due to channel \
758                         overflow"
759                    ),
760                    b!("count" => dropped)
761                ),
762                logger_values,
763            ) {
764                Ok(()) => {}
765                Err(AsyncError::Full) => {
766                    self.dropped.fetch_add(dropped + 1, Ordering::Relaxed);
767                    return Ok(());
768                }
769                Err(e) => return Err(e),
770            }
771        }
772        Ok(())
773    }
774}
775
776impl Drain for Async {
777    type Ok = ();
778    type Err = AsyncError;
779
780    // TODO: Review `Ordering::Relaxed`
781    fn log(
782        &self,
783        record: &Record,
784        logger_values: &OwnedKVList,
785    ) -> AsyncResult<()> {
786        self.push_dropped(logger_values)?;
787
788        match self.core.log(record, logger_values) {
789            Ok(()) => {}
790            Err(AsyncError::Full) if self.inc_dropped => {
791                self.dropped.fetch_add(1, Ordering::Relaxed);
792            }
793            Err(AsyncError::Full) => {}
794            Err(e) => return Err(e),
795        }
796
797        Ok(())
798    }
799}
800
801impl Drop for Async {
802    fn drop(&mut self) {
803        let _ = self.push_dropped(&o!().into());
804    }
805}
806
807// }}}
808
809
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc;

    /// Logs two records through an `Async` drain and checks that the wrapped
    /// drain receives both, in order, with logger and record values merged.
    #[test]
    fn integration_test() {
        let (mock_drain, entries) = MockDrain::new();
        let drain = AsyncBuilder::new(mock_drain).build();
        let root = slog::Logger::root(drain.fuse(), o!("field1" => "value1"));

        info!(root, "Message 1"; "field2" => "value2");
        warn!(root, "Message 2"; "field3" => "value3");

        let expected = [
            r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#,
            r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#,
        ];
        for line in expected.iter() {
            assert_eq!(entries.recv().unwrap(), *line);
        }
    }

    /// Test-helper drain that renders each record to a string and sends it
    /// over an `mpsc` channel.
    #[derive(Debug)]
    struct MockDrain {
        tx: mpsc::Sender<String>,
    }

    impl MockDrain {
        /// Returns the drain together with the receiver of rendered entries.
        fn new() -> (Self, mpsc::Receiver<String>) {
            let (tx, rx) = mpsc::channel();
            (MockDrain { tx }, rx)
        }
    }

    impl slog::Drain for MockDrain {
        type Ok = ();
        type Err = slog::Never;

        fn log(&self, record: &Record, logger_kv: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
            // Logger values are collected first, then the record's own pairs.
            let mut collected = MockSerializer::default();
            logger_kv.serialize(record, &mut collected).unwrap();
            record.kv().serialize(record, &mut collected).unwrap();

            let entry = format!(
                "{} {}: {:?}",
                record.level().as_short_str(),
                record.msg(),
                collected.kvs
            );
            self.tx.send(entry).unwrap();
            Ok(())
        }
    }

    /// Serializer that records every pair as `(key, formatted value)`.
    #[derive(Default)]
    struct MockSerializer {
        kvs: Vec<(String, String)>,
    }

    impl slog::Serializer for MockSerializer {
        fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> Result<(), slog::Error> {
            let pair = (key.to_string(), val.to_string());
            self.kvs.push(pair);
            Ok(())
        }
    }
}
870
871// vim: foldmethod=marker foldmarker={{{,}}}