tof_dataclasses/
threading.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
//! Thread control structures
//! FIXME - this should go to liftof-lib

use std::collections::HashMap;
use std::fmt;

/// Send runtime information 
/// to threads via shared memory
/// (Arc(Mutex)
#[derive(Default, Debug)]
pub struct ThreadControl {
  /// Stop ALL threads
  pub stop_flag                  : bool,
  /// Trigger calibration thread
  pub calibration_active         : bool,
  /// Keep track on how many calibration 
  /// packets we have received
  pub finished_calibrations      : HashMap<u8,bool>,
  /// alive indicator for cmd dispatch thread
  pub thread_cmd_dispatch_active : bool,
  /// alive indicator for data sink thread
  pub thread_data_sink_active    : bool,
  /// alive indicator for runner thread
  pub thread_runner_active       : bool,
  /// alive indicator for event builder thread
  pub thread_event_bldr_active   : bool,
  /// alive indicator for master trigger thread
  pub thread_master_trg_active   : bool,
  /// alive indicator for monitoring thread
  pub thread_monitoring_active   : bool,
  /// Running readoutboard communicator threads - the key is associated rb id
  pub thread_rbcomm_active       : HashMap<u8, bool>,
  /// The current run id
  pub run_id                     : u32,
  /// The number of boards available
  pub n_rbs                      : u32,
  /// Write data to disk
  pub write_data_to_disk         : bool,
}

impl ThreadControl {
  pub fn new() -> Self {
    Self {
      stop_flag                  : false,
      calibration_active         : false,
      finished_calibrations      : HashMap::<u8,bool>::new(),
      thread_cmd_dispatch_active : false,
      thread_data_sink_active    : false,
      thread_runner_active       : false,
      thread_event_bldr_active   : false,
      thread_master_trg_active   : false,
      thread_monitoring_active   : false,
      thread_rbcomm_active       : HashMap::<u8,bool>::new(),
      run_id                     : 0,
      n_rbs                      : 0,
      write_data_to_disk         : false,
    }
  }
}

impl fmt::Display for ThreadControl {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    let mut repr = String::from("<ThreadControl:");
    repr        += &(format!("\n  Run ID         : {}", self.run_id));
    repr        += &(format!("\n  N RBs          : {}", self.n_rbs));
    repr        += &(format!("\n  wr to disk     : {}", self.write_data_to_disk));
    repr        += "\n    -- reported RB calibration activity:";
    repr        += &(format!("\n  RB cali active : {}", self.calibration_active));
    repr        += &(format!("\n  -- finished    : \n{:?}", self.finished_calibrations));       
    repr        += "\n    -- program status:";
    repr        += &(format!("\n  stop flag : {}", self.stop_flag));
    repr        += "\n    -- reported thread activity:";
    repr        += &(format!("\n  cmd dispatcher : {}", self.thread_cmd_dispatch_active));
    repr        += &(format!("\n  runner         : {}", self.thread_runner_active));
    repr        += &(format!("\n  data sink      : {}", self.thread_data_sink_active));
    repr        += &(format!("\n  monitoring     : {}", self.thread_monitoring_active));
    if self.thread_rbcomm_active.len() > 0 {
      repr        += "\n -- active RB threads";
      for k in self.thread_rbcomm_active.keys() {
        repr      += &(format!("\n -- -- {} : {}", k, self.thread_rbcomm_active.get(k).unwrap()));
      }
    }
    repr        += &(format!("\n  master trig    : {}>", self.thread_master_trg_active));
    write!(f, "{}", repr)
  }
}


//enum Message {
//  NewJob(Job),
//  Terminate,
//}
//
//
///// Implements "standard" Threadpool. 
/////
///// Threadpool spawns unnamed threads 
///// for workers
//pub struct ThreadPool {
//  workers: Vec<Worker>,
//  sender: mpsc::Sender<Message>,
//}
//
//trait FnBox {
//  fn call_box(self: Box<Self>);
//}
//
//impl<F: FnOnce()> FnBox for F {
//  fn call_box(self: Box<F>) {
//    (*self)()
//  }
//}
//
//type Job = Box<dyn FnBox + Send + 'static>;
//
//impl ThreadPool {
//  /// Create a new ThreadPool.
//  ///
//  /// The size is the number of threads in the pool.
//  ///
//  /// # Panics
//  ///
//  /// The `new` function will panic if the size is zero.
//  pub fn new(size: usize) -> ThreadPool {
//    assert!(size > 0);
//
//    let (sender, receiver) = mpsc::channel();
//    let receiver = Arc::new(Mutex::new(receiver));
//    let mut workers = Vec::with_capacity(size);
//
//    for id in 0..size {
//      workers.push(Worker::new(id, Arc::clone(&receiver)));
//    }
//
//    ThreadPool {
//      workers,
//      sender,
//    }
//  }
//
//  pub fn execute<F>(&self, f: F)
//    where
//        F: FnOnce() + Send + 'static {
//    let job = Box::new(f);
//    self.sender.send(Message::NewJob(job)).unwrap();
//  }
//}
//
//impl Drop for ThreadPool {
//  fn drop(&mut self) {
//    info!("Sending terminate message to all workers.");
//
//    for _ in &mut self.workers {
//      self.sender.send(Message::Terminate).unwrap();
//    }
//
//    warn!("Shutting down all workers.");
//
//    for worker in &mut self.workers {
//      info!("Shutting down worker {}", worker.id);
//
//      if let Some(thread) = worker.thread.take() {
//          thread.join().unwrap();
//      }
//    }
//  }
//}
//
//struct Worker {
//  id: usize,
//  thread: Option<thread::JoinHandle<()>>,
//}
//
//impl Worker {
//  fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
//    Worker {
//      let thread = thread::spawn(move ||{
//      loop {
//        let message = receiver.lock().unwrap().recv().unwrap();
//        match message {
//          Message::NewJob(job) => {
//            trace!("Worker {} got a job; executing.", id);
//            job.call_box();
//          },
//          Message::Terminate => {
//            trace!("Worker {} was told to terminate.", id);
//            break;
//          },
//        }
//      }
//    });
//
//    Worker {
//      id,
//      thread: Some(thread),
//    }
//  }
//}