1#![warn(missing_docs)]
52
53#[macro_use]
54extern crate slog;
55extern crate crossbeam_channel;
56extern crate take_mut;
57extern crate thread_local;
58
59use crossbeam_channel::Sender;
60
61use slog::{BorrowedKV, Level, Record, RecordStatic, SingleKV, KV};
62use slog::{Key, OwnedKVList, Serializer};
63
64use slog::Drain;
65use std::fmt;
66use std::sync;
67use std::{io, thread};
68
69use std::sync::atomic::AtomicUsize;
70use std::sync::atomic::Ordering;
71use std::sync::Mutex;
72use take_mut::take;
73
74use std::panic::{catch_unwind, AssertUnwindSafe};
75struct ToSendSerializer {
79 kv: Box<dyn KV + Send>,
80}
81
82impl ToSendSerializer {
83 fn new() -> Self {
84 ToSendSerializer { kv: Box::new(()) }
85 }
86
87 fn finish(self) -> Box<dyn KV + Send> {
88 self.kv
89 }
90}
91
92impl Serializer for ToSendSerializer {
93 fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
94 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
95 Ok(())
96 }
97 fn emit_unit(&mut self, key: Key) -> slog::Result {
98 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
99 Ok(())
100 }
101 fn emit_none(&mut self, key: Key) -> slog::Result {
102 let val: Option<()> = None;
103 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
104 Ok(())
105 }
106 fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
107 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
108 Ok(())
109 }
110 fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
111 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
112 Ok(())
113 }
114 fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
115 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
116 Ok(())
117 }
118 fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
119 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
120 Ok(())
121 }
122 fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
123 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
124 Ok(())
125 }
126 fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
127 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
128 Ok(())
129 }
130 fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
131 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
132 Ok(())
133 }
134 fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
135 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
136 Ok(())
137 }
138 fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
139 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
140 Ok(())
141 }
142 fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
143 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
144 Ok(())
145 }
146 fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
147 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
148 Ok(())
149 }
150 #[cfg(integer128)]
151 fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
152 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
153 Ok(())
154 }
155 #[cfg(integer128)]
156 fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
157 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
158 Ok(())
159 }
160 fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
161 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
162 Ok(())
163 }
164 fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
165 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
166 Ok(())
167 }
168 fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
169 let val = val.to_owned();
170 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
171 Ok(())
172 }
173 fn emit_arguments(
174 &mut self,
175 key: Key,
176 val: &fmt::Arguments,
177 ) -> slog::Result {
178 let val = fmt::format(*val);
179 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
180 Ok(())
181 }
182
183 #[cfg(feature = "nested-values")]
184 fn emit_serde(
185 &mut self,
186 key: Key,
187 value: &slog::SerdeValue,
188 ) -> slog::Result {
189 let val = value.to_sendable();
190 take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
191 Ok(())
192 }
193}
/// Errors reported by the asynchronous drains in this file.
///
/// Implements [`std::error::Error`], so it can be boxed, propagated with `?`
/// into `Box<dyn Error>`, and inspected through `source()`.
#[derive(Debug)]
pub enum AsyncError {
    /// The record could not be queued because the channel was full.
    Full,
    /// An unrecoverable failure; the boxed error is the underlying cause.
    Fatal(Box<dyn std::error::Error>),
}

impl fmt::Display for AsyncError {
    /// Writes a short description; the cause of `Fatal` is exposed through
    /// `source()` rather than repeated here.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AsyncError::Full => {
                f.write_str("slog-async: channel to the logging thread is full")
            }
            AsyncError::Fatal(_) => {
                f.write_str("slog-async: fatal error in the logging pipeline")
            }
        }
    }
}

impl std::error::Error for AsyncError {
    /// Returns the wrapped cause for `Fatal`, and `None` for `Full`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            AsyncError::Full => None,
            AsyncError::Fatal(cause) => Some(&**cause),
        }
    }
}
206
207impl<T> From<crossbeam_channel::TrySendError<T>> for AsyncError {
208 fn from(_: crossbeam_channel::TrySendError<T>) -> AsyncError {
209 AsyncError::Full
210 }
211}
212
213impl<T> From<crossbeam_channel::SendError<T>> for AsyncError {
214 fn from(_: crossbeam_channel::SendError<T>) -> AsyncError {
215 AsyncError::Fatal(Box::new(io::Error::new(
216 io::ErrorKind::BrokenPipe,
217 "The logger thread terminated",
218 )))
219 }
220}
221
222impl<T> From<std::sync::PoisonError<T>> for AsyncError {
223 fn from(err: std::sync::PoisonError<T>) -> AsyncError {
224 AsyncError::Fatal(Box::new(io::Error::new(
225 io::ErrorKind::BrokenPipe,
226 err.to_string(),
227 )))
228 }
229}
230
231pub type AsyncResult<T> = std::result::Result<T, AsyncError>;
233
234pub struct AsyncCoreBuilder<D>
239where
240 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
241{
242 chan_size: usize,
243 blocking: bool,
244 drain: D,
245 thread_name: Option<String>,
246}
247
248impl<D> AsyncCoreBuilder<D>
249where
250 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
251{
252 fn new(drain: D) -> Self {
253 AsyncCoreBuilder {
254 chan_size: 128,
255 blocking: false,
256 drain,
257 thread_name: None,
258 }
259 }
260
261 pub fn thread_name(mut self, name: String) -> Self {
269 assert!(name.find('\0').is_none(), "Name with \\'0\\' in it passed");
270 self.thread_name = Some(name);
271 self
272 }
273
274 pub fn chan_size(mut self, s: usize) -> Self {
278 self.chan_size = s;
279 self
280 }
281
282 pub fn blocking(mut self, blocking: bool) -> Self {
286 self.blocking = blocking;
287 self
288 }
289
290 fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
291 let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
292 let mut builder = thread::Builder::new();
293 if let Some(thread_name) = self.thread_name {
294 builder = builder.name(thread_name);
295 }
296 let drain = self.drain;
297 let join = builder
298 .spawn(move || {
299 let drain = AssertUnwindSafe(&drain);
300 if let Err(panic_cause) = catch_unwind(move || loop {
302 match rx.recv() {
303 Ok(AsyncMsg::Record(r)) => {
304 if r.log_to(&*drain).is_err() {
305 eprintln!("slog-async failed while writing");
306 return;
307 }
308 }
309 Ok(AsyncMsg::Finish) => return,
310 Err(recv_error) => {
311 eprintln!("slog-async failed while receiving: {recv_error}");
312 return;
313 }
314 }
315 }) {
316 eprintln!("slog-async failed with panic: {panic_cause:?}")
317 }
318 })
319 .unwrap();
320
321 (join, tx)
322 }
323
324 pub fn build(self) -> AsyncCore {
326 self.build_no_guard()
327 }
328
329 pub fn build_no_guard(self) -> AsyncCore {
331 let blocking = self.blocking;
332 let (join, tx) = self.spawn_thread();
333
334 AsyncCore {
335 ref_sender: tx,
336 tl_sender: thread_local::ThreadLocal::new(),
337 join: Mutex::new(Some(join)),
338 blocking,
339 }
340 }
341
342 pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
346 let blocking = self.blocking;
347 let (join, tx) = self.spawn_thread();
348
349 (
350 AsyncCore {
351 ref_sender: tx.clone(),
352 tl_sender: thread_local::ThreadLocal::new(),
353 join: Mutex::new(None),
354 blocking,
355 },
356 AsyncGuard {
357 join: Some(join),
358 tx,
359 },
360 )
361 }
362}
363
364pub struct AsyncGuard {
379 join: Option<thread::JoinHandle<()>>,
382 tx: Sender<AsyncMsg>,
383}
384
385impl Drop for AsyncGuard {
386 fn drop(&mut self) {
387 let _err: Result<(), Box<dyn std::error::Error>> = {
388 || {
389 let _ = self.tx.send(AsyncMsg::Finish);
390 let join = self.join.take().unwrap();
391 if join.thread().id() != thread::current().id() {
392 join.join().map_err(|_| {
394 io::Error::new(
395 io::ErrorKind::BrokenPipe,
396 "Logging thread worker join error",
397 )
398 })?;
399 }
400 Ok(())
401 }
402 }();
403 }
404}
405
406pub struct AsyncCore {
417 ref_sender: Sender<AsyncMsg>,
418 tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
419 join: Mutex<Option<thread::JoinHandle<()>>>,
420 blocking: bool,
421}
422
423impl AsyncCore {
424 pub fn new<D>(drain: D) -> Self
426 where
427 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
428 D: std::panic::RefUnwindSafe,
429 {
430 AsyncCoreBuilder::new(drain).build()
431 }
432
433 pub fn custom<
435 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
436 >(
437 drain: D,
438 ) -> AsyncCoreBuilder<D> {
439 AsyncCoreBuilder::new(drain)
440 }
441 fn get_sender(
442 &self,
443 ) -> Result<
444 &crossbeam_channel::Sender<AsyncMsg>,
445 std::sync::PoisonError<
446 sync::MutexGuard<crossbeam_channel::Sender<AsyncMsg>>,
447 >,
448 > {
449 self.tl_sender.get_or_try(|| Ok(self.ref_sender.clone()))
450 }
451
452 fn send(&self, r: AsyncRecord) -> AsyncResult<()> {
454 let sender = self.get_sender()?;
455
456 if self.blocking {
457 sender.send(AsyncMsg::Record(r))?;
458 } else {
459 sender.try_send(AsyncMsg::Record(r))?;
460 }
461
462 Ok(())
463 }
464}
465
466impl Drain for AsyncCore {
467 type Ok = ();
468 type Err = AsyncError;
469
470 fn log(
471 &self,
472 record: &Record,
473 logger_values: &OwnedKVList,
474 ) -> AsyncResult<()> {
475 self.send(AsyncRecord::from(record, logger_values))
476 }
477}
478
479pub struct AsyncRecord {
481 msg: String,
482 level: Level,
483 location: Box<slog::RecordLocation>,
484 tag: String,
485 logger_values: OwnedKVList,
486 kv: Box<dyn KV + Send>,
487}
488
489impl AsyncRecord {
490 pub fn from(record: &Record, logger_values: &OwnedKVList) -> Self {
492 let mut ser = ToSendSerializer::new();
493 record
494 .kv()
495 .serialize(record, &mut ser)
496 .expect("`ToSendSerializer` can't fail");
497
498 AsyncRecord {
499 msg: fmt::format(*record.msg()),
500 level: record.level(),
501 location: Box::new(*record.location()),
502 tag: String::from(record.tag()),
503 logger_values: logger_values.clone(),
504 kv: ser.finish(),
505 }
506 }
507
508 pub fn log_to<D: Drain>(self, drain: &D) -> Result<D::Ok, D::Err> {
510 let rs = RecordStatic {
511 location: &*self.location,
512 level: self.level,
513 tag: &self.tag,
514 };
515
516 drain.log(
517 &Record::new(
518 &rs,
519 &format_args!("{}", self.msg),
520 BorrowedKV(&self.kv),
521 ),
522 &self.logger_values,
523 )
524 }
525
526 pub fn as_record_values(&self, mut f: impl FnMut(&Record, &OwnedKVList)) {
528 let rs = RecordStatic {
529 location: &*self.location,
530 level: self.level,
531 tag: &self.tag,
532 };
533
534 f(
535 &Record::new(
536 &rs,
537 &format_args!("{}", self.msg),
538 BorrowedKV(&self.kv),
539 ),
540 &self.logger_values,
541 )
542 }
543}
544
545enum AsyncMsg {
546 Record(AsyncRecord),
547 Finish,
548}
549
550impl Drop for AsyncCore {
551 fn drop(&mut self) {
552 let _err: Result<(), Box<dyn std::error::Error>> = {
553 || {
554 if let Some(join) = self.join.lock()?.take() {
555 let _ = self.get_sender()?.send(AsyncMsg::Finish);
556 if join.thread().id() != thread::current().id() {
557 join.join().map_err(|_| {
565 io::Error::new(
566 io::ErrorKind::BrokenPipe,
567 "Logging thread worker join error",
568 )
569 })?;
570 }
571 }
572 Ok(())
573 }
574 }();
575 }
576}
/// What [`Async`] does with a record when the channel is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum OverflowStrategy {
    /// Drop the record and count it, so that a message with the number of
    /// dropped records is logged later.
    DropAndReport,
    /// Drop the record without counting it.
    Drop,
    /// Use blocking sends instead of dropping records.
    Block,
    #[doc(hidden)]
    DoNotMatchAgainstThisAndReadTheDocs,
}
601
602pub struct AsyncBuilder<D>
604where
605 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
606{
607 core: AsyncCoreBuilder<D>,
608 inc_dropped: bool,
610}
611
612impl<D> AsyncBuilder<D>
613where
614 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
615{
616 fn new(drain: D) -> AsyncBuilder<D> {
617 AsyncBuilder {
618 core: AsyncCoreBuilder::new(drain),
619 inc_dropped: true,
620 }
621 }
622
623 pub fn chan_size(self, s: usize) -> Self {
626 AsyncBuilder {
627 core: self.core.chan_size(s),
628 ..self
629 }
630 }
631
632 pub fn overflow_strategy(
634 self,
635 overflow_strategy: OverflowStrategy,
636 ) -> Self {
637 let (block, inc) = match overflow_strategy {
638 OverflowStrategy::Block => (true, false),
639 OverflowStrategy::Drop => (false, false),
640 OverflowStrategy::DropAndReport => (false, true),
641 OverflowStrategy::DoNotMatchAgainstThisAndReadTheDocs => {
642 panic!("Invalid variant")
643 }
644 };
645 AsyncBuilder {
646 core: self.core.blocking(block),
647 inc_dropped: inc,
648 }
649 }
650
651 pub fn thread_name(self, name: String) -> Self {
659 AsyncBuilder {
660 core: self.core.thread_name(name),
661 ..self
662 }
663 }
664
665 pub fn build(self) -> Async {
667 Async {
668 core: self.core.build_no_guard(),
669 dropped: AtomicUsize::new(0),
670 inc_dropped: self.inc_dropped,
671 }
672 }
673
674 pub fn build_no_guard(self) -> Async {
676 Async {
677 core: self.core.build_no_guard(),
678 dropped: AtomicUsize::new(0),
679 inc_dropped: self.inc_dropped,
680 }
681 }
682
683 pub fn build_with_guard(self) -> (Async, AsyncGuard) {
687 let (core, guard) = self.core.build_with_guard();
688 (
689 Async {
690 core,
691 dropped: AtomicUsize::new(0),
692 inc_dropped: self.inc_dropped,
693 },
694 guard,
695 )
696 }
697}
698
699pub struct Async {
721 core: AsyncCore,
722 dropped: AtomicUsize,
723 inc_dropped: bool,
725}
726
727impl Async {
728 pub fn default<
730 D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
731 >(
732 drain: D,
733 ) -> Self {
734 AsyncBuilder::new(drain).build()
735 }
736
737 pub fn new<D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static>(
743 drain: D,
744 ) -> AsyncBuilder<D> {
745 AsyncBuilder::new(drain)
746 }
747
748 fn push_dropped(&self, logger_values: &OwnedKVList) -> AsyncResult<()> {
749 let dropped = self.dropped.swap(0, Ordering::Relaxed);
750 if dropped > 0 {
751 match self.core.log(
752 &record!(
753 slog::Level::Error,
754 "slog-async",
755 &format_args!(
756 "slog-async: logger dropped messages \
757 due to channel \
758 overflow"
759 ),
760 b!("count" => dropped)
761 ),
762 logger_values,
763 ) {
764 Ok(()) => {}
765 Err(AsyncError::Full) => {
766 self.dropped.fetch_add(dropped + 1, Ordering::Relaxed);
767 return Ok(());
768 }
769 Err(e) => return Err(e),
770 }
771 }
772 Ok(())
773 }
774}
775
776impl Drain for Async {
777 type Ok = ();
778 type Err = AsyncError;
779
780 fn log(
782 &self,
783 record: &Record,
784 logger_values: &OwnedKVList,
785 ) -> AsyncResult<()> {
786 self.push_dropped(logger_values)?;
787
788 match self.core.log(record, logger_values) {
789 Ok(()) => {}
790 Err(AsyncError::Full) if self.inc_dropped => {
791 self.dropped.fetch_add(1, Ordering::Relaxed);
792 }
793 Err(AsyncError::Full) => {}
794 Err(e) => return Err(e),
795 }
796
797 Ok(())
798 }
799}
800
801impl Drop for Async {
802 fn drop(&mut self) {
803 let _ = self.push_dropped(&o!().into());
804 }
805}
806
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc;

    /// Logs two records through an `Async` drain and checks what reaches
    /// the wrapped drain, including the logger's own key-value pair.
    #[test]
    fn integration_test() {
        let (mock_drain, mock_drain_rx) = MockDrain::new();
        let async_drain = AsyncBuilder::new(mock_drain).build();
        let logger =
            slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));

        info!(logger, "Message 1"; "field2" => "value2");
        warn!(logger, "Message 2"; "field3" => "value3");

        let expected = [
            r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#,
            r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#,
        ];
        for line in expected.iter() {
            assert_eq!(mock_drain_rx.recv().unwrap(), *line);
        }
    }

    /// Drain that formats each record as a line of text and sends it over
    /// an `mpsc` channel.
    #[derive(Debug)]
    struct MockDrain {
        tx: mpsc::Sender<String>,
    }

    impl MockDrain {
        /// Returns the drain and the receiver its lines arrive on.
        fn new() -> (Self, mpsc::Receiver<String>) {
            let (tx, rx) = mpsc::channel();
            (Self { tx }, rx)
        }
    }

    impl slog::Drain for MockDrain {
        type Ok = ();
        type Err = slog::Never;

        fn log(
            &self,
            record: &Record,
            logger_kv: &OwnedKVList,
        ) -> Result<Self::Ok, Self::Err> {
            // Logger values first, then the record's own pairs.
            let mut collected = MockSerializer::default();
            logger_kv.serialize(record, &mut collected).unwrap();
            record.kv().serialize(record, &mut collected).unwrap();

            let entry = format!(
                "{} {}: {:?}",
                record.level().as_short_str(),
                record.msg(),
                collected.kvs
            );
            self.tx.send(entry).unwrap();
            Ok(())
        }
    }

    /// Serializer that records every pair as `(key, value)` strings.
    #[derive(Default)]
    struct MockSerializer {
        kvs: Vec<(String, String)>,
    }

    impl slog::Serializer for MockSerializer {
        fn emit_arguments(
            &mut self,
            key: Key,
            val: &fmt::Arguments,
        ) -> Result<(), slog::Error> {
            let pair = (key.to_string(), val.to_string());
            self.kvs.push(pair);
            Ok(())
        }
    }
}
870
871