"""
Enhanced DoIP connection for UDS communication with dynamic target address support.
"""
from typing import Optional
from udsoncan.connections import BaseConnection
from udsoncan.exceptions import TimeoutException
from doipclient import DoIPClient
from .exceptions import AddressSwitchError
[docs]
class UdsOnIpConnection(BaseConnection):
"""
Enhanced DoIP connection that supports dynamic target address switching.
This connection class wraps a DoIPClient and provides the interface required
by python-udsoncan while adding the ability to change the target ECU address
at runtime without recreating the connection.
Args:
doip_client: DoIPClient instance to use for communication
target_address: Optional target logical address. If not provided, uses
the address from doip_client
Example:
>>> from doipclient import DoIPClient
>>> doip = DoIPClient('192.168.1.10', 0x00E0)
>>> conn = UdsOnIpConnection(doip)
>>> # Switch target address dynamically
>>> conn.target_address = 0x00E1
"""
def __init__(self, doip_client: DoIPClient, target_address: Optional[int] = None):
BaseConnection.__init__(self, name="UdsOnIp")
self._doip = doip_client
self._target_address = target_address or doip_client._ecu_logical_address
self._opened = False
@property
def target_address(self) -> int:
"""Get the current target logical address."""
return self._target_address
@target_address.setter
def target_address(self, value: int):
"""
Set a new target logical address for subsequent communications.
Args:
value: New target logical address (e.g., 0x00E0, 0x00E1)
"""
if not 0x0000 <= value <= 0xFFFF:
raise AddressSwitchError(
f"Invalid logical address: {value:#x}. Must be a 16-bit integer."
)
self._target_address = value
self.logger.info(f"Target address switched to {value:#x}")
[docs]
def open(self):
"""
Open the DoIP connection.
"""
if not self._opened:
# DoIPClient connection is typically already established
# We just mark as opened
self._opened = True
self.logger.info("UdsOnIpConnection opened")
[docs]
def close(self):
"""
Close the DoIP connection.
"""
if self._opened:
self._opened = False
self.logger.info("UdsOnIpConnection closed")
[docs]
def specific_send(self, payload: bytes):
"""
Send a UDS payload to the current target address.
Args:
payload: UDS message payload to send
"""
self.logger.debug(
f"Sending {len(payload)} bytes to {self._target_address:#x}: {payload.hex()}"
)
# Use send_diagnostic_to_address for dynamic addressing
self._doip.send_diagnostic_to_address(self._target_address, bytearray(payload))
[docs]
def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
"""
Wait for and receive a UDS response frame.
Args:
timeout: Maximum time to wait for response in seconds
Returns:
Received frame data or None if timeout
"""
try:
# Receive diagnostic message
response = self._doip.receive_diagnostic(timeout=timeout)
if response:
self.logger.debug(f"Received {len(response)} bytes: {response.hex()}")
return bytes(response)
return None
except TimeoutError as e:
# Re-raise TimeoutError as udsoncan's TimeoutException for consistency
raise TimeoutException(f"UDS-on-IP operation timed out: {e}") from e
except Exception as e:
self.logger.error(f"Error receiving frame: {e}")
raise
[docs]
def empty_rxqueue(self):
"""
Empty the reception buffer.
"""
# DoIPClient handles buffering internally
pass
[docs]
def is_open(self) -> bool:
"""
Check if connection is open.
"""
return self._opened