tof_dataclasses/
threading.rs

1//! Thread control structures
2//! FIXME - this should go to liftof-lib
3
4use std::collections::HashMap;
5use std::fmt;
6
/// Runtime control and status information shared between threads.
///
/// Intended to be handed to the threads through shared memory,
/// i.e. wrapped as `Arc<Mutex<ThreadControl>>`.
///
/// The derived `Default` yields the "nothing running" state: every
/// flag is `false`, every counter is `0` and both maps are empty.
/// `Clone`/`PartialEq`/`Eq` allow taking a snapshot of the state and
/// comparing snapshots against each other.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct ThreadControl {
  /// Stop ALL threads
  pub stop_flag                  : bool,
  /// Trigger calibration thread
  pub calibration_active         : bool,
  /// Bookkeeping for received calibration packets - one `bool`
  /// per `u8` key (presumably the readoutboard id - TODO confirm)
  pub finished_calibrations      : HashMap<u8,bool>,
  /// Alive indicator for cmd dispatch thread
  pub thread_cmd_dispatch_active : bool,
  /// Alive indicator for data sink thread
  pub thread_data_sink_active    : bool,
  /// Alive indicator for runner thread
  pub thread_runner_active       : bool,
  /// Alive indicator for event builder thread
  pub thread_event_bldr_active   : bool,
  /// Alive indicator for master trigger thread
  pub thread_master_trg_active   : bool,
  /// Alive indicator for monitoring thread
  pub thread_monitoring_active   : bool,
  /// Running readoutboard communicator threads - the key is
  /// the associated rb id
  pub thread_rbcomm_active       : HashMap<u8, bool>,
  /// The current run id
  pub run_id                     : u32,
  /// The number of boards available
  pub n_rbs                      : u32,
  /// Write data to disk
  pub write_data_to_disk         : bool,
}
40
41impl ThreadControl {
42  pub fn new() -> Self {
43    Self {
44      stop_flag                  : false,
45      calibration_active         : false,
46      finished_calibrations      : HashMap::<u8,bool>::new(),
47      thread_cmd_dispatch_active : false,
48      thread_data_sink_active    : false,
49      thread_runner_active       : false,
50      thread_event_bldr_active   : false,
51      thread_master_trg_active   : false,
52      thread_monitoring_active   : false,
53      thread_rbcomm_active       : HashMap::<u8,bool>::new(),
54      run_id                     : 0,
55      n_rbs                      : 0,
56      write_data_to_disk         : false,
57    }
58  }
59}
60
61impl fmt::Display for ThreadControl {
62  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63    let mut repr = String::from("<ThreadControl:");
64    repr        += &(format!("\n  Run ID         : {}", self.run_id));
65    repr        += &(format!("\n  N RBs          : {}", self.n_rbs));
66    repr        += &(format!("\n  wr to disk     : {}", self.write_data_to_disk));
67    repr        += "\n    -- reported RB calibration activity:";
68    repr        += &(format!("\n  RB cali active : {}", self.calibration_active));
69    repr        += &(format!("\n  -- finished    : \n{:?}", self.finished_calibrations));       
70    repr        += "\n    -- program status:";
71    repr        += &(format!("\n  stop flag : {}", self.stop_flag));
72    repr        += "\n    -- reported thread activity:";
73    repr        += &(format!("\n  cmd dispatcher : {}", self.thread_cmd_dispatch_active));
74    repr        += &(format!("\n  runner         : {}", self.thread_runner_active));
75    repr        += &(format!("\n  data sink      : {}", self.thread_data_sink_active));
76    repr        += &(format!("\n  monitoring     : {}", self.thread_monitoring_active));
77    if self.thread_rbcomm_active.len() > 0 {
78      repr        += "\n -- active RB threads";
79      for k in self.thread_rbcomm_active.keys() {
80        repr      += &(format!("\n -- -- {} : {}", k, self.thread_rbcomm_active.get(k).unwrap()));
81      }
82    }
83    repr        += &(format!("\n  master trig    : {}>", self.thread_master_trg_active));
84    write!(f, "{}", repr)
85  }
86}
87
88
89//enum Message {
90//  NewJob(Job),
91//  Terminate,
92//}
93//
94//
95///// Implements "standard" Threadpool. 
96/////
97///// Threadpool spawns unnamed threads 
98///// for workers
99//pub struct ThreadPool {
100//  workers: Vec<Worker>,
101//  sender: mpsc::Sender<Message>,
102//}
103//
104//trait FnBox {
105//  fn call_box(self: Box<Self>);
106//}
107//
108//impl<F: FnOnce()> FnBox for F {
109//  fn call_box(self: Box<F>) {
110//    (*self)()
111//  }
112//}
113//
114//type Job = Box<dyn FnBox + Send + 'static>;
115//
116//impl ThreadPool {
117//  /// Create a new ThreadPool.
118//  ///
119//  /// The size is the number of threads in the pool.
120//  ///
121//  /// # Panics
122//  ///
123//  /// The `new` function will panic if the size is zero.
124//  pub fn new(size: usize) -> ThreadPool {
125//    assert!(size > 0);
126//
127//    let (sender, receiver) = mpsc::channel();
128//    let receiver = Arc::new(Mutex::new(receiver));
129//    let mut workers = Vec::with_capacity(size);
130//
131//    for id in 0..size {
132//      workers.push(Worker::new(id, Arc::clone(&receiver)));
133//    }
134//
135//    ThreadPool {
136//      workers,
137//      sender,
138//    }
139//  }
140//
141//  pub fn execute<F>(&self, f: F)
142//    where
143//        F: FnOnce() + Send + 'static {
144//    let job = Box::new(f);
145//    self.sender.send(Message::NewJob(job)).unwrap();
146//  }
147//}
148//
149//impl Drop for ThreadPool {
150//  fn drop(&mut self) {
151//    info!("Sending terminate message to all workers.");
152//
153//    for _ in &mut self.workers {
154//      self.sender.send(Message::Terminate).unwrap();
155//    }
156//
157//    warn!("Shutting down all workers.");
158//
159//    for worker in &mut self.workers {
160//      info!("Shutting down worker {}", worker.id);
161//
162//      if let Some(thread) = worker.thread.take() {
163//          thread.join().unwrap();
164//      }
165//    }
166//  }
167//}
168//
169//struct Worker {
170//  id: usize,
171//  thread: Option<thread::JoinHandle<()>>,
172//}
173//
174//impl Worker {
175//  fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
176//    Worker {
177//      let thread = thread::spawn(move ||{
178//      loop {
179//        let message = receiver.lock().unwrap().recv().unwrap();
180//        match message {
181//          Message::NewJob(job) => {
182//            trace!("Worker {} got a job; executing.", id);
183//            job.call_box();
184//          },
185//          Message::Terminate => {
186//            trace!("Worker {} was told to terminate.", id);
187//            break;
188//          },
189//        }
190//      }
191//    });
192//
193//    Worker {
194//      id,
195//      thread: Some(thread),
196//    }
197//  }
198//}