Source code for udsonip.manager

"""
Multi-ECU manager for handling multiple ECUs over a single DoIP connection.
"""

from typing import Dict, Optional
from contextlib import contextmanager
from doipclient import DoIPClient
from udsoncan.client import Client as UDSClient
from .connection import UdsOnIpConnection
from . import exceptions
from . import constants


[docs] class DoIPManager: """ A client for managing connections to multiple ECUs via a single DoIP gateway. This class provides a high-level interface for managing multiple ECUs that are accessible through a single DoIP gateway. It handles the DoIP connection and allows switching between ECUs using a context manager. Example: >>> from udsonip import DoIPManager >>> manager = DoIPManager('192.168.1.10') >>> manager.add_ecu('engine', 0x00E0) >>> manager.add_ecu('transmission', 0x00E1) >>> >>> with manager.ecu('engine') as ecu: ... vin = ecu.read_data_by_identifier(constants.UDS_DID_VIN) ... print(f"Engine VIN: {vin.data.decode()}") """ def __init__( self, gateway_ip: str, client_ip: Optional[str] = None, client_logical_address: Optional[int] = None, protocol_version: int = 3, **kwargs, ): """ Initializes the DoIPManager. Args: gateway_ip: The IP address of the DoIP gateway. client_ip: Optional. The IP address of the client. If None, the system will attempt to determine it automatically. client_logical_address: Optional. The logical address of the client. Defaults to 0x0E00. protocol_version: Optional. The DoIP protocol version to use. Defaults to 3. **kwargs: Additional keyword arguments to pass to the underlying DoIPClient. """ self._gateway_ip = gateway_ip self._client_ip = client_ip self._client_logical_address = client_logical_address self._protocol_version = protocol_version self._kwargs = kwargs # ECU registry: name -> logical address self._ecus: Dict[str, int] = {} # Cached connections and clients per ECU self._connections: Dict[str, UdsOnIpConnection] = {} self._clients: Dict[str, UDSClient] = {} # Shared DoIP client (created on first use) self._doip: Optional[DoIPClient] = None self._connected = False
[docs] def add_ecu(self, name: str, logical_address: int): """ Register an ECU in the manager. Args: name: Friendly name for the ECU (e.g., 'engine', 'transmission') logical_address: ECU logical address Example: >>> manager.add_ecu('engine', 0x00E0) >>> manager.add_ecu('transmission', 0x00E1) >>> manager.add_ecu('gateway', 0x0001) """ self._ecus[name] = logical_address
[docs] def remove_ecu(self, name: str): """ Remove an ECU from the registry. Args: name: ECU name to remove """ if name in self._ecus: del self._ecus[name] # Clean up cached connection/client if name in self._connections: del self._connections[name] if name in self._clients: del self._clients[name]
[docs] def list_ecus(self) -> Dict[str, int]: """ Get a dictionary of all registered ECUs. Returns: Dictionary mapping ECU names to logical addresses """ return self._ecus.copy()
def _ensure_connected(self): """ Ensure the DoIP connection is established. Per ISO 13400-2:2019, we connect to the first registered ECU for routing activation. If no ECUs are registered, we fall back to 0x0001. """ if not self._connected: # Use first registered ECU address or default gateway address per ISO 13400-2 if self._ecus: gateway_address = next(iter(self._ecus.values())) else: gateway_address = 0x0001 try: self._doip = DoIPClient( ecu_ip_address=self._gateway_ip, ecu_logical_address=gateway_address, tcp_port=13400, udp_port=13400, client_ip_address=self._client_ip, client_logical_address=self._client_logical_address, protocol_version=self._protocol_version, **self._kwargs, ) self._connected = True except Exception as e: raise exceptions.ConnectionError( f"Failed to connect to gateway {self._gateway_ip}: {e}" ) def _get_client(self, name: str) -> UDSClient: """ Get or create a UDS client for the specified ECU. Args: name: ECU name Returns: UDS client instance """ if name not in self._ecus: raise exceptions.ECUNotFoundError(f"ECU '{name}' not found in registry") # Return cached client if available if name in self._clients: return self._clients[name] # Ensure connected self._ensure_connected() # Create connection and client logical_address = self._ecus[name] connection = UdsOnIpConnection(self._doip, logical_address) connection.open() client = UDSClient(connection) # Cache for reuse self._connections[name] = connection self._clients[name] = client return client
[docs] @contextmanager def ecu(self, name: str): """ Context manager for safe ECU communication. Args: name: Name of the ECU to communicate with Yields: UDS client configured for the specified ECU Example: >>> with manager.ecu('engine') as ecu: ... vin = ecu.read_data_by_identifier(constants.UDS_DID_VIN) ... print(f"Engine VIN: {vin.data.decode()}") """ client = self._get_client(name) try: yield client except Exception as e: # Re-raise with ECU context raise type(e)(f"Error communicating with ECU '{name}': {e}") from e
[docs] def switch_to(self, name: str) -> UDSClient: """ Switch to a different ECU (non-context-manager version). Args: name: ECU name to switch to Returns: UDS client for the ECU Note: Using the context manager (ecu()) is preferred for safety. """ return self._get_client(name)
[docs] def close(self): """ Close all connections and clean up resources. """ # Close all cached connections for connection in self._connections.values(): try: connection.close() except Exception: pass # Disconnect DoIP client if self._doip and self._connected: try: self._doip.close() except Exception: pass self._connected = False # Clear caches self._connections.clear() self._clients.clear()
def __enter__(self): """ Context manager entry. """ self._ensure_connected() return self def __exit__(self, exc_type, exc_val, exc_tb): """ Context manager exit. """ self.close() def __del__(self): """ Cleanup on deletion. """ self.close()