1use crate::prelude::*;
13
14
/// Size in bytes of the fixed receive buffer held by [`IPBus`]. It is also
/// the upper bound checked when read payloads are decoded.
pub const MT_MAX_PACKSIZE : usize = 128;
19
/// Pause in microseconds that [`IPBus::send`] sleeps after every datagram
/// it sends.
pub const UDP_SOCKET_SLEEP_USEC : u64 = 100;
23
/// Transaction type of an IPBus packet.
///
/// The numeric value (masked to 4 bits) is written into bits 4..8 of the
/// transaction header and read back from the same bits when a reply is
/// decoded.
#[cfg_attr(feature = "pybindings", pyclass(eq, eq_int))]
#[derive(Debug, PartialEq, Clone, Copy, FromRepr, AsRefStr, EnumIter)]
#[repr(u8)]
pub enum IPBusPacketType {
  /// Read request; selected by `read_multiple` when the address should
  /// be incremented.
  Read = 0,
  /// Write request; the payload words are appended to the request.
  Write = 1,
  /// Read request; selected by `read_multiple` when the address should
  /// NOT be incremented.
  ReadNonIncrement = 2,
  /// Write request whose payload words are appended to the request.
  /// Decoding replies of this type is not supported.
  WriteNonIncrement = 3,
  /// NOTE(review): presumably "read-modify-write" - confirm. Decoding
  /// replies of this type is not supported.
  RMW = 4,
  /// Fallback for unrecognized type ids; decoding such a packet fails.
  Unknown = 99
}

// NOTE(review): this macro is defined elsewhere. It presumably provides the
// `From<u8>` and `Display` impls this file relies on plus a repr round-trip
// test named `test_ipbuspackettype_repr` - confirm against its definition.
expand_and_test_enum!(IPBusPacketType, test_ipbuspackettype_repr);
53
54
/// Client side of an IPBus connection over UDP.
///
/// Holds the connected socket, the packet id bookkeeping and a fixed
/// size buffer that receives every reply.
#[derive(Debug)]
#[cfg_attr(feature="pybindings", pyclass)]
pub struct IPBus {
  /// UDP socket, bound locally and connected to the target address.
  pub socket : UdpSocket,
  /// Transaction type used when the next request is encoded.
  pub packet_type : IPBusPacketType,
  /// Packet id that will be used for the next non-status request.
  pub pid : u16,
  /// Packet id of the most recently generated request; replies are
  /// checked against this value.
  pub expected_pid : u16,
  /// Initialized to 0 in `new`; not otherwise read or written by the
  /// code visible in this file.
  pub last_pid : u16,
  /// Receive buffer; `receive` overwrites it with each incoming datagram.
  pub buffer : [u8;MT_MAX_PACKSIZE]
}
82
83impl IPBus {
84
85 pub fn new(target_address : &str)
86 -> io::Result<Self> {
87 let socket = Self::connect(target_address)?;
88 let mut bus = Self {
89 socket : socket,
90 packet_type : IPBusPacketType::Read,
92 pid : 0,
93 expected_pid : 0,
94 last_pid : 0,
95 buffer : [0;MT_MAX_PACKSIZE]
96 };
97 match bus.realign_packet_id() {
98 Err(err) => {
99 error!("Packet ID realign failed! {}", err);
100 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Can not realign packet id"));
101 },
102 Ok(_) => ()
103 }
104 Ok(bus)
105 }
106
107 pub fn connect(target_address : &str)
118 ->io::Result<UdpSocket> {
119 let local_addrs = [
121 SocketAddr::from(([0, 0, 0, 0], 50100)),
122 SocketAddr::from(([0, 0, 0, 0], 50101)),
123 SocketAddr::from(([0, 0, 0, 0], 50102)),
124 SocketAddr::from(([0, 0, 0, 0], 50103)),
125 SocketAddr::from(([0, 0, 0, 0], 50104)),
126 ];
127 let local_socket = UdpSocket::bind(&local_addrs[..]);
128 let socket : UdpSocket;
129 match local_socket {
130 Err(err) => {
131 error!("Can not create local UDP socket for master trigger connection!, err {}", err);
132 return Err(err);
133 }
134 Ok(value) => {
135 info!("Successfully bound UDP socket for master trigger communcations to {:?}", value);
136 socket = value;
137 match socket.set_read_timeout(Some(Duration::from_micros(1000))) {
140 Err(err) => error!("Can not set read timeout for Udp socket! {err}"),
141 Ok(_) => ()
142 }
143 match socket.set_write_timeout(Some(Duration::from_micros(1000))) {
144 Err(err) => error!("Can not set write timeout for Udp socket! {err}"),
145 Ok(_) => ()
146 }
147 match socket.connect(target_address) {
148 Err(err) => {
149 error!("Can not connect to IPBus socket to target address {}! {}", target_address, err);
150 return Err(err);
151 }
152 Ok(_) => info!("Successfully connected IPBus to target address {}!", target_address)
153 }
154 match socket.set_nonblocking(false) {
155 Err(err) => {
156 error!("Can not set socket to blocking mode! {err}");
157 },
158 Ok(_) => ()
159 }
160 return Ok(socket);
161 }
162 } }
164
165 fn get_next_pid(&mut self) -> u16 {
169 let pid = self.pid;
170 self.expected_pid = self.pid;
171 self.pid += 1;
173 if self.pid > u16::MAX {
175 self.pid = 0;
176 return 0;
177 }
178 return pid;
179 }
180
181 pub fn receive(&mut self) -> io::Result<usize> {
184 let number_of_bytes = self.socket.recv(&mut self.buffer)?;
185 Ok(number_of_bytes)
187 }
188
189
190 pub fn send(&mut self, data : &Vec<u8>) -> io::Result<()> {
193 self.socket.send(data.as_slice())?;
194 thread::sleep(Duration::from_micros(UDP_SOCKET_SLEEP_USEC));
195 Ok(())
196 }
197
198
199
200 pub fn get_status(&mut self)
205 -> Result<(), Box<dyn Error>> {
206 let mut udp_data = Vec::<u8>::new();
207 let mut phead = self.create_packetheader(true);
208 phead = phead & 0xfffffff0;
209 phead = phead | 0x00000001;
210 udp_data.extend_from_slice(&phead.to_be_bytes());
211 for _ in 0..15 {
212 udp_data.push(0);
213 udp_data.push(0);
214 udp_data.push(0);
215 udp_data.push(0);
216 }
217 let mut send_again = true;
218 let mut number_of_bytes : usize;
219 loop {
220 if send_again {
221 match self.send(&udp_data) {
222 Err(err) => error!("Unable to send udp data! {err}"),
223 Ok(_) => ()
224 }
225 }
226 trace!("[IPBus::get_status => message {:?} sent!", udp_data);
227 match self.receive() {
228 Err(err) => {
229 error!("Can not receive status packet from Udp Socket! {err}");
230 return Err(Box::new(IPBusError::UdpReceiveFailed));
231 },
232 Ok(_number_of_bytes) => {
233 number_of_bytes = _number_of_bytes;
234 }
235 }
236 let status_byte = self.buffer[3];
238 if status_byte & 0x1 != 1 {
239 send_again = false;
242 continue;
243 } else {
244 break;
245 }
246 }
247 trace!("[IPBus::get_status] => {} bytes received!", number_of_bytes);
248 for word in 0..16 {
250 trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
251 }
252 Ok(())
253 }
254
255
256 fn create_packetheader(&mut self, status : bool) -> u32 {
261 let pid : u16;
263 if status {
264 pid = 0;
265 } else {
266 pid = self.get_next_pid();
267 }
268 let pid_bytes = pid.to_be_bytes();
269 let pid_be0 = (pid_bytes[0] as u32) << 16;
270 let pid_be1 = (pid_bytes[1] as u32) << 8;
271 let header = (0x2 << 28) as u32
272 | (0x0 << 24) as u32
273 | pid_be0
274 | pid_be1
275 | (0xf << 4) as u32
276 | 0x0 as u32; trace!("[IPBus::create_packetheader] => Will use packet ID {pid}");
279 trace!("[IPBus::create_packetheader] => Generated header {:?}", header.to_be_bytes());
280 header
281 }
282
283 fn create_transactionheader(&self, nwords : u8) -> u32 {
284 let header = (0x2 << 28) as u32
285 | (0x0 << 24) as u32
286 | (0x0 << 20) as u32
287 | (0x0 << 16) as u32
288 | (nwords as u32) << 8
289 | ((self.packet_type as u8 & 0xf) << 4) as u32
290 | 0xf as u32; header
292 }
293
294 fn encode_payload(&mut self,
306 addr : u32,
307 data : &Vec<u32>) -> Vec<u8> {
308 let mut udp_data = Vec::<u8>::new();
309 let pheader = self.create_packetheader(false);
310 let nwords = data.len() as u8;
311 trace!("[IPBus::encode_payload] => Encoding payload for packet type {}!", self.packet_type);
312 let theader = self.create_transactionheader(nwords);
313 udp_data.extend_from_slice(&pheader.to_be_bytes());
314 udp_data.extend_from_slice(&theader.to_be_bytes());
315 udp_data.extend_from_slice(&addr.to_be_bytes());
316 if self.packet_type == IPBusPacketType::Write
317 || self.packet_type == IPBusPacketType::WriteNonIncrement {
318 for i in data {
319 udp_data.extend_from_slice(&i.to_be_bytes());
320 }
321 }
322 trace!("[IPBus::encode_payload] => payload {:?}", udp_data);
323 udp_data
324 }
325
326 pub fn get_pid_from_current_buffer(&self) -> u16 {
327 let buffer = self.buffer.to_vec();
328 let pheader = parse_u32_be(&buffer, &mut 0);
329 let pid = ((0x00ffff00 & pheader) >> 8) as u16;
330 pid
331 }
332
333 fn decode_payload(&mut self,
342 verbose : bool)
343 -> Result<Vec<u32>, IPBusError> {
344 let mut pos : usize = 0;
345 let mut data = Vec::<u32>::new();
346 let buffer = self.buffer.to_vec();
347 let is_status = buffer[3] & 0x1 == 1;
349 trace!("[IPBus::decode_payload] => buffer (vec) {:?}", buffer);
350 let pheader = parse_u32_be(&buffer, &mut pos);
351 let theader = parse_u32_be(&buffer, &mut pos);
352 trace!("[IPBus::decode_payload] => pheader {pheader}");
353 trace!("[IPBus::decode_payload] => theader {theader}");
354 let pid = ((0x00ffff00 & pheader) >> 8) as u16;
355 let size = ((0x0000ff00 & theader) >> 8) as u16;
356 let ptype = ((0x000000f0 & theader) >> 4) as u8;
357 let packet_type = IPBusPacketType::from(ptype);
358 trace!("[IPBus::decode_payload] => PID, SIZE, PTYPE : {} {} {}", pid, size, packet_type);
359 if pid != self.expected_pid {
360 if !is_status {
361 error!("Invalid packet ID. Expected {}, received {}", self.expected_pid, pid);
362 return Err(IPBusError::InvalidPacketID);
369 }
370 }
371 match packet_type {
372 IPBusPacketType::Unknown => {
373 return Err(IPBusError::DecodingFailed);
374 }
375 IPBusPacketType::Read |
376 IPBusPacketType::ReadNonIncrement => {
377 if (((size as usize) * 4) + 11) < MT_MAX_PACKSIZE {
378 for i in 0..size as usize {
379 data.push( ((self.buffer[8 + i * 4] as u32) << 24)
380 | ((self.buffer[9 + i * 4] as u32) << 16)
381 | ((self.buffer[10 + i * 4] as u32) << 8)
382 | self.buffer[11 + i * 4] as u32)
383 }
384 } else {
385 error!("Size {} larger than bufffer len {}", size, data.len());
386 }
387 },
388 IPBusPacketType::Write => {
389 data.push(0);
390 },
391 IPBusPacketType::WriteNonIncrement => {
392 error!("Decoding of WriteNonIncrement packet not supported!");
393 },
394 IPBusPacketType::RMW => {
395 error!("Decoding of RMW packet not supported!!");
396 }
397 }
398 if verbose {
399 println!("[IPBus::decode_payload] ==> Decoding IPBus Packet:");
400 println!(" >> Msg : {:?}", self.buffer);
401 println!(" >> data : {:?}", data);
408 }
409 Ok(data)
410 }
411
412 pub fn realign_packet_id(&mut self)
414 -> Result<(), Box<dyn Error>> {
415 trace!("[IPBus::realign_packet_id] - aligning...");
416 let pid = self.get_target_next_expected_packet_id()?;
417 self.pid = pid;
418 self.expected_pid = pid;
419 trace!("[IPBus::realign_packet_id] - aligned {}", self.pid);
430 Ok(())
431 }
432
433 pub fn buffer_is_status(&self) -> bool {
434 self.buffer[3] & 0x1 == 1
435 }
436
437 pub fn get_target_next_expected_packet_id(&mut self)
439 -> Result<u16, Box<dyn Error>> {
440 self.get_status()?;
441 let word = 3usize;
443 trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
444 let word3 = [self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4 + 2], self.buffer[word*4 + 3]];
445 let target_exp_pid = u16::from_be_bytes([word3[1], word3[2]]);
446 trace!("[IPBus::target_next_pid] => Get expected packet id {target_exp_pid}");
447 Ok(target_exp_pid)
448 }
449
450
451 pub fn read_multiple(&mut self,
465 addr : u32,
466 nwords : usize,
467 increment_addr : bool)
468 -> Result<Vec<u32>, Box<dyn Error>> {
469 let send_data = vec![0u32;nwords];
470 if increment_addr {
471 self.packet_type = IPBusPacketType::Read;
472 } else {
473 self.packet_type = IPBusPacketType::ReadNonIncrement;
474 }
475 let mut message = self.encode_payload(addr, &send_data);
476 let mut send_again = true;
477 let timeout = Instant::now();
478 loop {
479 if send_again {
480 self.send(&message)?;
481 }
482 match self.receive() {
483 Err(_err) => {
484 self.realign_packet_id()?;
492 message = self.encode_payload(addr, &send_data);
495 send_again = true;
496 if timeout.elapsed().as_millis() == 10 {
497 return Err(Box::new(IPBusError::UdpReceiveFailed));
498 }
499 continue;
500 }
501 Ok(number_of_bytes) => {
502
503 if self.buffer_is_status() {
504 continue;
506 }
507
508 let pid_from_buffer = self.get_pid_from_current_buffer();
509 if pid_from_buffer != self.expected_pid {
510 error!("We got a packet, but the PacketID is not as expected!");
511 error!("-- self.pid {}, self.expected_pid {}, buffer pid {}", self.pid, self.expected_pid, pid_from_buffer);
512 if self.expected_pid > pid_from_buffer {
514 send_again = false;
515 continue;
516 } else {
517 self.realign_packet_id()?;
520 message = self.encode_payload(addr, &send_data);
521 send_again = true;
522 continue;
523 }
524 }
525 trace!("[IPBus::read] => Received {} bytes from master trigger! Message {:?}", number_of_bytes, self.buffer);
526 break;
527 }
528 } } let data = self.decode_payload(false)?;
531 if data.len() == 0 {
532 error!("Received empty data!");
533 return Err(Box::new(IPBusError::DecodingFailed));
534 }
535 Ok(data)
536 }
537
538 pub fn read(&mut self,
544 addr : u32)
545 -> Result<u32, Box<dyn Error>> {
546
547 let data = self.read_multiple(addr,1,false)?;
548 Ok(data[0])
549 }
550
551 pub fn write(&mut self,
558 addr : u32,
559 data : u32)
560 -> Result<(), Box<dyn Error>> {
561 let send_data = Vec::<u32>::from([data]);
563 self.packet_type = IPBusPacketType::Write;
564 let message = self.encode_payload(addr, &send_data);
565 match self.send(&message) {
566 Err(err) => {
567 error!("Sending Udp message failed! {err}");
568 return Err(Box::new(IPBusError::UdpSendFailed));
569 },
570 Ok(_) => ()
571 }
572 match self.receive() {
573 Err(err) => {
576 error!("Unable to receive data! i/o error : {}", err.kind());
578 let target_exp_pid = self.get_target_next_expected_packet_id()?;
579 let buffer_pid = self.get_pid_from_current_buffer();
580 error!("self.pid {}, self.expected.pid {}, target expects pid {:?}, buffer pid {} ", self.pid, self.expected_pid, target_exp_pid, buffer_pid);
581 return Err(Box::new(IPBusError::UdpReceiveFailed));
582 }
584 Ok(_data) => {
585 trace!("[ipbus::write] => Got buffer {:?}", self.buffer);
587 return Ok(());
588 }
589 }
590 }
591}
592
593impl fmt::Display for IPBus {
594 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
595 let mut repr = String::from("<IPBus:");
596 repr += &(format!(" pid : {}>", self.pid));
597 write!(f, "{}", repr)
598 }
599}
600
601
/// Python facing wrappers around the native `IPBus` API. Errors of the
/// wrapped calls are converted into Python `ValueError`s carrying the
/// error's string representation.
#[cfg(feature="pybindings")]
#[pymethods]
impl IPBus {
  /// Connect to `target_address`.
  ///
  /// Panics if the connection can not be established.
  #[new]
  fn new_py(target_address : &str) -> Self {
    // `expect` takes a plain string and does not interpolate, so the
    // message is built with `panic!` to really show the address and the
    // underlying error.
    IPBus::new(target_address)
      .unwrap_or_else(|err| panic!("Unable to connect to {target_address}: {err}"))
  }

  /// Request a status packet from the target; the reply ends up in the
  /// buffer.
  #[getter]
  #[pyo3(name="status")]
  pub fn get_status_py(&mut self) -> PyResult<()> {
    self.get_status()
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }

  /// Copy of the receive buffer.
  #[getter]
  #[pyo3(name="buffer")]
  pub fn get_buffer(&self) -> [u8;MT_MAX_PACKSIZE] {
    self.buffer
  }

  /// Override the packet id used for the next request.
  #[pyo3(name="set_packet_id")]
  pub fn set_packet_id_py(&mut self, pid : u16) {
    self.pid = pid;
  }

  /// Packet id that will be used for the next request.
  #[pyo3(name="get_packet_id")]
  pub fn get_packet_id_py(&self) -> u16 {
    self.pid
  }

  /// Packet id the next reply is checked against.
  #[getter]
  #[pyo3(name="expected_pid")]
  pub fn get_expected_packet_id_py(&self) -> u16 {
    self.expected_pid
  }

  /// Synchronize the local packet id with the one the target expects.
  #[pyo3(name="realign_packet_id")]
  pub fn realign_packet_id_py(&mut self) -> PyResult<()> {
    self.realign_packet_id()
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }

  /// Ask the target which packet id it expects next.
  #[getter]
  #[pyo3(name="target_next_expected_pid")]
  pub fn get_target_next_expected_packet_id_py(&mut self)
    -> PyResult<u16> {
    self.get_target_next_expected_packet_id()
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }

  /// Read `nwords` words starting at `addr`; `increment_addr` selects
  /// between the incrementing and the non-incrementing read.
  #[pyo3(name="read_multiple")]
  pub fn read_multiple_py(&mut self,
                          addr           : u32,
                          nwords         : usize,
                          increment_addr : bool)
    -> PyResult<Vec<u32>> {
    self.read_multiple(addr, nwords, increment_addr)
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }

  /// Write the single word `data` to `addr`.
  #[pyo3(name="write")]
  pub fn write_py(&mut self,
                  addr : u32,
                  data : u32)
    -> PyResult<()> {
    self.write(addr, data)
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }

  /// Read a single word from `addr`.
  #[pyo3(name="read")]
  pub fn read_py(&mut self, addr : u32)
    -> PyResult<u32> {
    self.read(addr)
      .map_err(|err| PyValueError::new_err(err.to_string()))
  }
}