use std::fmt;
use std::thread;
use std::io;
use std::net::{
UdpSocket,
SocketAddr
};
use std::error::Error;
use std::time::{
Duration,
Instant
};
use crate::errors::IPBusError;
use crate::serialization::{
parse_u32_be
};
pub const MT_MAX_PACKSIZE : usize = 128;
pub const UDP_SOCKET_SLEEP_USEC : u64 = 100;
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum IPBusPacketType {
Read = 0,
Write = 1,
ReadNonIncrement = 2,
WriteNonIncrement = 3,
RMW = 4,
Unknown = 99
}
impl IPBusPacketType {
pub fn to_u8(&self) -> u8 {
let ret_val : u8;
match self {
IPBusPacketType::Read => {
ret_val = 0;
}
IPBusPacketType::Write => {
ret_val = 1;
}
IPBusPacketType::ReadNonIncrement => {
ret_val = 2;
}
IPBusPacketType::WriteNonIncrement => {
ret_val = 3;
}
IPBusPacketType::RMW => {
ret_val = 4;
}
IPBusPacketType::Unknown => {
ret_val = 99;
}
}
ret_val
}
pub fn from_u8(ptype : u8) -> Self {
let ptype_val : Self;
match ptype {
0 => {ptype_val = IPBusPacketType::Read;}
1 => {ptype_val = IPBusPacketType::Write;}
2 => {ptype_val = IPBusPacketType::ReadNonIncrement;}
3 => {ptype_val = IPBusPacketType::WriteNonIncrement;}
4 => {ptype_val = IPBusPacketType::RMW;}
_ => {ptype_val = IPBusPacketType::Unknown;}
}
return ptype_val;
}
}
impl fmt::Display for IPBusPacketType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let repr : String;
match self {
IPBusPacketType::Read => {repr = String::from("Read");}
IPBusPacketType::Write => {repr = String::from("Write");}
IPBusPacketType::ReadNonIncrement => {repr = String::from("ReadNonIncrement");}
IPBusPacketType::WriteNonIncrement => {repr = String::from("WriteNonIncrement");}
IPBusPacketType::RMW => {repr = String::from("RMW");}
IPBusPacketType::Unknown => {repr = String::from("Unknown");}
}
write!(f, "<IPBusPacketType: {}>", repr)
}
}
#[derive(Debug, Clone)]
pub struct IPBusPacket {
pub pid : u16,
pub ptype : IPBusPacketType,
pub data : [u8;MT_MAX_PACKSIZE]
}
#[derive(Debug)]
pub struct IPBus {
pub socket : UdpSocket,
pub packet_type : IPBusPacketType,
pub pid : u16,
pub expected_pid : u16,
pub last_pid : u16,
pub buffer : [u8;MT_MAX_PACKSIZE]
}
impl IPBus {
pub fn new(target_address : &str)
-> io::Result<Self> {
let socket = Self::connect(target_address)?;
let mut bus = Self {
socket : socket,
packet_type : IPBusPacketType::Read,
pid : 0,
expected_pid : 0,
last_pid : 0,
buffer : [0;MT_MAX_PACKSIZE]
};
match bus.realign_packet_id() {
Err(err) => {
error!("Packet ID realign failed! {}", err);
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Can not realign packet id"));
},
Ok(_) => ()
}
Ok(bus)
}
pub fn connect(target_address : &str)
->io::Result<UdpSocket> {
let local_addrs = [
SocketAddr::from(([0, 0, 0, 0], 50100)),
SocketAddr::from(([0, 0, 0, 0], 50101)),
SocketAddr::from(([0, 0, 0, 0], 50102)),
SocketAddr::from(([0, 0, 0, 0], 50103)),
SocketAddr::from(([0, 0, 0, 0], 50104)),
];
let local_socket = UdpSocket::bind(&local_addrs[..]);
let socket : UdpSocket;
match local_socket {
Err(err) => {
error!("Can not create local UDP socket for master trigger connection!, err {}", err);
return Err(err);
}
Ok(value) => {
info!("Successfully bound UDP socket for master trigger communcations to {:?}", value);
socket = value;
match socket.set_read_timeout(Some(Duration::from_millis(1))) {
Err(err) => error!("Can not set read timeout for Udp socket! {err}"),
Ok(_) => ()
}
match socket.set_write_timeout(Some(Duration::from_millis(1))) {
Err(err) => error!("Can not set write timeout for Udp socket! {err}"),
Ok(_) => ()
}
match socket.connect(target_address) {
Err(err) => {
error!("Can not connect to IPBus socket to target address {}! {}", target_address, err);
return Err(err);
}
Ok(_) => info!("Successfully connected IPBus to target address {}!", target_address)
}
match socket.set_nonblocking(false) {
Err(err) => {
error!("Can not set socket to blocking mode! {err}");
},
Ok(_) => ()
}
return Ok(socket);
}
} }
fn get_next_pid(&mut self) -> u16 {
let pid = self.pid;
self.expected_pid = self.pid;
self.pid += 1;
if self.pid > u16::MAX {
self.pid = 0;
return 0;
}
return pid;
}
pub fn receive(&mut self) -> io::Result<usize> {
let number_of_bytes = self.socket.recv(&mut self.buffer)?;
Ok(number_of_bytes)
}
pub fn send(&mut self, data : &Vec<u8>) -> io::Result<()> {
self.socket.send(data.as_slice())?;
thread::sleep(Duration::from_micros(UDP_SOCKET_SLEEP_USEC));
Ok(())
}
pub fn get_status(&mut self)
-> Result<(), Box<dyn Error>> {
let mut udp_data = Vec::<u8>::new();
let mut phead = self.create_packetheader(true);
phead = phead & 0xfffffff0;
phead = phead | 0x00000001;
udp_data.extend_from_slice(&phead.to_be_bytes());
for _ in 0..15 {
udp_data.push(0);
udp_data.push(0);
udp_data.push(0);
udp_data.push(0);
}
let mut send_again = true;
let mut number_of_bytes : usize;
loop {
if send_again {
match self.send(&udp_data) {
Err(err) => error!("Unable to send udp data! {err}"),
Ok(_) => ()
}
}
trace!("[IPBus::get_status => message {:?} sent!", udp_data);
match self.receive() {
Err(err) => {
error!("Can not receive status packet from Udp Socket! {err}");
return Err(Box::new(IPBusError::UdpReceiveFailed));
},
Ok(_number_of_bytes) => {
number_of_bytes = _number_of_bytes;
}
}
let status_byte = self.buffer[3];
if status_byte & 0x1 != 1 {
send_again = false;
continue;
} else {
break;
}
}
trace!("[IPBus::get_status] => {} bytes received!", number_of_bytes);
for word in 0..16 {
trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
}
Ok(())
}
fn create_packetheader(&mut self, status : bool) -> u32 {
let pid : u16;
if status {
pid = 0;
} else {
pid = self.get_next_pid();
}
let pid_bytes = pid.to_be_bytes();
let pid_be0 = (pid_bytes[0] as u32) << 16;
let pid_be1 = (pid_bytes[1] as u32) << 8;
let header = (0x2 << 28) as u32
| (0x0 << 24) as u32
| pid_be0
| pid_be1
| (0xf << 4) as u32
| 0x0 as u32; trace!("[IPBus::create_packetheader] => Will use packet ID {pid}");
trace!("[IPBus::create_packetheader] => Generated header {:?}", header.to_be_bytes());
header
}
fn create_transactionheader(&self, nwords : u8) -> u32 {
let header = (0x2 << 28) as u32
| (0x0 << 24) as u32
| (0x0 << 20) as u32
| (0x0 << 16) as u32
| (nwords as u32) << 8
| ((self.packet_type.to_u8() & 0xf) << 4) as u32
| 0xf as u32; header
}
fn encode_payload(&mut self,
addr : u32,
data : &Vec<u32>) -> Vec<u8> {
let mut udp_data = Vec::<u8>::new();
let pheader = self.create_packetheader(false);
let nwords = data.len() as u8;
trace!("[IPBus::encode_payload] => Encoding payload for packet type {}!", self.packet_type);
let theader = self.create_transactionheader(nwords);
udp_data.extend_from_slice(&pheader.to_be_bytes());
udp_data.extend_from_slice(&theader.to_be_bytes());
udp_data.extend_from_slice(&addr.to_be_bytes());
if self.packet_type == IPBusPacketType::Write
|| self.packet_type == IPBusPacketType::WriteNonIncrement {
for i in data {
udp_data.extend_from_slice(&i.to_be_bytes());
}
}
trace!("[IPBus::encode_payload] => payload {:?}", udp_data);
udp_data
}
pub fn get_pid_from_current_buffer(&self) -> u16 {
let buffer = self.buffer.to_vec();
let pheader = parse_u32_be(&buffer, &mut 0);
let pid = ((0x00ffff00 & pheader) >> 8) as u16;
pid
}
fn decode_payload(&mut self,
verbose : bool)
-> Result<Vec<u32>, IPBusError> {
let mut pos : usize = 0;
let mut data = Vec::<u32>::new();
let buffer = self.buffer.to_vec();
let is_status = buffer[3] & 0x1 == 1;
trace!("[IPBus::decode_payload] => buffer (vec) {:?}", buffer);
let pheader = parse_u32_be(&buffer, &mut pos);
let theader = parse_u32_be(&buffer, &mut pos);
trace!("[IPBus::decode_payload] => pheader {pheader}");
trace!("[IPBus::decode_payload] => theader {theader}");
let pid = ((0x00ffff00 & pheader) >> 8) as u16;
let size = ((0x0000ff00 & theader) >> 8) as u16;
let ptype = ((0x000000f0 & theader) >> 4) as u8;
let packet_type = IPBusPacketType::from_u8(ptype);
trace!("[IPBus::decode_payload] => PID, SIZE, PTYPE : {} {} {}", pid, size, packet_type);
if pid != self.expected_pid {
if !is_status {
error!("Invalid packet ID. Expected {}, received {}", self.expected_pid, pid);
return Err(IPBusError::InvalidPacketID);
}
}
match packet_type {
IPBusPacketType::Unknown => {
return Err(IPBusError::DecodingFailed);
}
IPBusPacketType::Read |
IPBusPacketType::ReadNonIncrement => {
if (((size as usize) * 4) + 11) < MT_MAX_PACKSIZE {
for i in 0..size as usize {
data.push( ((self.buffer[8 + i * 4] as u32) << 24)
| ((self.buffer[9 + i * 4] as u32) << 16)
| ((self.buffer[10 + i * 4] as u32) << 8)
| self.buffer[11 + i * 4] as u32)
}
} else {
error!("Size {} larger than bufffer len {}", size, data.len());
}
},
IPBusPacketType::Write => {
data.push(0);
},
IPBusPacketType::WriteNonIncrement => {
error!("Decoding of WriteNonIncrement packet not supported!");
},
IPBusPacketType::RMW => {
error!("Decoding of RMW packet not supported!!");
}
}
if verbose {
println!("[IPBus::decode_payload] ==> Decoding IPBus Packet:");
println!(" >> Msg : {:?}", self.buffer);
println!(" >> data : {:?}", data);
}
Ok(data)
}
pub fn realign_packet_id(&mut self)
-> Result<(), Box<dyn Error>> {
trace!("[IPBus::realign_packet_id] - aligning...");
let pid = self.get_target_next_expected_packet_id()?;
self.pid = pid;
self.expected_pid = pid;
trace!("[IPBus::realign_packet_id] - aligned {}", self.pid);
Ok(())
}
pub fn buffer_is_status(&self) -> bool {
self.buffer[3] & 0x1 == 1
}
pub fn get_target_next_expected_packet_id(&mut self)
-> Result<u16, Box<dyn Error>> {
self.get_status()?;
let word = 3usize;
trace!("[IPBus::get_status] => WORD {word} : [{},{},{},{}]", self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4+2], self.buffer[word*4+3]);
let word3 = [self.buffer[word*4], self.buffer[word*4 + 1], self.buffer[word*4 + 2], self.buffer[word*4 + 3]];
let target_exp_pid = u16::from_be_bytes([word3[1], word3[2]]);
trace!("[IPBus::target_next_pid] => Get expected packet id {target_exp_pid}");
Ok(target_exp_pid)
}
pub fn read_multiple(&mut self,
addr : u32,
nwords : usize,
increment_addr : bool)
-> Result<Vec<u32>, Box<dyn Error>> {
let send_data = vec![0u32;nwords];
if increment_addr {
self.packet_type = IPBusPacketType::Read;
} else {
self.packet_type = IPBusPacketType::ReadNonIncrement;
}
let mut message = self.encode_payload(addr, &send_data);
let mut send_again = true;
let timeout = Instant::now();
loop {
if send_again {
self.send(&message)?;
}
match self.receive() {
Err(_err) => {
self.realign_packet_id()?;
message = self.encode_payload(addr, &send_data);
send_again = true;
if timeout.elapsed().as_millis() == 10 {
return Err(Box::new(IPBusError::UdpReceiveFailed));
}
continue;
}
Ok(number_of_bytes) => {
if self.buffer_is_status() {
continue;
}
let pid_from_buffer = self.get_pid_from_current_buffer();
if pid_from_buffer != self.expected_pid {
error!("We got a packet, but the PacketID is not as expected!");
error!("-- self.pid {}, self.expected_pid {}, buffer pid {}", self.pid, self.expected_pid, pid_from_buffer);
if self.expected_pid > pid_from_buffer {
send_again = false;
continue;
} else {
self.realign_packet_id()?;
message = self.encode_payload(addr, &send_data);
send_again = true;
continue;
}
}
trace!("[IPBus::read] => Received {} bytes from master trigger! Message {:?}", number_of_bytes, self.buffer);
break;
}
} } let data = self.decode_payload(false)?;
if data.len() == 0 {
error!("Received empty data!");
return Err(Box::new(IPBusError::DecodingFailed));
}
Ok(data)
}
pub fn read(&mut self,
addr : u32)
-> Result<u32, Box<dyn Error>> {
let data = self.read_multiple(addr,1,false)?;
Ok(data[0])
}
pub fn write(&mut self,
addr : u32,
data : u32)
-> Result<(), Box<dyn Error>> {
let send_data = Vec::<u32>::from([data]);
self.packet_type = IPBusPacketType::Write;
let message = self.encode_payload(addr, &send_data);
match self.send(&message) {
Err(err) => {
error!("Sending Udp message failed! {err}");
return Err(Box::new(IPBusError::UdpSendFailed));
},
Ok(_) => ()
}
match self.receive() {
Err(err) => {
error!("Unable to receive data! i/o error : {}", err.kind());
let target_exp_pid = self.get_target_next_expected_packet_id()?;
let buffer_pid = self.get_pid_from_current_buffer();
error!("self.pid {}, self.expected.pid {}, target expects pid {:?}, buffer pid {} ", self.pid, self.expected_pid, target_exp_pid, buffer_pid);
return Err(Box::new(IPBusError::UdpReceiveFailed));
}
Ok(_data) => {
trace!("[ipbus::write] => Got buffer {:?}", self.buffer);
return Ok(());
}
}
}
}
impl fmt::Display for IPBus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut repr = String::from("<IPBus:");
repr += &(format!(" pid : {}>", self.pid));
write!(f, "{}", repr)
}
}