Source code for udsonip.discovery

"""
ECU discovery utilities for DoIP networks.
"""

from typing import List, Optional
from dataclasses import dataclass
import time
import warnings
import socket

from doipclient.client import DoIPClient, Parser
from doipclient.messages import (
    VehicleIdentificationRequest,
    VehicleIdentificationResponse,
    payload_message_to_type,
)
from doipclient.constants import UDP_DISCOVERY

from . import exceptions
from .client import UdsOnIpClient


[docs] @dataclass class ECUInfo: """Information about a discovered ECU.""" ip: str """IP address of the ECU/gateway""" logical_address: int """ECU logical address""" vin: Optional[str] = None """Vehicle Identification Number""" eid: Optional[bytes] = None """Entity Identification (VIN or similar)""" gid: Optional[bytes] = None """Group Identification""" further_action_required: Optional[int] = None """Further action required byte""" vin_gid_sync_status: Optional[int] = None """VIN/GID Synchronization status""" def __str__(self): return f"ECU({self.ip} @ 0x{self.logical_address:04X})" def __repr__(self): return f"ECUInfo(ip='{self.ip}', logical_address=0x{self.logical_address:04X})"
[docs] def connect(self, client_ip: Optional[str] = None, **kwargs): """ Create a UdsOnIpClient connected to this ECU. Args: client_ip: Optional client IP address **kwargs: Additional arguments for UdsOnIpClient Returns: UdsOnIpClient instance """ return UdsOnIpClient( ecu_ip=self.ip, ecu_address=self.logical_address, client_ip=client_ip, protocol_version=0x03, # Default to 0x03 as per user request **kwargs, )
[docs] def discover_ecus( interface: Optional[str] = None, timeout: float = 3.0, protocol_version: int = 0x03 ) -> List[ECUInfo]: """ Discover ECUs on the DoIP network by broadcasting a Vehicle Identification Request and listening for unicast responses. This function implements a more robust discovery method: 1. Creates a single UDP socket bound to an ephemeral port. 2. Broadcasts a Vehicle Identification Request. 3. Listens on the same socket for direct unicast responses from all ECUs for the entire timeout period. This approach correctly captures all direct responses and is more efficient than the previous implementation. Args: interface: Network interface to use for IPv6 (not typically required for IPv4). timeout: Discovery timeout in seconds. protocol_version: DoIP protocol version (default: 0x03). Returns: List of discovered ECU information (duplicates filtered by IP and logical address). Raises: DiscoveryError: If discovery fails due to network or protocol errors. """ discovered_ecus = [] seen_ecus = set() # To store (ip, logical_address) to avoid duplicates sock = None try: # The _create_udp_socket in doipclient handles some of the complexities # of binding to a specific interface for IPv6. is_ipv6 = False # Assuming IPv4 for broadcast for now. sock = DoIPClient._create_udp_socket( ipv6=is_ipv6, udp_port=0, # Bind to an ephemeral port for sending and receiving unicast replies timeout=timeout, source_interface=interface, ) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Manually construct and send the Vehicle Identification Request message = VehicleIdentificationRequest() payload_data = message.pack() payload_type = payload_message_to_type[type(message)] data_bytes = DoIPClient._pack_doip(protocol_version, payload_type, payload_data) # Send broadcast request sock.sendto(data_bytes, ("255.255.255.255", UDP_DISCOVERY)) # Listen for responses on the same socket start_time = time.time() parser = Parser() while time.time() - start_time < timeout: try: remaining_timeout = timeout - (time.time() - start_time) if remaining_timeout <= 0: break sock.settimeout(remaining_timeout) data, addr = sock.recvfrom(1024) # Buffer size # The parser in doipclient is designed to read from a stream, # but for UDP each datagram is a full message. parser.reset() announcement = parser.read_message(data) if announcement and isinstance(announcement, VehicleIdentificationResponse): ip, _ = addr logical_address = announcement.logical_address if (ip, logical_address) not in seen_ecus: ecu_info = ECUInfo( ip=ip, logical_address=logical_address, vin=announcement.vin if announcement.vin else None, eid=announcement.eid, gid=announcement.gid, further_action_required=announcement.further_action_required.value, vin_gid_sync_status=( announcement.vin_sync_status.value if announcement.vin_sync_status is not None else None ), ) discovered_ecus.append(ecu_info) seen_ecus.add((ip, logical_address)) except socket.timeout: # This is expected when no more responses are coming in break except Exception as e: warnings.warn(f"Error during ECU discovery: {e}", RuntimeWarning) continue return discovered_ecus except Exception as e: raise exceptions.DiscoveryError(f"ECU discovery failed: {e}") finally: if sock: sock.close()
[docs] def get_entity(ip: str, protocol_version: int = 0x03) -> Optional[ECUInfo]: """ Get entity information from a specific DoIP gateway/ECU. Args: ip: IP address of the DoIP entity protocol_version: DoIP protocol version (default: 0x03) Returns: ECUInfo if successful, None otherwise Example: >>> ecu = get_entity('192.168.1.10') >>> if ecu: ... print(f"ECU at {ecu.ip}: {ecu.logical_address:#x}") """ try: address, announcement = DoIPClient.get_entity( ecu_ip_address=ip, protocol_version=protocol_version ) ip_address, _ = address return ECUInfo( ip=ip_address, logical_address=announcement.logical_address, vin=announcement.vin if announcement.vin else None, eid=announcement.eid, gid=announcement.gid, further_action_required=announcement.further_action_required.value, vin_gid_sync_status=( announcement.vin_sync_status.value if announcement.vin_sync_status is not None else None ), ) except TimeoutError: return None except Exception as e: raise exceptions.DiscoveryError(f"Failed to get entity info from {ip}: {e}")