1use crate::prelude::*;
13
14
15pub const MT_MAX_PACKSIZE : usize = 128;
19
20pub const UDP_SOCKET_SLEEP_USEC : u64 = 100;
23
24#[derive(Debug, PartialEq, Clone, Copy, FromRepr, AsRefStr, EnumIter)]
34#[repr(u8)]
35pub enum IPBusPacketType {
36 Read = 0,
37 Write = 1,
38 ReadNonIncrement = 2,
42 WriteNonIncrement = 3,
43 RMW = 4,
44 Unknown = 99
47}
48
49expand_and_test_enum!(IPBusPacketType, test_ipbuspackettype_repr);
50
51
52#[derive(Debug)]
67#[cfg_attr(feature="pybindings", pyclass)]
68pub struct IPBus {
69 pub socket : UdpSocket,
70 pub packet_type : IPBusPacketType,
72 pub pid : u16,
75 pub expected_pid : u16,
76 pub last_pid : u16,
77 pub buffer : [u8;MT_MAX_PACKSIZE]
78}
79
80impl IPBus {
81
82 pub fn new(target_address : &str)
83 -> io::Result<Self> {
84 let socket = Self::connect(target_address)?;
85 let mut bus = Self {
86 socket : socket,
87 packet_type : IPBusPacketType::Read,
89 pid : 0,
90 expected_pid : 0,
91 last_pid : 0,
92 buffer : [0;MT_MAX_PACKSIZE]
93 };
94 match bus.realign_packet_id() {
95 Err(err) => {
96 error!("Packet ID realign failed! {}", err);
97 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Can not realign packet id"));
98 },
99 Ok(_) => ()
100 }
101 Ok(bus)
102 }
103
104 pub fn connect(target_address : &str)
115 ->io::Result<UdpSocket> {
116 let local_addrs = [
118 SocketAddr::from(([0, 0, 0, 0], 50100)),
119 SocketAddr::from(([0, 0, 0, 0], 50101)),
120 SocketAddr::from(([0, 0, 0, 0], 50102)),
121 SocketAddr::from(([0, 0, 0, 0], 50103)),
122 SocketAddr::from(([0, 0, 0, 0], 50104)),
123 ];
124 let local_socket = UdpSocket::bind(&local_addrs[..]);
125 let socket : UdpSocket;
126 match local_socket {
127 Err(err) => {
128 error!("Can not create local UDP socket for master trigger connection!, err {}", err);
129 return Err(err);
130 }
131 Ok(value) => {
132 info!("Successfully bound UDP socket for master trigger communcations to {:?}", value);
133 socket = value;
134 match socket.set_read_timeout(Some(Duration::from_millis(1))) {
137 Err(err) => error!("Can not set read timeout for Udp socket! {err}"),
138 Ok(_) => ()
139 }
140 match socket.set_write_timeout(Some(Duration::from_millis(1))) {
141 Err(err) => error!("Can not set write timeout for Udp socket! {err}"),
142 Ok(_) => ()
143 }
144 match socket.connect(target_address) {
145 Err(err) => {
146 error!("Can not connect to IPBus socket to target address {}! {}", target_address, err);
147 return Err(err);
148 }
149 Ok(_) => info!("Successfully connected IPBus to target address {}!", target_address)
150 }
151 match socket.set_nonblocking(false) {
152 Err(err) => {
153 error!("Can not set socket to blocking mode! {err}");
154 },
155 Ok(_) => ()
156 }
157 return Ok(socket);
158 }
159 } }
161
162 fn get_next_pid(&mut self) -> u16 {
166 let pid = self.pid;
167 self.expected_pid = self.pid;
168 self.pid += 1;
170 if self.pid > u16::MAX {
172 self.pid = 0;
173 return 0;
174 }
175 return pid;
176 }
177
178 pub fn receive(&mut self) -> io::Result<usize> {
181 let number_of_bytes = self.socket.recv(&mut self.buffer)?;
182 Ok(number_of_bytes)
184 }
185
186
187 pub fn send(&mut self, data : &Vec<u8>) -> io::Result<()> {
190 self.socket.send(data.as_slice())?;
191 thread::sleep(Duration::from_micros(UDP_SOCKET_SLEEP_USEC));
192 Ok(())
193 }
194
195
196
197 pub fn get_status(&mut self)
202 -> Result<(), Box<dyn Error>> {
203 let mut udp_data = Vec::<u8>::new();
204 let mut phead = self.create_packetheader(true);
205 phead = phead & 0xfffffff0;
206 phead = phead | 0x00000001;
207 udp_data.extend_from_slice(&phead.to_be_bytes());
208 for _ in 0..15 {
209 udp_data.push(0);
210 udp_data.push(0);
211 udp_data.push(0);
212 udp_data.push(0);
213 }
214 let mut send_again = true;
215 let mut number_of_bytes : usize;
216 loop {
217 if send_again {
218 match self.send(&udp_data) {
219 Err(err) => error!("Unable to send udp data! {err}"),
220 Ok(_) => ()
221 }
222 }
223 trace!("[IPBus::get_status => message {:?} sent!", udp_data);
224 match self.receive() {
225 Err(err) => {
226 error!("Can not receive status packet from Udp Socket! {err}");
227 return Err(Box::new(IPBusError::UdpReceiveFailed));
228 },
229 Ok(_number_of_bytes) => {
230 number_of_bytes = _number_of_bytes;
231 }
232 }
233 let status_byte = self.buffer[3];
235 if status_byte & 0x1 != 1 {
236 send_again = false;
239 continue;
240 } else {
241 break;
242 }
243 }
244 trace!("[IPBus::get_status] => {} bytes received!", number_of_bytes);
245 for word in 0..16 {
247 trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
248 }
249 Ok(())
250 }
251
252
253 fn create_packetheader(&mut self, status : bool) -> u32 {
258 let pid : u16;
260 if status {
261 pid = 0;
262 } else {
263 pid = self.get_next_pid();
264 }
265 let pid_bytes = pid.to_be_bytes();
266 let pid_be0 = (pid_bytes[0] as u32) << 16;
267 let pid_be1 = (pid_bytes[1] as u32) << 8;
268 let header = (0x2 << 28) as u32
269 | (0x0 << 24) as u32
270 | pid_be0
271 | pid_be1
272 | (0xf << 4) as u32
273 | 0x0 as u32; trace!("[IPBus::create_packetheader] => Will use packet ID {pid}");
276 trace!("[IPBus::create_packetheader] => Generated header {:?}", header.to_be_bytes());
277 header
278 }
279
280 fn create_transactionheader(&self, nwords : u8) -> u32 {
281 let header = (0x2 << 28) as u32
282 | (0x0 << 24) as u32
283 | (0x0 << 20) as u32
284 | (0x0 << 16) as u32
285 | (nwords as u32) << 8
286 | ((self.packet_type as u8 & 0xf) << 4) as u32
287 | 0xf as u32; header
289 }
290
291 fn encode_payload(&mut self,
303 addr : u32,
304 data : &Vec<u32>) -> Vec<u8> {
305 let mut udp_data = Vec::<u8>::new();
306 let pheader = self.create_packetheader(false);
307 let nwords = data.len() as u8;
308 trace!("[IPBus::encode_payload] => Encoding payload for packet type {}!", self.packet_type);
309 let theader = self.create_transactionheader(nwords);
310 udp_data.extend_from_slice(&pheader.to_be_bytes());
311 udp_data.extend_from_slice(&theader.to_be_bytes());
312 udp_data.extend_from_slice(&addr.to_be_bytes());
313 if self.packet_type == IPBusPacketType::Write
314 || self.packet_type == IPBusPacketType::WriteNonIncrement {
315 for i in data {
316 udp_data.extend_from_slice(&i.to_be_bytes());
317 }
318 }
319 trace!("[IPBus::encode_payload] => payload {:?}", udp_data);
320 udp_data
321 }
322
323 pub fn get_pid_from_current_buffer(&self) -> u16 {
324 let buffer = self.buffer.to_vec();
325 let pheader = parse_u32_be(&buffer, &mut 0);
326 let pid = ((0x00ffff00 & pheader) >> 8) as u16;
327 pid
328 }
329
330 fn decode_payload(&mut self,
339 verbose : bool)
340 -> Result<Vec<u32>, IPBusError> {
341 let mut pos : usize = 0;
342 let mut data = Vec::<u32>::new();
343 let buffer = self.buffer.to_vec();
344 let is_status = buffer[3] & 0x1 == 1;
346 trace!("[IPBus::decode_payload] => buffer (vec) {:?}", buffer);
347 let pheader = parse_u32_be(&buffer, &mut pos);
348 let theader = parse_u32_be(&buffer, &mut pos);
349 trace!("[IPBus::decode_payload] => pheader {pheader}");
350 trace!("[IPBus::decode_payload] => theader {theader}");
351 let pid = ((0x00ffff00 & pheader) >> 8) as u16;
352 let size = ((0x0000ff00 & theader) >> 8) as u16;
353 let ptype = ((0x000000f0 & theader) >> 4) as u8;
354 let packet_type = IPBusPacketType::from(ptype);
355 trace!("[IPBus::decode_payload] => PID, SIZE, PTYPE : {} {} {}", pid, size, packet_type);
356 if pid != self.expected_pid {
357 if !is_status {
358 error!("Invalid packet ID. Expected {}, received {}", self.expected_pid, pid);
359 return Err(IPBusError::InvalidPacketID);
366 }
367 }
368 match packet_type {
369 IPBusPacketType::Unknown => {
370 return Err(IPBusError::DecodingFailed);
371 }
372 IPBusPacketType::Read |
373 IPBusPacketType::ReadNonIncrement => {
374 if (((size as usize) * 4) + 11) < MT_MAX_PACKSIZE {
375 for i in 0..size as usize {
376 data.push( ((self.buffer[8 + i * 4] as u32) << 24)
377 | ((self.buffer[9 + i * 4] as u32) << 16)
378 | ((self.buffer[10 + i * 4] as u32) << 8)
379 | self.buffer[11 + i * 4] as u32)
380 }
381 } else {
382 error!("Size {} larger than bufffer len {}", size, data.len());
383 }
384 },
385 IPBusPacketType::Write => {
386 data.push(0);
387 },
388 IPBusPacketType::WriteNonIncrement => {
389 error!("Decoding of WriteNonIncrement packet not supported!");
390 },
391 IPBusPacketType::RMW => {
392 error!("Decoding of RMW packet not supported!!");
393 }
394 }
395 if verbose {
396 println!("[IPBus::decode_payload] ==> Decoding IPBus Packet:");
397 println!(" >> Msg : {:?}", self.buffer);
398 println!(" >> data : {:?}", data);
405 }
406 Ok(data)
407 }
408
409 pub fn realign_packet_id(&mut self)
411 -> Result<(), Box<dyn Error>> {
412 trace!("[IPBus::realign_packet_id] - aligning...");
413 let pid = self.get_target_next_expected_packet_id()?;
414 self.pid = pid;
415 self.expected_pid = pid;
416 trace!("[IPBus::realign_packet_id] - aligned {}", self.pid);
427 Ok(())
428 }
429
430 pub fn buffer_is_status(&self) -> bool {
431 self.buffer[3] & 0x1 == 1
432 }
433
434 pub fn get_target_next_expected_packet_id(&mut self)
436 -> Result<u16, Box<dyn Error>> {
437 self.get_status()?;
438 let word = 3usize;
440 trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
441 let word3 = [self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4 + 2], self.buffer[word*4 + 3]];
442 let target_exp_pid = u16::from_be_bytes([word3[1], word3[2]]);
443 trace!("[IPBus::target_next_pid] => Get expected packet id {target_exp_pid}");
444 Ok(target_exp_pid)
445 }
446
447
448 pub fn read_multiple(&mut self,
462 addr : u32,
463 nwords : usize,
464 increment_addr : bool)
465 -> Result<Vec<u32>, Box<dyn Error>> {
466 let send_data = vec![0u32;nwords];
467 if increment_addr {
468 self.packet_type = IPBusPacketType::Read;
469 } else {
470 self.packet_type = IPBusPacketType::ReadNonIncrement;
471 }
472 let mut message = self.encode_payload(addr, &send_data);
473 let mut send_again = true;
474 let timeout = Instant::now();
475 loop {
476 if send_again {
477 self.send(&message)?;
478 }
479 match self.receive() {
480 Err(_err) => {
481 self.realign_packet_id()?;
489 message = self.encode_payload(addr, &send_data);
492 send_again = true;
493 if timeout.elapsed().as_millis() == 10 {
494 return Err(Box::new(IPBusError::UdpReceiveFailed));
495 }
496 continue;
497 }
498 Ok(number_of_bytes) => {
499
500 if self.buffer_is_status() {
501 continue;
503 }
504
505 let pid_from_buffer = self.get_pid_from_current_buffer();
506 if pid_from_buffer != self.expected_pid {
507 error!("We got a packet, but the PacketID is not as expected!");
508 error!("-- self.pid {}, self.expected_pid {}, buffer pid {}", self.pid, self.expected_pid, pid_from_buffer);
509 if self.expected_pid > pid_from_buffer {
511 send_again = false;
512 continue;
513 } else {
514 self.realign_packet_id()?;
517 message = self.encode_payload(addr, &send_data);
518 send_again = true;
519 continue;
520 }
521 }
522 trace!("[IPBus::read] => Received {} bytes from master trigger! Message {:?}", number_of_bytes, self.buffer);
523 break;
524 }
525 } } let data = self.decode_payload(false)?;
528 if data.len() == 0 {
529 error!("Received empty data!");
530 return Err(Box::new(IPBusError::DecodingFailed));
531 }
532 Ok(data)
533 }
534
535 pub fn read(&mut self,
541 addr : u32)
542 -> Result<u32, Box<dyn Error>> {
543
544 let data = self.read_multiple(addr,1,false)?;
545 Ok(data[0])
546 }
547
548 pub fn write(&mut self,
555 addr : u32,
556 data : u32)
557 -> Result<(), Box<dyn Error>> {
558 let send_data = Vec::<u32>::from([data]);
560 self.packet_type = IPBusPacketType::Write;
561 let message = self.encode_payload(addr, &send_data);
562 match self.send(&message) {
563 Err(err) => {
564 error!("Sending Udp message failed! {err}");
565 return Err(Box::new(IPBusError::UdpSendFailed));
566 },
567 Ok(_) => ()
568 }
569 match self.receive() {
570 Err(err) => {
573 error!("Unable to receive data! i/o error : {}", err.kind());
575 let target_exp_pid = self.get_target_next_expected_packet_id()?;
576 let buffer_pid = self.get_pid_from_current_buffer();
577 error!("self.pid {}, self.expected.pid {}, target expects pid {:?}, buffer pid {} ", self.pid, self.expected_pid, target_exp_pid, buffer_pid);
578 return Err(Box::new(IPBusError::UdpReceiveFailed));
579 }
581 Ok(_data) => {
582 trace!("[ipbus::write] => Got buffer {:?}", self.buffer);
584 return Ok(());
585 }
586 }
587 }
588}
589
590impl fmt::Display for IPBus {
591 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
592 let mut repr = String::from("<IPBus:");
593 repr += &(format!(" pid : {}>", self.pid));
594 write!(f, "{}", repr)
595 }
596}
597
598
599#[cfg(feature="pybindings")]
600#[pymethods]
601impl IPBus {
602 #[new]
603 fn new_py(target_address : &str) -> Self {
604 IPBus::new(target_address).expect("Unable to connect to {target_address}")
605 }
606
607 #[getter]
609 #[pyo3(name="status")]
610 pub fn get_status_py(&mut self) -> PyResult<()> {
611 match self.get_status() {
612 Ok(_) => {
613 return Ok(());
614 },
615 Err(err) => {
616 return Err(PyValueError::new_err(err.to_string()));
617 }
618 }
619 }
620
621 #[getter]
622 #[pyo3(name="buffer")]
623 pub fn get_buffer(&self) -> [u8;MT_MAX_PACKSIZE] {
624 return self.buffer.clone();
625 }
626
627 #[pyo3(name="set_packet_id")]
628 pub fn set_packet_id_py(&mut self, pid : u16) {
629 self.pid = pid;
630 }
631
632 #[pyo3(name="get_packet_id")]
633 pub fn get_packet_id_py(&self) -> u16 {
634 self.pid
635 }
636
637 #[getter]
638 #[pyo3(name="expected_pid")]
639 pub fn get_expected_packet_id_py(&self) -> u16 {
640 self.expected_pid
641 }
642
643 #[pyo3(name="realign_packet_id")]
645 pub fn realign_packet_id_py(&mut self) -> PyResult<()> {
646 match self.realign_packet_id() {
647 Ok(_) => {
648 return Ok(());
649 },
650 Err(err) => {
651 return Err(PyValueError::new_err(err.to_string()));
652 }
653 }
654 }
655
656 #[getter]
658 #[pyo3(name="target_next_expected_pid")]
659 pub fn get_target_next_expected_packet_id_py(&mut self)
660 -> PyResult<u16> {
661 match self.get_target_next_expected_packet_id() {
662 Ok(result) => {
663 return Ok(result);
664 },
665 Err(err) => {
666 return Err(PyValueError::new_err(err.to_string()));
667 }
668 }
669 }
670
671 #[pyo3(name="read_multiple")]
672 pub fn read_multiple_py(&mut self,
673 addr : u32,
674 nwords : usize,
675 increment_addr : bool)
676 -> PyResult<Vec<u32>> {
677
678 match self.read_multiple(addr,
679 nwords,
680 increment_addr) {
681 Ok(result) => {
682 return Ok(result);
683 },
684 Err(err) => {
685 return Err(PyValueError::new_err(err.to_string()));
686 }
687 }
688 }
689
690 #[pyo3(name="write")]
691 pub fn write_py(&mut self,
692 addr : u32,
693 data : u32)
694 -> PyResult<()> {
695
696 match self.write(addr, data) {
697 Ok(_) => Ok(()),
698 Err(err) => {
699 return Err(PyValueError::new_err(err.to_string()));
700 }
701 }
702 }
703
704
705 #[pyo3(name="read")]
706 pub fn read_py(&mut self, addr : u32)
707 -> PyResult<u32> {
708 match self.read(addr) {
709 Ok(result) => {
710 return Ok(result);
711 },
712 Err(err) => {
713 return Err(PyValueError::new_err(err.to_string()));
714 }
715 }
716 }
717}