1use std::fmt;
10use std::thread;
11use std::io;
12use std::net::{
13 UdpSocket,
14 SocketAddr
15};
16
17use std::error::Error;
18use std::time::{
19 Duration,
20 Instant
21};
22
23use crate::errors::IPBusError;
24use crate::serialization::{
25 parse_u32_be
27};
28
/// Size in bytes of the fixed byte arrays used in this file: the receive
/// buffer of `IPBus` and the `data` field of `IPBusPacket`.
pub const MT_MAX_PACKSIZE : usize = 128;
33
/// Pause in microseconds which `IPBus::send` sleeps after every datagram
/// it hands to the socket.
pub const UDP_SOCKET_SLEEP_USEC : u64 = 100;
37
/// Transaction types known to the IPBus implementation in this file.
///
/// The explicit discriminants are the same numbers which
/// `IPBusPacketType::to_u8` returns and `IPBusPacketType::from_u8` accepts.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum IPBusPacketType {
  /// Read request; selected by `IPBus::read_multiple` when
  /// `increment_addr` is true
  Read = 0,
  /// Write request; selected by `IPBus::write`
  Write = 1,
  /// Read request; selected by `IPBus::read_multiple` when
  /// `increment_addr` is false
  ReadNonIncrement = 2,
  /// Write variant; `IPBus::decode_payload` only logs that decoding it
  /// is not supported
  WriteNonIncrement = 3,
  /// `IPBus::decode_payload` only logs that decoding it is not supported
  /// (presumably read-modify-write - confirm against the protocol spec)
  RMW = 4,
  /// Fallback which `from_u8` yields for every number not listed above
  Unknown = 99
}
61
62impl IPBusPacketType {
63
64 pub fn to_u8(&self) -> u8 {
65 let ret_val : u8;
66 match self {
67 IPBusPacketType::Read => {
68 ret_val = 0;
69 }
70 IPBusPacketType::Write => {
71 ret_val = 1;
72 }
73 IPBusPacketType::ReadNonIncrement => {
74 ret_val = 2;
75 }
76 IPBusPacketType::WriteNonIncrement => {
77 ret_val = 3;
78 }
79 IPBusPacketType::RMW => {
80 ret_val = 4;
81 }
82 IPBusPacketType::Unknown => {
83 ret_val = 99;
84 }
85 }
86 ret_val
87 }
88
89 pub fn from_u8(ptype : u8) -> Self {
90 let ptype_val : Self;
91 match ptype {
92 0 => {ptype_val = IPBusPacketType::Read;}
93 1 => {ptype_val = IPBusPacketType::Write;}
94 2 => {ptype_val = IPBusPacketType::ReadNonIncrement;}
95 3 => {ptype_val = IPBusPacketType::WriteNonIncrement;}
96 4 => {ptype_val = IPBusPacketType::RMW;}
97 _ => {ptype_val = IPBusPacketType::Unknown;}
98 }
99 return ptype_val;
100 }
101}
102
103impl fmt::Display for IPBusPacketType {
104 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105 let repr : String;
106 match self {
107 IPBusPacketType::Read => {repr = String::from("Read");}
108 IPBusPacketType::Write => {repr = String::from("Write");}
109 IPBusPacketType::ReadNonIncrement => {repr = String::from("ReadNonIncrement");}
110 IPBusPacketType::WriteNonIncrement => {repr = String::from("WriteNonIncrement");}
111 IPBusPacketType::RMW => {repr = String::from("RMW");}
112 IPBusPacketType::Unknown => {repr = String::from("Unknown");}
113 }
114 write!(f, "<IPBusPacketType: {}>", repr)
115 }
116}
117
/// Plain data holder for a single packet: an id, a type and a fixed-size
/// byte array.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// reads this struct - confirm where it is used.
#[derive(Debug, Clone)]
pub struct IPBusPacket {
  /// packet id
  pub pid : u16,
  /// packet type
  pub ptype : IPBusPacketType,
  /// raw bytes, `MT_MAX_PACKSIZE` long
  pub data : [u8;MT_MAX_PACKSIZE]
}
124
/// State of an IPBus connection over UDP: the socket, the packet id
/// bookkeeping and the buffer the latest datagram was received into.
#[derive(Debug)]
pub struct IPBus {
  /// UDP socket as returned by `IPBus::connect`
  pub socket : UdpSocket,
  /// packet type which goes into the transaction header of the next
  /// encoded request; set by `read_multiple` and `write`
  pub packet_type : IPBusPacketType,
  /// packet id which the next non-status request will carry
  pub pid : u16,
  /// packet id of the most recently created non-status request; received
  /// packets are compared against it
  pub expected_pid : u16,
  /// initialised to 0 in `new`; not modified by any method visible in
  /// this file
  pub last_pid : u16,
  /// receive buffer, overwritten by every call to `receive`
  pub buffer : [u8;MT_MAX_PACKSIZE]
}
138
139impl IPBus {
140
141 pub fn new(target_address : &str)
142 -> io::Result<Self> {
143 let socket = Self::connect(target_address)?;
144 let mut bus = Self {
145 socket : socket,
146 packet_type : IPBusPacketType::Read,
148 pid : 0,
149 expected_pid : 0,
150 last_pid : 0,
151 buffer : [0;MT_MAX_PACKSIZE]
152 };
153 match bus.realign_packet_id() {
154 Err(err) => {
155 error!("Packet ID realign failed! {}", err);
156 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Can not realign packet id"));
157 },
158 Ok(_) => ()
159 }
160 Ok(bus)
161 }
162
163 pub fn connect(target_address : &str)
174 ->io::Result<UdpSocket> {
175 let local_addrs = [
177 SocketAddr::from(([0, 0, 0, 0], 50100)),
178 SocketAddr::from(([0, 0, 0, 0], 50101)),
179 SocketAddr::from(([0, 0, 0, 0], 50102)),
180 SocketAddr::from(([0, 0, 0, 0], 50103)),
181 SocketAddr::from(([0, 0, 0, 0], 50104)),
182 ];
183 let local_socket = UdpSocket::bind(&local_addrs[..]);
184 let socket : UdpSocket;
185 match local_socket {
186 Err(err) => {
187 error!("Can not create local UDP socket for master trigger connection!, err {}", err);
188 return Err(err);
189 }
190 Ok(value) => {
191 info!("Successfully bound UDP socket for master trigger communcations to {:?}", value);
192 socket = value;
193 match socket.set_read_timeout(Some(Duration::from_millis(1))) {
196 Err(err) => error!("Can not set read timeout for Udp socket! {err}"),
197 Ok(_) => ()
198 }
199 match socket.set_write_timeout(Some(Duration::from_millis(1))) {
200 Err(err) => error!("Can not set write timeout for Udp socket! {err}"),
201 Ok(_) => ()
202 }
203 match socket.connect(target_address) {
204 Err(err) => {
205 error!("Can not connect to IPBus socket to target address {}! {}", target_address, err);
206 return Err(err);
207 }
208 Ok(_) => info!("Successfully connected IPBus to target address {}!", target_address)
209 }
210 match socket.set_nonblocking(false) {
211 Err(err) => {
212 error!("Can not set socket to blocking mode! {err}");
213 },
214 Ok(_) => ()
215 }
216 return Ok(socket);
217 }
218 } }
220
221 fn get_next_pid(&mut self) -> u16 {
233 let pid = self.pid;
234 self.expected_pid = self.pid;
235 self.pid += 1;
237 if self.pid > u16::MAX {
239 self.pid = 0;
240 return 0;
241 }
242 return pid;
243 }
244
245 pub fn receive(&mut self) -> io::Result<usize> {
248 let number_of_bytes = self.socket.recv(&mut self.buffer)?;
249 Ok(number_of_bytes)
251 }
252
253
254 pub fn send(&mut self, data : &Vec<u8>) -> io::Result<()> {
257 self.socket.send(data.as_slice())?;
258 thread::sleep(Duration::from_micros(UDP_SOCKET_SLEEP_USEC));
259 Ok(())
260 }
261
262
263
264 pub fn get_status(&mut self)
269 -> Result<(), Box<dyn Error>> {
270 let mut udp_data = Vec::<u8>::new();
271 let mut phead = self.create_packetheader(true);
272 phead = phead & 0xfffffff0;
273 phead = phead | 0x00000001;
274 udp_data.extend_from_slice(&phead.to_be_bytes());
275 for _ in 0..15 {
276 udp_data.push(0);
277 udp_data.push(0);
278 udp_data.push(0);
279 udp_data.push(0);
280 }
281 let mut send_again = true;
282 let mut number_of_bytes : usize;
283 loop {
284 if send_again {
285 match self.send(&udp_data) {
286 Err(err) => error!("Unable to send udp data! {err}"),
287 Ok(_) => ()
288 }
289 }
290 trace!("[IPBus::get_status => message {:?} sent!", udp_data);
291 match self.receive() {
292 Err(err) => {
293 error!("Can not receive status packet from Udp Socket! {err}");
294 return Err(Box::new(IPBusError::UdpReceiveFailed));
295 },
296 Ok(_number_of_bytes) => {
297 number_of_bytes = _number_of_bytes;
298 }
299 }
300 let status_byte = self.buffer[3];
302 if status_byte & 0x1 != 1 {
303 send_again = false;
306 continue;
307 } else {
308 break;
309 }
310 }
311 trace!("[IPBus::get_status] => {} bytes received!", number_of_bytes);
312 for word in 0..16 {
314 trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
315 }
316 Ok(())
317 }
318
319
320 fn create_packetheader(&mut self, status : bool) -> u32 {
325 let pid : u16;
327 if status {
328 pid = 0;
329 } else {
330 pid = self.get_next_pid();
331 }
332 let pid_bytes = pid.to_be_bytes();
333 let pid_be0 = (pid_bytes[0] as u32) << 16;
334 let pid_be1 = (pid_bytes[1] as u32) << 8;
335 let header = (0x2 << 28) as u32
336 | (0x0 << 24) as u32
337 | pid_be0
338 | pid_be1
339 | (0xf << 4) as u32
340 | 0x0 as u32; trace!("[IPBus::create_packetheader] => Will use packet ID {pid}");
343 trace!("[IPBus::create_packetheader] => Generated header {:?}", header.to_be_bytes());
344 header
345 }
346
347 fn create_transactionheader(&self, nwords : u8) -> u32 {
348 let header = (0x2 << 28) as u32
349 | (0x0 << 24) as u32
350 | (0x0 << 20) as u32
351 | (0x0 << 16) as u32
352 | (nwords as u32) << 8
353 | ((self.packet_type.to_u8() & 0xf) << 4) as u32
354 | 0xf as u32; header
356 }
357
358 fn encode_payload(&mut self,
370 addr : u32,
371 data : &Vec<u32>) -> Vec<u8> {
372 let mut udp_data = Vec::<u8>::new();
373 let pheader = self.create_packetheader(false);
374 let nwords = data.len() as u8;
375 trace!("[IPBus::encode_payload] => Encoding payload for packet type {}!", self.packet_type);
376 let theader = self.create_transactionheader(nwords);
377 udp_data.extend_from_slice(&pheader.to_be_bytes());
378 udp_data.extend_from_slice(&theader.to_be_bytes());
379 udp_data.extend_from_slice(&addr.to_be_bytes());
380 if self.packet_type == IPBusPacketType::Write
381 || self.packet_type == IPBusPacketType::WriteNonIncrement {
382 for i in data {
383 udp_data.extend_from_slice(&i.to_be_bytes());
384 }
385 }
386 trace!("[IPBus::encode_payload] => payload {:?}", udp_data);
387 udp_data
388 }
389
390 pub fn get_pid_from_current_buffer(&self) -> u16 {
391 let buffer = self.buffer.to_vec();
392 let pheader = parse_u32_be(&buffer, &mut 0);
393 let pid = ((0x00ffff00 & pheader) >> 8) as u16;
394 pid
395 }
396
397 fn decode_payload(&mut self,
406 verbose : bool)
407 -> Result<Vec<u32>, IPBusError> {
408 let mut pos : usize = 0;
409 let mut data = Vec::<u32>::new();
410 let buffer = self.buffer.to_vec();
411 let is_status = buffer[3] & 0x1 == 1;
413 trace!("[IPBus::decode_payload] => buffer (vec) {:?}", buffer);
414 let pheader = parse_u32_be(&buffer, &mut pos);
415 let theader = parse_u32_be(&buffer, &mut pos);
416 trace!("[IPBus::decode_payload] => pheader {pheader}");
417 trace!("[IPBus::decode_payload] => theader {theader}");
418 let pid = ((0x00ffff00 & pheader) >> 8) as u16;
419 let size = ((0x0000ff00 & theader) >> 8) as u16;
420 let ptype = ((0x000000f0 & theader) >> 4) as u8;
421 let packet_type = IPBusPacketType::from_u8(ptype);
422 trace!("[IPBus::decode_payload] => PID, SIZE, PTYPE : {} {} {}", pid, size, packet_type);
423 if pid != self.expected_pid {
424 if !is_status {
425 error!("Invalid packet ID. Expected {}, received {}", self.expected_pid, pid);
426 return Err(IPBusError::InvalidPacketID);
433 }
434 }
435 match packet_type {
436 IPBusPacketType::Unknown => {
437 return Err(IPBusError::DecodingFailed);
438 }
439 IPBusPacketType::Read |
440 IPBusPacketType::ReadNonIncrement => {
441 if (((size as usize) * 4) + 11) < MT_MAX_PACKSIZE {
442 for i in 0..size as usize {
443 data.push( ((self.buffer[8 + i * 4] as u32) << 24)
444 | ((self.buffer[9 + i * 4] as u32) << 16)
445 | ((self.buffer[10 + i * 4] as u32) << 8)
446 | self.buffer[11 + i * 4] as u32)
447 }
448 } else {
449 error!("Size {} larger than bufffer len {}", size, data.len());
450 }
451 },
452 IPBusPacketType::Write => {
453 data.push(0);
454 },
455 IPBusPacketType::WriteNonIncrement => {
456 error!("Decoding of WriteNonIncrement packet not supported!");
457 },
458 IPBusPacketType::RMW => {
459 error!("Decoding of RMW packet not supported!!");
460 }
461 }
462 if verbose {
463 println!("[IPBus::decode_payload] ==> Decoding IPBus Packet:");
464 println!(" >> Msg : {:?}", self.buffer);
465 println!(" >> data : {:?}", data);
472 }
473 Ok(data)
474 }
475
476 pub fn realign_packet_id(&mut self)
478 -> Result<(), Box<dyn Error>> {
479 trace!("[IPBus::realign_packet_id] - aligning...");
480 let pid = self.get_target_next_expected_packet_id()?;
481 self.pid = pid;
482 self.expected_pid = pid;
483 trace!("[IPBus::realign_packet_id] - aligned {}", self.pid);
494 Ok(())
495 }
496
497 pub fn buffer_is_status(&self) -> bool {
498 self.buffer[3] & 0x1 == 1
499 }
500
501 pub fn get_target_next_expected_packet_id(&mut self)
503 -> Result<u16, Box<dyn Error>> {
504 self.get_status()?;
505 let word = 3usize;
507 trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
508 let word3 = [self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4 + 2], self.buffer[word*4 + 3]];
509 let target_exp_pid = u16::from_be_bytes([word3[1], word3[2]]);
510 trace!("[IPBus::target_next_pid] => Get expected packet id {target_exp_pid}");
511 Ok(target_exp_pid)
512 }
513
514
515 pub fn read_multiple(&mut self,
529 addr : u32,
530 nwords : usize,
531 increment_addr : bool)
532 -> Result<Vec<u32>, Box<dyn Error>> {
533 let send_data = vec![0u32;nwords];
534 if increment_addr {
535 self.packet_type = IPBusPacketType::Read;
536 } else {
537 self.packet_type = IPBusPacketType::ReadNonIncrement;
538 }
539 let mut message = self.encode_payload(addr, &send_data);
540 let mut send_again = true;
541 let timeout = Instant::now();
542 loop {
543 if send_again {
544 self.send(&message)?;
545 }
546 match self.receive() {
547 Err(_err) => {
548 self.realign_packet_id()?;
556 message = self.encode_payload(addr, &send_data);
559 send_again = true;
560 if timeout.elapsed().as_millis() == 10 {
561 return Err(Box::new(IPBusError::UdpReceiveFailed));
562 }
563 continue;
564 }
565 Ok(number_of_bytes) => {
566
567 if self.buffer_is_status() {
568 continue;
570 }
571
572 let pid_from_buffer = self.get_pid_from_current_buffer();
573 if pid_from_buffer != self.expected_pid {
574 error!("We got a packet, but the PacketID is not as expected!");
575 error!("-- self.pid {}, self.expected_pid {}, buffer pid {}", self.pid, self.expected_pid, pid_from_buffer);
576 if self.expected_pid > pid_from_buffer {
578 send_again = false;
579 continue;
580 } else {
581 self.realign_packet_id()?;
584 message = self.encode_payload(addr, &send_data);
585 send_again = true;
586 continue;
587 }
588 }
589 trace!("[IPBus::read] => Received {} bytes from master trigger! Message {:?}", number_of_bytes, self.buffer);
590 break;
591 }
592 } } let data = self.decode_payload(false)?;
595 if data.len() == 0 {
596 error!("Received empty data!");
597 return Err(Box::new(IPBusError::DecodingFailed));
598 }
599 Ok(data)
600 }
601
602 pub fn read(&mut self,
608 addr : u32)
609 -> Result<u32, Box<dyn Error>> {
610
611 let data = self.read_multiple(addr,1,false)?;
612 Ok(data[0])
613 }
614
615 pub fn write(&mut self,
622 addr : u32,
623 data : u32)
624 -> Result<(), Box<dyn Error>> {
625 let send_data = Vec::<u32>::from([data]);
627 self.packet_type = IPBusPacketType::Write;
628 let message = self.encode_payload(addr, &send_data);
629 match self.send(&message) {
630 Err(err) => {
631 error!("Sending Udp message failed! {err}");
632 return Err(Box::new(IPBusError::UdpSendFailed));
633 },
634 Ok(_) => ()
635 }
636 match self.receive() {
637 Err(err) => {
640 error!("Unable to receive data! i/o error : {}", err.kind());
642 let target_exp_pid = self.get_target_next_expected_packet_id()?;
643 let buffer_pid = self.get_pid_from_current_buffer();
644 error!("self.pid {}, self.expected.pid {}, target expects pid {:?}, buffer pid {} ", self.pid, self.expected_pid, target_exp_pid, buffer_pid);
645 return Err(Box::new(IPBusError::UdpReceiveFailed));
646 }
648 Ok(_data) => {
649 trace!("[ipbus::write] => Got buffer {:?}", self.buffer);
651 return Ok(());
652 }
653 }
654 }
655
656}
657
658impl fmt::Display for IPBus {
659 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
660 let mut repr = String::from("<IPBus:");
661 repr += &(format!(" pid : {}>", self.pid));
662 write!(f, "{}", repr)
663 }
664}
665