Source code for gallia.transports.doip

# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import asyncio
import socket
import struct
from dataclasses import dataclass
from enum import IntEnum, unique
from typing import Any

from pydantic import BaseModel, field_validator

from gallia.log import get_logger
from gallia.transports.base import BaseTransport, TargetURI
from gallia.utils import auto_int

logger = get_logger("gallia.transport.doip")


@unique
class ProtocolVersions(IntEnum):
    ISO_13400_2_2010 = 0x01
    ISO_13400_2_2012 = 0x02
    ISO_13400_2_2019 = 0x03


@unique
class RoutingActivationRequestTypes(IntEnum):
    RESERVED = 0xFF
    ManufacturerSpecific = 0xFE
    Default = 0x00
    WWH_OBD = 0x01
    CentralSecurity = 0xE0

    @classmethod
    def _missing_(cls, value: Any) -> RoutingActivationRequestTypes:
        if value in range(0xE1, 0x100):
            return cls.ManufacturerSpecific
        return cls.RESERVED


@unique
class RoutingActivationResponseCodes(IntEnum):
    RESERVED = 0xFF
    ManufacturerSpecific = 0xFE
    UnknownSourceAddress = 0x00
    NoResources = 0x01
    InvalidConnectionEntry = 0x02
    AlreadyActive = 0x03
    AuthenticationMissing = 0x04
    ConfirmationRejected = 0x05
    UnsupportedActivationType = 0x06
    TLSRequired = 0x07
    Success = 0x10
    SuccessConfirmationRequired = 0x11

    @classmethod
    def _missing_(cls, value: Any) -> RoutingActivationResponseCodes:
        if value in range(0xE0, 0xFF):
            return cls.ManufacturerSpecific
        return cls.RESERVED


class DoIPRoutingActivationDeniedError(ConnectionAbortedError):
    rac_code: RoutingActivationResponseCodes

    def __init__(self, rac_code: int):
        self.rac_code = RoutingActivationResponseCodes(rac_code)
        super().__init__(f"DoIP routing activation denied: {self.rac_code.name} ({rac_code})")


@unique
class PayloadTypes(IntEnum):
    GenericDoIPHeaderNACK = 0x0000
    VehicleIdentificationRequestMessage = 0x0001
    VehicleIdentificationRequestMessageWithEID = 0x0002
    VehicleIdentificationRequestMessageWithVIN = 0x0003
    VehicleAnnouncementMessage = 0x004
    RoutingActivationRequest = 0x0005
    RoutingActivationResponse = 0x0006
    AliveCheckRequest = 0x0007
    AliveCheckResponse = 0x0008
    DoIPEntityStatusRequest = 0x4001
    DoIPEntityStatusResponse = 0x4002
    DiagnosticPowerModeInformationRequest = 0x4003
    DiagnosticPowerModeInformationResponse = 0x4004
    DiagnosticMessage = 0x8001
    DiagnosticMessagePositiveAcknowledgement = 0x8002
    DiagnosticMessageNegativeAcknowledgement = 0x8003


@unique
class DiagnosticMessagePositiveAckCodes(IntEnum):
    Success = 0x00


@unique
class DiagnosticMessageNegativeAckCodes(IntEnum):
    RESERVED = 0xFF
    InvalidSourceAddress = 0x02
    UnknownTargetAddress = 0x03
    DiagnosticMessageTooLarge = 0x04
    OutOfMemory = 0x05
    TargetUnreachable = 0x06
    UnknownNetwork = 0x07
    TransportProtocolError = 0x08

    @classmethod
    def _missing_(cls, value: Any) -> DiagnosticMessageNegativeAckCodes:
        return cls.RESERVED


class DoIPNegativeAckError(BrokenPipeError):
    nack_code: DiagnosticMessageNegativeAckCodes

    def __init__(self, negative_ack_code: int):
        self.nack_code = DiagnosticMessageNegativeAckCodes(negative_ack_code)
        super().__init__(f"DoIP negative ACK received: {self.nack_code.name} ({negative_ack_code})")


@unique
class GenericDoIPHeaderNACKCodes(IntEnum):
    RESERVED = 0xFF
    IncorrectPatternFormat = 0x00
    UnknownPayloadType = 0x01
    MessageTooLarge = 0x02
    OutOfMemory = 0x03
    InvalidPayloadLength = 0x04

    @classmethod
    def _missing_(cls, value: Any) -> GenericDoIPHeaderNACKCodes:
        return cls.RESERVED


class DoIPGenericHeaderNACKError(ConnectionAbortedError):
    nack_code: GenericDoIPHeaderNACKCodes

    def __init__(self, nack_code: int):
        self.nack_code = GenericDoIPHeaderNACKCodes(nack_code)
        super().__init__(f"DoIP generic header negative ACK: {self.nack_code.name} ({nack_code})")


class TimingAndCommunicationParameters(IntEnum):
    CtrlTimeout = 2000
    AnnounceWait = 500
    AnnounceInterval = 500
    AnnounceNum = 3
    DiagnosticMessageMessageAckTimeout = 2000
    RoutingActivationResponseTimeout = 2000
    DiagnosticMessageMessageTimeout = 2000
    TCPGeneralInactivityTimeout = 5000
    TCPInitialInactivityTimeout = 2000
    TCPAliveCheckTimeout = 500
    ProcessingTimeout = 2000
    VehicleDiscoveryTimeout = 5000


@dataclass
class GenericHeader:
    ProtocolVersion: int
    PayloadType: int
    PayloadLength: int

    def pack(self) -> bytes:
        return struct.pack(
            "!BBHL",
            self.ProtocolVersion,
            self.ProtocolVersion ^ 0xFF,
            self.PayloadType,
            self.PayloadLength,
        )

    @classmethod
    def unpack(cls, data: bytes) -> GenericHeader:
        (
            protocol_version,
            inverse_protocol_version,
            payload_type,
            payload_length,
        ) = struct.unpack("!BBHL", data)
        if protocol_version != inverse_protocol_version ^ 0xFF:
            raise ValueError("inverse protocol_version is invalid")
        return cls(
            protocol_version,
            payload_type,
            payload_length,
        )


@dataclass
class GenericDoIPHeaderNACK:
    GenericHeaderNACKCode: GenericDoIPHeaderNACKCodes

    def pack(self) -> bytes:
        return struct.pack(
            "!B",
            self.GenericHeaderNACKCode,
        )

    @classmethod
    def unpack(cls, data: bytes) -> GenericDoIPHeaderNACK:
        (generic_header_NACK_code,) = struct.unpack("!B", data)
        return cls(
            GenericDoIPHeaderNACKCodes(generic_header_NACK_code),
        )


@dataclass
class VehicleIdentificationRequestMessage:
    def pack(self) -> bytes:
        return b""


@dataclass
class VehicleAnnouncementMessage:
    VIN: bytes
    LogicalAddress: int
    EID: bytes
    GID: bytes
    FurtherActionRequired: FurtherActionCodes
    VINGIDSyncStatus: SynchronisationStatusCodes | None

    @classmethod
    def unpack(cls, data: bytes) -> VehicleAnnouncementMessage:
        if len(data) == 32:
            # VINGIDSyncStatus is optional
            (vin, logical_address, eid, gid, further_action_required) = struct.unpack(
                "!17sH6s6sB", data
            )
            vin_gid_sync_status = None
        else:
            (
                vin,
                logical_address,
                eid,
                gid,
                further_action_required,
                vin_gid_sync_status,
            ) = struct.unpack("!17sH6s6sBB", data)

        return cls(
            vin,
            logical_address,
            eid,
            gid,
            FurtherActionCodes(further_action_required),
            SynchronisationStatusCodes(vin_gid_sync_status)
            if vin_gid_sync_status is not None
            else None,
        )


@unique
class FurtherActionCodes(IntEnum):
    RESERVED = 0x0F
    ManufacturerSpecific = 0xFF
    NoFurtherActionRequired = 0x00
    RoutingActivationRequiredToInitiateCentralSecurity = 0x10

    @classmethod
    def _missing_(cls, value: Any) -> FurtherActionCodes:
        if value in range(0x11, 0x100):
            return cls.ManufacturerSpecific
        return cls.RESERVED


@unique
class SynchronisationStatusCodes(IntEnum):
    RESERVED = 0xFF
    VINGIDSynchronized = 0x00
    IncompleteVINGIDNotSynchronized = 0x10

    @classmethod
    def _missing_(cls, value: Any) -> SynchronisationStatusCodes:
        return cls.RESERVED


@dataclass
class DoIPEntityStatusRequest:
    def pack(self) -> bytes:
        return b""


@dataclass
class DoIPEntityStatusResponse:
    NodeType: NodeTypes
    MaximumConcurrentTCP_DATASockets: int
    CurrentlyOpenTCP_DATASockets: int
    MaximumDataSize: int | None

    @classmethod
    def unpack(cls, data: bytes) -> DoIPEntityStatusResponse:
        if len(data) == 3:
            # MaximumDataSize is optional
            (nt, mcts, ncts) = struct.unpack("!BBB", data)
            mds = None
        else:
            (nt, mcts, ncts, mds) = struct.unpack("!BBBI", data)

        return cls(NodeTypes(nt), mcts, ncts, mds)


@unique
class NodeTypes(IntEnum):
    RESERVED = 0xFF
    Gateway = 0x00
    Node = 0x01

    @classmethod
    def _missing_(cls, value: Any) -> NodeTypes:
        return cls.RESERVED


@dataclass
class RoutingActivationRequest:
    SourceAddress: int
    ActivationType: int
    Reserved: int = 0x00000000  # Not used, default value.
    # OEMReserved uint32

    def pack(self) -> bytes:
        return struct.pack("!HBI", self.SourceAddress, self.ActivationType, self.Reserved)


@dataclass
class RoutingActivationResponse:
    SourceAddress: int
    TargetAddress: int
    RoutingActivationResponseCode: int
    Reserved: int = 0x00000000  # Not used, default value.
    # OEMReserved uint32

    @classmethod
    def unpack(cls, data: bytes) -> RoutingActivationResponse:
        (
            source_address,
            target_address,
            routing_activation_response_code,
            reserved,
        ) = struct.unpack("!HHBI", data)
        if reserved != 0x00000000:
            raise ValueError("reserved field contains data")
        return cls(
            source_address,
            target_address,
            routing_activation_response_code,
            reserved,
        )


@dataclass
class DiagnosticMessage:
    SourceAddress: int
    TargetAddress: int
    UserData: bytes

    def pack(self) -> bytes:
        return (
            struct.pack(
                "!HH",
                self.SourceAddress,
                self.TargetAddress,
            )
            + self.UserData
        )

    @classmethod
    def unpack(cls, data: bytes) -> DiagnosticMessage:
        source_address, target_address = struct.unpack("!HH", data[:4])
        data = data[4:]
        return cls(source_address, target_address, data)


@dataclass
class DiagnosticMessageAcknowledgement:
    SourceAddress: int
    TargetAddress: int
    ACKCode: int
    PreviousDiagnosticMessageData: bytes

    def pack(self) -> bytes:
        return (
            struct.pack(
                "!HHB",
                self.SourceAddress,
                self.TargetAddress,
                self.ACKCode,
            )
            + self.PreviousDiagnosticMessageData
        )


class DiagnosticMessagePositiveAcknowledgement(DiagnosticMessageAcknowledgement):
    ACKCode: DiagnosticMessagePositiveAckCodes

    @classmethod
    def unpack(cls, data: bytes) -> DiagnosticMessagePositiveAcknowledgement:
        source_address, target_address, ack_code = struct.unpack("!HHB", data[:5])
        prev_data = data[5:]

        return cls(
            source_address,
            target_address,
            DiagnosticMessagePositiveAckCodes(ack_code),
            prev_data,
        )


class DiagnosticMessageNegativeAcknowledgement(DiagnosticMessageAcknowledgement):
    ACKCode: DiagnosticMessageNegativeAckCodes

    @classmethod
    def unpack(cls, data: bytes) -> DiagnosticMessageNegativeAcknowledgement:
        source_address, target_address, ack_code = struct.unpack("!HHB", data[:5])
        prev_data = data[5:]

        return cls(
            source_address,
            target_address,
            DiagnosticMessageNegativeAckCodes(ack_code),
            prev_data,
        )


@dataclass
class AliveCheckRequest:
    pass


@dataclass
class AliveCheckResponse:
    SourceAddress: int

    def pack(self) -> bytes:
        return struct.pack("!H", self.SourceAddress)


# Messages expected to be sent by the DoIP gateway.
DoIPInData = (
    GenericDoIPHeaderNACK
    | RoutingActivationResponse
    | DiagnosticMessage
    | DiagnosticMessagePositiveAcknowledgement
    | DiagnosticMessageNegativeAcknowledgement
    | AliveCheckRequest
)

# Messages expected to be sent by us.
DoIPOutData = RoutingActivationRequest | DiagnosticMessage | AliveCheckResponse

DoIPFrame = tuple[
    GenericHeader,
    DoIPInData | DoIPOutData,
]
DoIPDiagFrame = tuple[GenericHeader, DiagnosticMessage]


class DoIPConnection:
    def __init__(  # noqa: PLR0913
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        src_addr: int,
        target_addr: int,
        protocol_version: int,
    ):
        self.reader = reader
        self.writer = writer
        self.src_addr = src_addr
        self.target_addr = target_addr
        self.protocol_version = protocol_version
        self._read_queue: asyncio.Queue[DoIPFrame] = asyncio.Queue()
        self._read_task = asyncio.create_task(self._read_worker())
        self._is_closed = False
        self._mutex = asyncio.Lock()

    @classmethod
    async def connect(  # noqa: PLR0913
        cls,
        host: str,
        port: int,
        src_addr: int,
        target_addr: int,
        so_linger: bool = False,
        protocol_version: int = ProtocolVersions.ISO_13400_2_2019,
    ) -> DoIPConnection:
        reader, writer = await asyncio.open_connection(host, port)

        if so_linger is True:
            # Depending on who will close the connection in the end, one party's socket
            # will remain in a TIME_WAIT state, which occupies resources until enough
            # time has passed. Setting the LINGER socket option tells our kernel to
            # close the connection with a RST, which brings the TCP connection to an
            # error state and thus avoids TIME_WAIT and instantly forces LISTEN or CLOSED
            # For more info, see e.g. Note 3 of :
            # https://www.ietf.org/rfc/rfc9293.html#name-state-machine-overview
            sock = writer.get_extra_info("socket")
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))

        return cls(reader, writer, src_addr, target_addr, protocol_version)

    async def _read_frame(self) -> DoIPFrame | tuple[None, None]:
        # Header is fixed size 8 byte.
        hdr_buf = await self.reader.readexactly(8)
        hdr = GenericHeader.unpack(hdr_buf)

        payload_buf = await self.reader.readexactly(hdr.PayloadLength)
        payload: DoIPInData
        match hdr.PayloadType:
            case PayloadTypes.GenericDoIPHeaderNACK:
                payload = GenericDoIPHeaderNACK.unpack(payload_buf)
            case PayloadTypes.RoutingActivationResponse:
                payload = RoutingActivationResponse.unpack(payload_buf)
            case PayloadTypes.DiagnosticMessagePositiveAcknowledgement:
                payload = DiagnosticMessagePositiveAcknowledgement.unpack(payload_buf)
            case PayloadTypes.DiagnosticMessageNegativeAcknowledgement:
                payload = DiagnosticMessageNegativeAcknowledgement.unpack(payload_buf)
            case PayloadTypes.DiagnosticMessage:
                payload = DiagnosticMessage.unpack(payload_buf)
            case PayloadTypes.AliveCheckRequest:
                payload = AliveCheckRequest()
            case _:
                logger.warning(
                    f"DoIP message with unhandled PayloadType: {hdr} {payload_buf.hex()}"
                )
                return None, None
        logger.trace("Received DoIP message: %s, %s", hdr, payload)
        return hdr, payload

    async def _read_worker(self) -> None:
        try:
            while True:
                hdr, data = await self._read_frame()
                if hdr is None or data is None:
                    continue
                if hdr.PayloadType == PayloadTypes.DiagnosticMessage and isinstance(
                    data, AliveCheckRequest
                ):
                    await self.write_alive_check_response()
                    continue
                await self._read_queue.put((hdr, data))
        except asyncio.CancelledError:
            logger.debug("read worker cancelled")
        except asyncio.IncompleteReadError as e:
            logger.debug(f"read worker received EOF: {e}")
        except Exception as e:
            logger.critical(f"read worker died with {type(e)}: {e}")
        finally:
            logger.debug("Feeding EOF to reader and requesting a close")
            self.reader.feed_eof()
            await self.close()

    async def read_frame_unsafe(self) -> DoIPFrame:
        # Avoid waiting on the queue forever when
        # the connection has been terminated.
        if self._is_closed:
            raise ConnectionError()
        return await self._read_queue.get()

    async def read_frame(self) -> DoIPFrame:
        async with self._mutex:
            return await self.read_frame_unsafe()

    async def read_diag_request_raw(self) -> DoIPDiagFrame:
        unexpected_packets: list[tuple[Any, Any]] = []
        while True:
            hdr, payload = await self.read_frame()
            if not isinstance(payload, DiagnosticMessage):
                logger.warning(f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}")
                unexpected_packets.append((hdr, payload))
                continue
            if payload.SourceAddress != self.target_addr or payload.TargetAddress != self.src_addr:
                logger.warning(
                    f"DoIP-DiagnosticMessage: unexpected addresses (src:dst); expected {self.src_addr:#04x}:"
                    + f"{self.target_addr:#04x} but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}"
                )
                unexpected_packets.append((hdr, payload))
                continue

            # Do not consume unexpected packets, but re-add them to the queue for other consumers
            for item in unexpected_packets:
                await self._read_queue.put(item)

            return hdr, payload

    async def read_diag_request(self) -> bytes:
        _, payload = await self.read_diag_request_raw()
        return payload.UserData

    async def _read_ack(self, prev_data: bytes) -> None:
        unexpected_packets: list[tuple[Any, Any]] = []
        while True:
            hdr, payload = await self.read_frame_unsafe()
            if not isinstance(payload, DiagnosticMessagePositiveAcknowledgement) and not isinstance(
                payload, DiagnosticMessageNegativeAcknowledgement
            ):
                logger.warning(f"expected DoIP positive/negative ACK, instead got: {hdr} {payload}")
                unexpected_packets.append((hdr, payload))
                continue

            if payload.SourceAddress != self.target_addr or payload.TargetAddress != self.src_addr:
                logger.warning(
                    f"DoIP-ACK: unexpected addresses (src:dst); expected {self.src_addr:#04x}:{self.target_addr:#04x} "
                    + f"but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}"
                )
                unexpected_packets.append((hdr, payload))
                continue
            if (
                len(payload.PreviousDiagnosticMessageData) > 0
                and payload.PreviousDiagnosticMessageData
                != prev_data[: len(payload.PreviousDiagnosticMessageData)]
            ):
                logger.warning("ack: previous data differs from request")
                logger.warning(
                    f"DoIP-ACK: got: {payload.PreviousDiagnosticMessageData.hex()} expected {prev_data.hex()}"
                )
                unexpected_packets.append((hdr, payload))
                continue

            # Do not consume unexpected packets, but re-add them to the queue for other consumers
            for item in unexpected_packets:
                await self._read_queue.put(item)

            if isinstance(payload, DiagnosticMessageNegativeAcknowledgement):
                raise DoIPNegativeAckError(payload.ACKCode)
            return

    async def _read_routing_activation_response(self) -> None:
        unexpected_packets: list[tuple[Any, Any]] = []
        while True:
            hdr, payload = await self.read_frame_unsafe()
            if not isinstance(payload, RoutingActivationResponse):
                logger.warning(
                    f"expected DoIP RoutingActivationResponse, instead got: {hdr} {payload}"
                )
                unexpected_packets.append((hdr, payload))
                continue

            # Do not consume unexpected packets, but re-add them to the queue for other consumers
            for item in unexpected_packets:
                await self._read_queue.put(item)

            if payload.RoutingActivationResponseCode != RoutingActivationResponseCodes.Success:
                raise DoIPRoutingActivationDeniedError(payload.RoutingActivationResponseCode)
            return

    async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> None:
        async with self._mutex:
            buf = b""
            buf += hdr.pack()
            buf += payload.pack()
            self.writer.write(buf)
            await self.writer.drain()

            logger.trace("Sent DoIP message: hdr: %s, payload: %s", hdr, payload)

            try:
                match payload:
                    case DiagnosticMessage():
                        # Now an ACK message is expected.
                        await asyncio.wait_for(
                            self._read_ack(payload.UserData),
                            TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout
                            / 1000,
                        )
                    case RoutingActivationRequest():
                        await asyncio.wait_for(
                            self._read_routing_activation_response(),
                            TimingAndCommunicationParameters.RoutingActivationResponseTimeout
                            / 1000,
                        )
            except TimeoutError as e:
                await self.close()
                raise BrokenPipeError("Timeout while waiting for DoIP ACK message") from e

    async def write_diag_request(self, data: bytes) -> None:
        hdr = GenericHeader(
            ProtocolVersion=self.protocol_version,
            PayloadType=PayloadTypes.DiagnosticMessage,
            PayloadLength=len(data) + 4,
        )
        payload = DiagnosticMessage(
            SourceAddress=self.src_addr,
            TargetAddress=self.target_addr,
            UserData=data,
        )
        await self.write_request_raw(hdr, payload)

    async def write_routing_activation_request(
        self,
        activation_type: int,
    ) -> None:
        hdr = GenericHeader(
            ProtocolVersion=self.protocol_version,
            PayloadType=PayloadTypes.RoutingActivationRequest,
            PayloadLength=7,
        )
        payload = RoutingActivationRequest(
            SourceAddress=self.src_addr,
            ActivationType=activation_type,
            Reserved=0x00,
        )
        await self.write_request_raw(hdr, payload)

    async def write_alive_check_response(self) -> None:
        hdr = GenericHeader(
            ProtocolVersion=self.protocol_version,
            PayloadType=PayloadTypes.AliveCheckResponse,
            PayloadLength=2,
        )
        payload = AliveCheckResponse(
            SourceAddress=self.src_addr,
        )
        await self.write_request_raw(hdr, payload)

    async def close(self) -> None:
        logger.debug("Closing DoIP connection...")
        if self._is_closed:
            logger.debug("Already closed!")
            return
        self._is_closed = True
        logger.debug("Cancelling read worker")
        self._read_task.cancel()
        self.writer.close()
        logger.debug("Awaiting confirmation of closed writer")
        await self.writer.wait_closed()


class DoIPConfig(BaseModel):
    src_addr: int
    target_addr: int
    activation_type: int = RoutingActivationRequestTypes.WWH_OBD.value
    protocol_version: int = ProtocolVersions.ISO_13400_2_2019

    @field_validator(
        "src_addr",
        "target_addr",
        "activation_type",
        "protocol_version",
        mode="before",
    )
    def auto_int(cls, v: str) -> int:
        return auto_int(v)


[docs] class DoIPTransport(BaseTransport, scheme="doip"): def __init__( self, target: TargetURI, port: int, config: DoIPConfig, conn: DoIPConnection, ): super().__init__(target) self.port = port self.config = config self._conn = conn self._is_closed = False @staticmethod async def _connect( # noqa: PLR0913 hostname: str, port: int, src_addr: int, target_addr: int, activation_type: int, protocol_version: int, ) -> DoIPConnection: conn = await DoIPConnection.connect( hostname, port, src_addr, target_addr, protocol_version=protocol_version, ) await conn.write_routing_activation_request(RoutingActivationRequestTypes(activation_type)) return conn
[docs] @classmethod async def connect( cls, target: str | TargetURI, timeout: float | None = None, ) -> DoIPTransport: t = target if isinstance(target, TargetURI) else TargetURI(target) cls.check_scheme(t) if t.hostname is None: raise ValueError("no hostname specified") port = t.port if t.port is not None else 13400 config = DoIPConfig(**t.qs_flat) conn = await asyncio.wait_for( cls._connect( t.hostname, port, config.src_addr, config.target_addr, config.activation_type, config.protocol_version, ), timeout, ) return cls(t, port, config, conn)
[docs] async def close(self) -> None: if self._is_closed: return self._is_closed = True await self._conn.close()
[docs] async def read( self, timeout: float | None = None, tags: list[str] | None = None, ) -> bytes: data = await asyncio.wait_for(self._conn.read_diag_request(), timeout) t = tags + ["read"] if tags is not None else ["read"] logger.trace(data.hex(), extra={"tags": t}) return data
[docs] async def write( self, data: bytes, timeout: float | None = None, tags: list[str] | None = None, ) -> int: t = tags + ["write"] if tags is not None else ["write"] logger.trace(data.hex(), extra={"tags": t}) try: await asyncio.wait_for(self._conn.write_diag_request(data), timeout) except DoIPNegativeAckError as e: if e.nack_code != DiagnosticMessageNegativeAckCodes.TargetUnreachable: raise e # TargetUnreachable can be just a temporary issue. Thus, we do not raise # BrokenPipeError but instead ignore it here and let upper layers handle # missing responses logger.debug("DoIP message was ACKed with TargetUnreachable") return len(data)