tof_dataclasses/threading.rs
1//! Thread control structures
2//! FIXME - this should go to liftof-lib
3
4use std::collections::HashMap;
5use std::fmt;
6
7/// Send runtime information
8/// to threads via shared memory
9/// (Arc(Mutex)
10#[derive(Default, Debug)]
11pub struct ThreadControl {
12 /// Stop ALL threads
13 pub stop_flag : bool,
14 /// Trigger calibration thread
15 pub calibration_active : bool,
16 /// Keep track on how many calibration
17 /// packets we have received
18 pub finished_calibrations : HashMap<u8,bool>,
19 /// alive indicator for cmd dispatch thread
20 pub thread_cmd_dispatch_active : bool,
21 /// alive indicator for data sink thread
22 pub thread_data_sink_active : bool,
23 /// alive indicator for runner thread
24 pub thread_runner_active : bool,
25 /// alive indicator for event builder thread
26 pub thread_event_bldr_active : bool,
27 /// alive indicator for master trigger thread
28 pub thread_master_trg_active : bool,
29 /// alive indicator for monitoring thread
30 pub thread_monitoring_active : bool,
31 /// Running readoutboard communicator threads - the key is associated rb id
32 pub thread_rbcomm_active : HashMap<u8, bool>,
33 /// The current run id
34 pub run_id : u32,
35 /// The number of boards available
36 pub n_rbs : u32,
37 /// Write data to disk
38 pub write_data_to_disk : bool,
39}
40
41impl ThreadControl {
42 pub fn new() -> Self {
43 Self {
44 stop_flag : false,
45 calibration_active : false,
46 finished_calibrations : HashMap::<u8,bool>::new(),
47 thread_cmd_dispatch_active : false,
48 thread_data_sink_active : false,
49 thread_runner_active : false,
50 thread_event_bldr_active : false,
51 thread_master_trg_active : false,
52 thread_monitoring_active : false,
53 thread_rbcomm_active : HashMap::<u8,bool>::new(),
54 run_id : 0,
55 n_rbs : 0,
56 write_data_to_disk : false,
57 }
58 }
59}
60
61impl fmt::Display for ThreadControl {
62 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63 let mut repr = String::from("<ThreadControl:");
64 repr += &(format!("\n Run ID : {}", self.run_id));
65 repr += &(format!("\n N RBs : {}", self.n_rbs));
66 repr += &(format!("\n wr to disk : {}", self.write_data_to_disk));
67 repr += "\n -- reported RB calibration activity:";
68 repr += &(format!("\n RB cali active : {}", self.calibration_active));
69 repr += &(format!("\n -- finished : \n{:?}", self.finished_calibrations));
70 repr += "\n -- program status:";
71 repr += &(format!("\n stop flag : {}", self.stop_flag));
72 repr += "\n -- reported thread activity:";
73 repr += &(format!("\n cmd dispatcher : {}", self.thread_cmd_dispatch_active));
74 repr += &(format!("\n runner : {}", self.thread_runner_active));
75 repr += &(format!("\n data sink : {}", self.thread_data_sink_active));
76 repr += &(format!("\n monitoring : {}", self.thread_monitoring_active));
77 if self.thread_rbcomm_active.len() > 0 {
78 repr += "\n -- active RB threads";
79 for k in self.thread_rbcomm_active.keys() {
80 repr += &(format!("\n -- -- {} : {}", k, self.thread_rbcomm_active.get(k).unwrap()));
81 }
82 }
83 repr += &(format!("\n master trig : {}>", self.thread_master_trg_active));
84 write!(f, "{}", repr)
85 }
86}
87
88
89//enum Message {
90// NewJob(Job),
91// Terminate,
92//}
93//
94//
95///// Implements "standard" Threadpool.
96/////
97///// Threadpool spawns unnamed threads
98///// for workers
99//pub struct ThreadPool {
100// workers: Vec<Worker>,
101// sender: mpsc::Sender<Message>,
102//}
103//
104//trait FnBox {
105// fn call_box(self: Box<Self>);
106//}
107//
108//impl<F: FnOnce()> FnBox for F {
109// fn call_box(self: Box<F>) {
110// (*self)()
111// }
112//}
113//
114//type Job = Box<dyn FnBox + Send + 'static>;
115//
116//impl ThreadPool {
117// /// Create a new ThreadPool.
118// ///
119// /// The size is the number of threads in the pool.
120// ///
121// /// # Panics
122// ///
123// /// The `new` function will panic if the size is zero.
124// pub fn new(size: usize) -> ThreadPool {
125// assert!(size > 0);
126//
127// let (sender, receiver) = mpsc::channel();
128// let receiver = Arc::new(Mutex::new(receiver));
129// let mut workers = Vec::with_capacity(size);
130//
131// for id in 0..size {
132// workers.push(Worker::new(id, Arc::clone(&receiver)));
133// }
134//
135// ThreadPool {
136// workers,
137// sender,
138// }
139// }
140//
141// pub fn execute<F>(&self, f: F)
142// where
143// F: FnOnce() + Send + 'static {
144// let job = Box::new(f);
145// self.sender.send(Message::NewJob(job)).unwrap();
146// }
147//}
148//
149//impl Drop for ThreadPool {
150// fn drop(&mut self) {
151// info!("Sending terminate message to all workers.");
152//
153// for _ in &mut self.workers {
154// self.sender.send(Message::Terminate).unwrap();
155// }
156//
157// warn!("Shutting down all workers.");
158//
159// for worker in &mut self.workers {
160// info!("Shutting down worker {}", worker.id);
161//
162// if let Some(thread) = worker.thread.take() {
163// thread.join().unwrap();
164// }
165// }
166// }
167//}
168//
169//struct Worker {
170// id: usize,
171// thread: Option<thread::JoinHandle<()>>,
172//}
173//
174//impl Worker {
175// fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
176// Worker {
177// let thread = thread::spawn(move ||{
178// loop {
179// let message = receiver.lock().unwrap().recv().unwrap();
180// match message {
181// Message::NewJob(job) => {
182// trace!("Worker {} got a job; executing.", id);
183// job.call_box();
184// },
185// Message::Terminate => {
186// trace!("Worker {} was told to terminate.", id);
187// break;
188// },
189// }
190// }
191// });
192//
193// Worker {
194// id,
195// thread: Some(thread),
196// }
197// }
198//}