"""
Enhanced UDS-on-IP client with simplified API.
"""
from typing import Optional, Union
from doipclient import DoIPClient
from udsoncan.client import Client as UDSClient
from udsoncan import services
from .connection import UdsOnIpConnection
from . import exceptions
from . import constants
class UdsOnIpClient:
    """
    Unified UDS-on-IP client providing simplified access to UDS services.

    This client wraps both DoIPClient and UDS Client, providing a single
    interface for automotive diagnostics over DoIP with support for dynamic
    target address switching.

    Args:
        ecu_ip: IP address of the DoIP gateway/ECU
        ecu_address: Logical address of the target ECU
        client_ip: Optional source IP address (auto-detected if None)
        client_logical_address: Optional client logical address. When None
            the argument is not forwarded, so DoIPClient's own default
            source address is used.
        activation_type: DoIP activation type (default: 0)
        protocol_version: DoIP protocol version (default: 0x03)
        auto_reconnect: Stored as ``_auto_reconnect``; no method of this
            class reads it.
        keep_alive: Stored as ``_keep_alive``; no method of this class
            reads it.
        **kwargs: Additional arguments passed to UDS Client

    Raises:
        exceptions.ConnectionError: If any step of the connection setup
            fails. The underlying error is chained as ``__cause__``.

    Example:
        >>> client = UdsOnIpClient('192.168.1.10', 0x00E0)
        >>> response = client.read_data_by_identifier(0xF190)
        >>> print(f"VIN: {response.data.decode()}")
        >>> client.close()
    """

    # Port used for both the TCP data channel and UDP discovery.
    _DOIP_PORT = 13400

    def __init__(
        self,
        ecu_ip: str,
        ecu_address: int,
        client_ip: Optional[str] = None,
        client_logical_address: Optional[int] = None,
        activation_type: int = 0,
        protocol_version: int = 0x03,
        auto_reconnect: bool = False,
        keep_alive: bool = False,
        **kwargs,
    ):
        # Store configuration
        self._auto_reconnect = auto_reconnect
        self._keep_alive = keep_alive

        # Pre-seed the handles so the failure path below can tell which
        # resources were actually created.
        self._doip = None
        self._connection = None
        self._uds = None

        doip_options = {
            "ecu_ip_address": ecu_ip,
            "ecu_logical_address": ecu_address,
            "tcp_port": self._DOIP_PORT,
            "udp_port": self._DOIP_PORT,
            "client_ip_address": client_ip,
            "activation_type": activation_type,
            "protocol_version": protocol_version,
        }
        # Forwarding an explicit None would override DoIPClient's default
        # source address, so only pass the value when the caller gave one.
        if client_logical_address is not None:
            doip_options["client_logical_address"] = client_logical_address

        try:
            # Create DoIP client
            self._doip = DoIPClient(**doip_options)
            # Create enhanced connection
            self._connection = UdsOnIpConnection(self._doip, ecu_address)
            # Create UDS client
            self._uds = UDSClient(self._connection, **kwargs)
            # Connect
            self._connection.open()
        except Exception as e:
            # Do not leave a half-built client holding open resources.
            self._abort_setup()
            raise exceptions.ConnectionError(
                f"Failed to connect to {ecu_ip}:{ecu_address:#x}"
            ) from e

    def _abort_setup(self) -> None:
        """Best-effort release of whatever ``__init__`` managed to create."""
        for resource in (self._connection, self._doip):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Deliberately ignored: the setup failure that triggered
                # this cleanup is the error reported to the caller.
                pass

    @property
    def target_address(self) -> int:
        """Get the current target ECU logical address."""
        return self._connection.target_address

    @target_address.setter
    def target_address(self, value: int) -> None:
        """
        Change the target ECU logical address.

        Args:
            value: New target logical address

        Raises:
            exceptions.AddressSwitchError: If the connection rejects the
                switch; the message includes the requested address.

        Example:
            >>> client.target_address = 0x00E1
            >>> response = client.tester_present()
        """
        try:
            self._connection.target_address = value
        except exceptions.AddressSwitchError as e:
            raise exceptions.AddressSwitchError(
                f"Failed to switch address to {value:#x}: {e}"
            ) from e

    @property
    def uds(self) -> "UDSClient":
        """Access the underlying UDS client for advanced operations."""
        return self._uds

    def close(self) -> None:
        """
        Close the connection to the ECU.

        Both the connection wrapper and the DoIP client are closed; the
        DoIP client is closed even when closing the wrapper fails.

        Raises:
            exceptions.ConnectionError: If either close operation fails.
        """
        try:
            try:
                self._connection.close()
            finally:
                # Always attempted so a failing wrapper close cannot skip
                # closing the DoIP client.
                self._doip.close()
        except Exception as e:
            raise exceptions.ConnectionError("Error closing connection") from e

    def __enter__(self) -> "UdsOnIpClient":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: close the client, never suppress errors."""
        self.close()

    # Convenience methods wrapping common UDS services

    def tester_present(self, suppress_response: bool = False):
        """
        Send TesterPresent service request.

        Args:
            suppress_response: If True, suppress positive response

        Returns:
            Service response
        """
        return self._uds.tester_present(suppress_response=suppress_response)

    def read_data_by_identifier(self, did: Union[int, list]):
        """
        Read data by identifier (service UDS_SID_READ_DATA_BY_IDENTIFIER).

        Args:
            did: Data identifier or list of identifiers

        Returns:
            Service response with .data attribute
        """
        return self._uds.read_data_by_identifier(did)

    def write_data_by_identifier(self, did: int, data: bytes):
        """
        Write data by identifier (service UDS_SID_WRITE_DATA_BY_IDENTIFIER).

        Args:
            did: Data identifier
            data: Data to write

        Returns:
            Service response
        """
        return self._uds.write_data_by_identifier(did, data)

    def clear_dtc(self, group: int = 0xFFFFFF):
        """
        Clear diagnostic trouble codes (service UDS_SID_CLEAR_DIAGNOSTIC_INFORMATION).

        Args:
            group: DTC group to clear (default: all DTCs)

        Returns:
            Service response
        """
        return self._uds.clear_dtc(group)

    def ecu_reset(self, reset_type: int = 1):
        """
        Request ECU reset (service UDS_SID_ECU_RESET).

        Args:
            reset_type: Reset type (1=hard reset, 2=key off/on, 3=soft reset)

        Returns:
            Service response
        """
        return self._uds.ecu_reset(reset_type)

    def change_session(self, session: int):
        """
        Change diagnostic session (service UDS_SID_DIAGNOSTIC_SESSION_CONTROL).

        Args:
            session: Session type (1=default, 2=programming, 3=extended)

        Returns:
            Service response
        """
        return self._uds.change_session(session)

    def security_access(self, level: int, key: Optional[bytes] = None):
        """
        Request security access (service UDS_SID_SECURITY_ACCESS).

        Without a key this sends a seed request; with a key it sends the
        key for the given level.

        Args:
            level: Security level
            key: Security key (None for seed request)

        Returns:
            Service response
        """
        if key is None:
            return self._uds.request_seed(level)
        return self._uds.send_key(level, key)

    def routine_control(
        self,
        routine_id: int,
        control_type: int = 1,
        data: Optional[bytes] = None,
    ):
        """
        Execute routine control (service UDS_SID_ROUTINE_CONTROL).

        Args:
            routine_id: Routine identifier
            control_type: Control type (1=start, 2=stop, 3=request results)
            data: Optional routine data

        Returns:
            Service response

        Raises:
            ValueError: If control_type is not 1, 2 or 3. No request is
                sent in that case.
        """
        if control_type == 1:
            return self._uds.start_routine(routine_id, data)
        if control_type == 2:
            return self._uds.stop_routine(routine_id, data)
        if control_type == 3:
            return self._uds.get_routine_result(routine_id, data)
        # Reject unknown values rather than silently requesting results.
        raise ValueError(
            f"Unsupported routine control type: {control_type!r} "
            "(expected 1, 2 or 3)"
        )