# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import inspect
import struct
from abc import ABC, abstractmethod
from collections.abc import Sequence
from struct import pack
from typing import Any, TypeVar
from gallia.log import get_logger
from gallia.services.uds.core.constants import (
DTCFormatIdentifier,
InputOutputControlParameter,
ReadDTCInformationSubFuncs,
RoutineControlSubFuncs,
UDSErrorCodes,
UDSIsoServices,
UDSIsoServicesEchoLength,
)
from gallia.services.uds.core.utils import (
address_and_size_length,
any_repr,
bytes_repr,
check_data_identifier,
check_length,
check_range,
check_sub_function,
from_bytes,
int_repr,
service_repr,
sub_function_split,
to_bytes,
uds_memory_parameters,
)
logger = get_logger(__name__)
# ****************
# * Base classes *
# ****************
T_UDSRequest = TypeVar("T_UDSRequest", bound="UDSRequest")
[docs]
class UDSRequest(ABC):
SERVICE_ID: int | None
RESPONSE_TYPE: type[PositiveResponse]
_MINIMAL_LENGTH: int
_MAXIMAL_LENGTH: int | None
def __init_subclass__(
cls,
/,
service_id: int | None,
response_type: type[PositiveResponse],
minimal_length: int,
maximal_length: int | None,
**kwargs: Any,
) -> None:
super().__init_subclass__(**kwargs)
cls.SERVICE_ID = service_id
cls.RESPONSE_TYPE = response_type
cls._MINIMAL_LENGTH = minimal_length
cls._MAXIMAL_LENGTH = maximal_length
@property
@abstractmethod
def pdu(self) -> bytes:
pass
@classmethod
def from_pdu(cls: type[T_UDSRequest], pdu: bytes) -> T_UDSRequest:
cls._check_pdu(pdu)
result = cls._from_pdu(pdu)
assert result.pdu == pdu
return result
@classmethod
@abstractmethod
def _from_pdu(cls: type[T_UDSRequest], pdu: bytes) -> T_UDSRequest:
pass
@classmethod
def _check_pdu(cls, pdu: bytes) -> None:
check_length(pdu, cls._MINIMAL_LENGTH, cls._MAXIMAL_LENGTH)
if cls.SERVICE_ID is not None and pdu[0] != cls.SERVICE_ID:
raise ValueError(f"Service ID mismatch: {hex(pdu[0])} != {hex(cls.SERVICE_ID)}")
@property
def service_id(self) -> int:
return self.pdu[0]
@property
def data(self) -> bytes:
return self.pdu[1:]
def __repr__(self) -> str:
title = self.__class__.__name__
relevant_attributes = {}
for attr, value in self.__dict__.items():
if not attr.startswith("_"):
relevant_attributes[attr] = any_repr(value)
attributes = ", ".join(f"{attr}={value}" for attr, value in relevant_attributes.items())
return f"{title}({attributes})"
@staticmethod
def parse_dynamic(pdu: bytes) -> UDSRequest:
try:
logger.trace("Dynamically parsing request")
logger.trace(f" - Got PDU {pdu.hex()}")
request_service = UDSService._SERVICES[UDSIsoServices(pdu[0])]
logger.trace(f" - Inferred service {request_service.__name__}")
if (request_type := request_service.Request) is not None:
logger.trace(f" - Trying {request_type.__name__}")
return request_type.from_pdu(pdu)
if issubclass(request_service, SpecializedSubFunctionService):
logger.trace(" - Trying to infer subFunction")
request_sub_function = request_service._sub_function_type(pdu)
logger.trace(f" - Inferred subFunction {request_sub_function.__name__}")
assert (request_type := request_sub_function.Request) is not None
logger.trace(f" - Trying {request_type.__name__}")
return request_type.from_pdu(pdu)
raise ValueError("Request cannot be parsed")
except Exception as e:
logger.trace(
f" - Falling back to RawRequest because of the following problem: {repr(e)}"
)
return RawRequest(pdu)
T_UDSResponse = TypeVar("T_UDSResponse", bound="UDSResponse")
[docs]
class UDSResponse(ABC):
SERVICE_ID: int | None
RESPONSE_SERVICE_ID: int | None
_MINIMAL_LENGTH: int
_MAXIMAL_LENGTH: int | None
def __init_subclass__(
cls,
/,
service_id: int | None,
minimal_length: int,
maximal_length: int | None,
**kwargs: Any,
) -> None:
super().__init_subclass__(**kwargs)
cls.SERVICE_ID = service_id
cls.RESPONSE_SERVICE_ID = None if service_id is None else service_id + 0x40
cls._MINIMAL_LENGTH = minimal_length
cls._MAXIMAL_LENGTH = maximal_length
def __init__(self) -> None:
self.trigger_request: UDSRequest | None = None
@property
@abstractmethod
def pdu(self) -> bytes:
pass
@classmethod
def from_pdu(cls: type[T_UDSResponse], pdu: bytes) -> T_UDSResponse:
cls._check_pdu(pdu)
return cls._from_pdu(pdu)
@classmethod
@abstractmethod
def _from_pdu(cls: type[T_UDSResponse], pdu: bytes) -> T_UDSResponse:
pass
@classmethod
def _check_pdu(cls, pdu: bytes) -> None:
check_length(pdu, cls._MINIMAL_LENGTH, cls._MAXIMAL_LENGTH)
if (
cls.RESPONSE_SERVICE_ID is not None
and cls.SERVICE_ID is not None
and pdu[0] != cls.RESPONSE_SERVICE_ID
):
raise ValueError(
f"Service ID mismatch: {hex(pdu[0])} != {hex(cls.RESPONSE_SERVICE_ID)}"
f" ({hex(cls.SERVICE_ID)} + 0x40)"
)
@property
def service_id(self) -> int:
assert self.SERVICE_ID is not None
return self.SERVICE_ID
@abstractmethod
def matches(self, request: UDSRequest) -> bool:
pass
@staticmethod
def parse_dynamic(pdu: bytes) -> UDSResponse:
if pdu[0] == UDSIsoServices.NegativeResponse:
return NegativeResponse.from_pdu(pdu)
response_type: type[PositiveResponse]
logger.trace("Dynamically parsing response")
logger.trace(f" - Got PDU {pdu.hex()}")
try:
response_service = UDSService._SERVICES[UDSIsoServices(pdu[0] - 0x40)]
except Exception:
logger.trace(" - Falling back to raw response because the service is unknown")
return RawPositiveResponse(pdu)
logger.trace(f" - Inferred service {response_service.__name__}")
if response_service.Response is not None:
response_type = response_service.Response
elif issubclass(response_service, SpecializedSubFunctionService):
if len(pdu) < 2:
raise ValueError("Message of subfunction service contains no subfunction")
logger.trace(" - Trying to infer subfunction")
try:
response_sub_function = response_service._sub_function_type(pdu)
except ValueError as e:
logger.trace(f" - Falling back to raw response because {str(e)}")
return RawPositiveResponse(pdu)
logger.trace(f" - Inferred subFunction {response_sub_function.__name__}")
assert (response_type_ := response_sub_function.Response) is not None
response_type = response_type_
else:
logger.trace(" - Falling back to raw response because the response cannot be parsed")
return RawPositiveResponse(pdu)
logger.trace(f" - Trying {response_type.__name__}")
return response_type.from_pdu(pdu)
T_RawResponse = TypeVar("T_RawResponse", bound="RawResponse")
class RawResponse(UDSResponse, ABC, service_id=None, minimal_length=1, maximal_length=None):
def __init__(self, pdu: bytes) -> None:
super().__init__()
self._pdu = pdu
@classmethod
def _from_pdu(cls: type[T_RawResponse], pdu: bytes) -> T_RawResponse:
return cls(pdu)
@property
def pdu(self) -> bytes:
return self._pdu
@pdu.setter
def pdu(self, pdu: bytes) -> None:
self._pdu = pdu
def __repr__(self) -> str:
return f"{type(self).__name__}(pdu={bytes_repr(self.pdu)})"
class NegativeResponseBase(
UDSResponse,
ABC,
service_id=UDSIsoServices.NegativeResponse,
minimal_length=1,
maximal_length=None,
):
pass
class RawNegativeResponse(
NegativeResponseBase,
RawResponse,
service_id=UDSIsoServices.NegativeResponse,
minimal_length=1,
maximal_length=None,
):
def matches(self, request: UDSRequest) -> bool:
return len(self.pdu) > 1 and self.pdu[1] == request.service_id
[docs]
class NegativeResponse(
NegativeResponseBase,
service_id=UDSIsoServices.NegativeResponse,
minimal_length=3,
maximal_length=3,
):
def __init__(self, request_service_id: int, response_code: UDSErrorCodes) -> None:
super().__init__()
self.request_service_id = request_service_id
self.response_code = response_code
@property
def pdu(self) -> bytes:
return pack("!BBB", self.SERVICE_ID, self.request_service_id, self.response_code)
@classmethod
def _from_pdu(cls, pdu: bytes) -> NegativeResponse:
return NegativeResponse(pdu[1], UDSErrorCodes(pdu[2]))
@classmethod
def _check_pdu(cls, pdu: bytes) -> None:
check_length(pdu, cls._MINIMAL_LENGTH, cls._MAXIMAL_LENGTH)
if pdu[0] != UDSIsoServices.NegativeResponse:
raise ValueError(
f"Not a negative response: {hex(pdu[0])} != "
f"{hex(UDSIsoServices.NegativeResponse)}"
)
def matches(self, request: UDSRequest) -> bool:
return self.request_service_id == request.service_id
def __str__(self) -> str:
return str(self.response_code.name)
def __repr__(self) -> str:
return (
f"{type(self).__name__}(response_code={self.response_code.name}, "
f"request_service={service_repr(self.request_service_id)})"
)
T_PositiveResponse = TypeVar("T_PositiveResponse", bound="PositiveResponse")
[docs]
class PositiveResponse(UDSResponse, ABC, service_id=None, minimal_length=0, maximal_length=None):
@property
def data(self) -> bytes:
return self.pdu[1:]
def __repr__(self) -> str:
title = self.__class__.__name__
relevant_attributes = {}
for attr, value in self.__dict__.items():
if not attr.startswith("_") and attr not in ["trigger_request"]:
relevant_attributes[attr] = any_repr(value)
attributes = ", ".join(f"{attr}={value}" for attr, value in relevant_attributes.items())
return f"{title}({attributes})"
@classmethod
def parse_static(
cls: type[T_PositiveResponse], response_pdu: bytes
) -> NegativeResponse | T_PositiveResponse:
if response_pdu[0] == 0x7F:
negative_response = NegativeResponse.from_pdu(response_pdu)
return negative_response
response = cls.from_pdu(response_pdu)
return response
class UDSService(ABC):
SERVICE_ID: UDSIsoServices | None
_SERVICES: dict[UDSIsoServices | None, type[UDSService]] = {}
Response: type[PositiveResponse] | None = None
Request: type[UDSRequest] | None = None
@classmethod
def _response_type(cls, pdu: bytes) -> type[PositiveResponse] | None:
return cls.Response
@classmethod
def _request_type(cls, pdu: bytes) -> type[UDSRequest] | None:
return cls.Request
def __init_subclass__(cls, /, service_id: UDSIsoServices | None, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
cls.SERVICE_ID = service_id
UDSService._SERVICES[service_id] = cls
class SubFunction(ABC):
SUB_FUNCTION_ID: int | None
Response: type[PositiveResponse] | None
Request: type[UDSRequest] | None
def __init_subclass__(cls, /, sub_function_id: int | None, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
cls.SUB_FUNCTION_ID = sub_function_id
class SpecializedSubFunctionService(UDSService, ABC, service_id=None):
@classmethod
def _sub_function_type(cls, pdu: bytes) -> type[SubFunction]:
sub_function_id = pdu[1] % 0x80
sub_functions = [
x for x in cls.__dict__.values() if inspect.isclass(x) and issubclass(x, SubFunction)
]
for sub_function in sub_functions:
if sub_function.SUB_FUNCTION_ID == sub_function_id:
return sub_function
raise ValueError(f"SubFunction not supported: {int_repr(sub_function_id)}")
@classmethod
def _response_type(cls, pdu: bytes) -> type[PositiveResponse] | None:
return cls._sub_function_type(pdu).Response
@classmethod
def _request_type(cls, pdu: bytes) -> type[UDSRequest] | None:
return cls._sub_function_type(pdu).Request
[docs]
class SubFunctionResponse(
PositiveResponse, ABC, service_id=None, minimal_length=2, maximal_length=None
):
def __init__(self) -> None:
super().__init__()
check_sub_function(self.sub_function)
@property
@abstractmethod
def sub_function(self) -> int:
pass
@classmethod
def _check_pdu(cls, pdu: bytes) -> None:
super()._check_pdu(pdu)
check_sub_function(pdu[1])
[docs]
class SubFunctionRequest(
UDSRequest,
ABC,
service_id=None,
response_type=SubFunctionResponse, # type: ignore
minimal_length=2,
maximal_length=None,
):
def __init__(self, suppress_response: bool) -> None:
check_sub_function(self.sub_function)
self.suppress_response = suppress_response
@property
@abstractmethod
def sub_function(self) -> int:
pass
@property
def sub_function_with_suppress_response_bit(self) -> int:
return int(self.suppress_response) * 0x80 + self.sub_function
@staticmethod
def suppress_response_set(pdu: bytes) -> bool:
return pdu[1] >= 0x80
class SpecializedSubFunctionResponse(
SubFunctionResponse, ABC, service_id=None, minimal_length=2, maximal_length=None
):
SUB_FUNCTION_ID: int
def __init_subclass__(cls, /, sub_function_id: int, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
cls.SUB_FUNCTION_ID = sub_function_id
def __init__(self) -> None:
super().__init__()
@classmethod
def _check_pdu(cls, pdu: bytes) -> None:
super()._check_pdu(pdu)
if pdu[1] != cls.SUB_FUNCTION_ID:
raise ValueError(
f"Sub-function ID mismatch: {hex(pdu[1])} != " f"{hex(cls.SUB_FUNCTION_ID)}"
)
@property
def sub_function(self) -> int:
return self.SUB_FUNCTION_ID
class SpecializedSubFunctionRequest(
SubFunctionRequest,
ABC,
service_id=None,
response_type=SpecializedSubFunctionResponse, # type: ignore
minimal_length=2,
maximal_length=None,
):
SUB_FUNCTION_ID: int
def __init_subclass__(cls, /, sub_function_id: int, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
cls.SUB_FUNCTION_ID = sub_function_id
def __init__(self, suppress_response: bool) -> None:
super().__init__(suppress_response)
@classmethod
def _check_pdu(cls, pdu: bytes) -> None:
super()._check_pdu(pdu)
if pdu[1] % 0x80 != cls.SUB_FUNCTION_ID:
raise ValueError(
f"Sub-function ID mismatch: {hex(pdu[1])} != " f"{hex(cls.SUB_FUNCTION_ID)}"
)
@property
def sub_function(self) -> int:
return self.SUB_FUNCTION_ID
# ******************************
# * Raw requests and responses *
# ******************************
class RawPositiveResponse(
RawResponse,
PositiveResponse,
service_id=None,
minimal_length=1,
maximal_length=None,
):
@property
def service_id(self) -> int:
return self.pdu[0] - 0x40
def matches(self, request: UDSRequest) -> bool:
if self.service_id != request.service_id:
return False
# Use the old heuristic approach to detect as many mismatches as possible on responses which could not be parsed
try:
echo_length = UDSIsoServicesEchoLength[UDSIsoServices(self.service_id)]
return request.pdu[1 : echo_length + 1] == self.pdu[1 : echo_length + 1]
except Exception:
pass
return True
class RawRequest(
UDSRequest,
service_id=None,
response_type=RawPositiveResponse,
minimal_length=1,
maximal_length=None,
):
def __init__(self, pdu: bytes) -> None:
"""Raw request, which does not need to be compliant with the standard.
It can be used to send arbitrary data packets.
:param pdu: The data.
"""
self._pdu = pdu
@classmethod
def _from_pdu(cls, pdu: bytes) -> RawRequest:
return RawRequest(pdu)
@property
def service_id(self) -> int:
return self.pdu[0]
@property
def pdu(self) -> bytes:
return self._pdu
@pdu.setter
def pdu(self, pdu: bytes) -> None:
self._pdu = pdu
def __repr__(self) -> str:
return f"{type(self).__name__}(pdu={bytes_repr(self.pdu)})"
class Raw(UDSService, service_id=None):
Response = RawPositiveResponse
Request = RawRequest
# ******************************
# * Diagnostic session control *
# ******************************
class DiagnosticSessionControlResponse(
SubFunctionResponse,
service_id=UDSIsoServices.DiagnosticSessionControl,
minimal_length=2,
maximal_length=None,
):
@property
def pdu(self) -> bytes:
return (
pack("!BB", self.RESPONSE_SERVICE_ID, self.diagnostic_session_type)
+ self.session_parameter_record
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> DiagnosticSessionControlResponse:
diagnostic_session_type = from_bytes(pdu[1:2])
session_parameter_record = pdu[2:]
return DiagnosticSessionControlResponse(diagnostic_session_type, session_parameter_record)
def __init__(self, diagnostic_session_type: int, session_parameter_record: bytes = b"") -> None:
self.diagnostic_session_type = diagnostic_session_type
self.session_parameter_record = session_parameter_record
super().__init__()
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, DiagnosticSessionControlRequest)
and request.diagnostic_session_type == self.diagnostic_session_type
)
@property
def sub_function(self) -> int:
return self.diagnostic_session_type
class DiagnosticSessionControlRequest(
SubFunctionRequest,
service_id=UDSIsoServices.DiagnosticSessionControl,
response_type=DiagnosticSessionControlResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, diagnostic_session_type: int, suppress_response: bool = False) -> None:
"""Sets the diagnostic session which is specified by a specific diagnosticSessionType
sub-function.
This is an implementation of the UDS request for service DiagnosticSessionControl (0x10).
:param diagnostic_session_type: The session sub-function.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
self.diagnostic_session_type = diagnostic_session_type
super().__init__(suppress_response)
@property
def pdu(self) -> bytes:
return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit)
@classmethod
def _from_pdu(cls, pdu: bytes) -> DiagnosticSessionControlRequest:
return DiagnosticSessionControlRequest(*sub_function_split(pdu[1]))
@property
def sub_function(self) -> int:
return self.diagnostic_session_type
class DiagnosticSessionControl(UDSService, service_id=UDSIsoServices.DiagnosticSessionControl):
Response = DiagnosticSessionControlResponse
Request = DiagnosticSessionControlRequest
# *************
# * ECU reset *
# *************
class ECUResetResponse(
SubFunctionResponse,
service_id=UDSIsoServices.EcuReset,
minimal_length=2,
maximal_length=3,
):
def __init__(self, reset_type: int, power_down_time: int | None = None) -> None:
if power_down_time is not None:
check_range(power_down_time, "powerDownTime", 0, 0xFF)
self.reset_type = reset_type
self.power_down_time = power_down_time
super().__init__()
@property
def pdu(self) -> bytes:
if self.power_down_time is None:
return pack("!BB", self.RESPONSE_SERVICE_ID, self.reset_type)
return pack("!BBB", self.RESPONSE_SERVICE_ID, self.reset_type, self.power_down_time)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ECUResetResponse:
reset_type = pdu[1]
power_down_time = pdu[2] if len(pdu) > 2 else None
return ECUResetResponse(reset_type, power_down_time)
def matches(self, request: UDSRequest) -> bool:
return isinstance(request, ECUResetRequest) and request.reset_type == self.reset_type
@property
def sub_function(self) -> int:
return self.reset_type
class ECUResetRequest(
SubFunctionRequest,
service_id=UDSIsoServices.EcuReset,
response_type=ECUResetResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, reset_type: int, suppress_response: bool = False) -> None:
"""Resets the ECU using the specified reset type sub-function.
This is an implementation of the UDS request for service ECUReset (0x11).
:param reset_type: The reset type sub-function.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
self.reset_type = reset_type
super().__init__(suppress_response)
@property
def pdu(self) -> bytes:
return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ECUResetRequest:
return ECUResetRequest(*sub_function_split(pdu[1]))
@property
def sub_function(self) -> int:
return self.reset_type
class ECUReset(UDSService, service_id=UDSIsoServices.EcuReset):
Response = ECUResetResponse
Request = ECUResetRequest
# *******************
# * Security access *
# *******************
class SecurityAccessResponse(
SubFunctionResponse,
service_id=UDSIsoServices.SecurityAccess,
minimal_length=2,
maximal_length=None,
):
def __init__(self, security_access_type: int, security_seed: bytes = b"") -> None:
self.security_access_type = security_access_type
self.security_seed = security_seed
super().__init__()
@property
def pdu(self) -> bytes:
return pack("!BB", self.RESPONSE_SERVICE_ID, self.security_access_type) + self.security_seed
@classmethod
def _from_pdu(cls, pdu: bytes) -> SecurityAccessResponse:
security_access_type = from_bytes(pdu[1:2])
security_seed = pdu[2:]
return SecurityAccessResponse(security_access_type, security_seed)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, _SecurityAccessRequest)
and request.security_access_type == self.security_access_type
)
@property
def sub_function(self) -> int:
return self.security_access_type
class _SecurityAccessRequest(
SubFunctionRequest,
ABC,
service_id=UDSIsoServices.SecurityAccess,
response_type=SecurityAccessResponse,
minimal_length=2,
maximal_length=None,
):
def __init__(self, security_access_type: int, suppress_response: bool = False) -> None:
self.security_access_type = security_access_type
super().__init__(suppress_response)
@property
def sub_function(self) -> int:
return self.security_access_type
class RequestSeedRequest(
_SecurityAccessRequest,
service_id=UDSIsoServices.SecurityAccess,
response_type=SecurityAccessResponse,
minimal_length=2,
maximal_length=None,
):
def __init__(
self,
security_access_type: int,
security_access_data_record: bytes = b"",
suppress_response: bool = False,
) -> None:
"""Requests a seed for a security access level.
This is an implementation of the UDS request for the requestSeed sub-function group
of the service SecurityAccess (0x27).
:param security_access_type: The securityAccess type sub-function.
:param security_access_data_record: Optional data.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(security_access_type, suppress_response)
if security_access_type % 2 == 0:
raise ValueError(
f"RequestSeed requests must have an odd securityAccessType: "
f"{hex(security_access_type)}"
)
self.security_access_data_record = security_access_data_record
@property
def pdu(self) -> bytes:
return (
pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit)
+ self.security_access_data_record
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> RequestSeedRequest:
security_access_type, suppress_response = sub_function_split(pdu[1])
security_access_data_record = pdu[2:]
return RequestSeedRequest(
security_access_type, security_access_data_record, suppress_response
)
class SendKeyRequest(
_SecurityAccessRequest,
service_id=UDSIsoServices.SecurityAccess,
response_type=SecurityAccessResponse,
minimal_length=3,
maximal_length=None,
):
def __init__(
self,
security_access_type: int,
security_key: bytes,
suppress_response: bool = False,
) -> None:
"""Sends the key for a security access level.
This is an implementation of the UDS request for the sendKey sub-function group
of the service SecurityAccess (0x27).
:param security_access_type: The securityAccess type sub-function.
:param security_key: The response to the seed challenge.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(security_access_type, suppress_response)
if security_access_type % 2 == 1:
raise ValueError(
f"SendKey requests must have an even securityAccessType: "
f"{hex(security_access_type)}"
)
self.security_key = security_key
@property
def pdu(self) -> bytes:
return (
pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit)
+ self.security_key
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> SendKeyRequest:
security_access_type, suppress_response = sub_function_split(pdu[1])
security_key = pdu[2:]
return SendKeyRequest(security_access_type, security_key, suppress_response)
class SecurityAccess(SpecializedSubFunctionService, service_id=UDSIsoServices.SecurityAccess):
class RequestSeed(SubFunction, sub_function_id=None):
Response = SecurityAccessResponse
Request = RequestSeedRequest
class SendKey(SubFunction, sub_function_id=None):
Response = SecurityAccessResponse
Request = SendKeyRequest
@classmethod
def _sub_function_type(cls, pdu: bytes) -> type[SubFunction]:
sub_function_id = pdu[1]
return SecurityAccess.RequestSeed if sub_function_id % 2 == 1 else SecurityAccess.SendKey
# *************************
# * Communication control *
# *************************
class CommunicationControlResponse(
SubFunctionResponse,
service_id=UDSIsoServices.CommunicationControl,
minimal_length=2,
maximal_length=2,
):
def __init__(self, control_type: int) -> None:
self.control_type = control_type
super().__init__()
@property
def pdu(self) -> bytes:
return pack("!BB", self.RESPONSE_SERVICE_ID, self.control_type)
@classmethod
def _from_pdu(cls, pdu: bytes) -> CommunicationControlResponse:
control_type = pdu[1]
return CommunicationControlResponse(control_type)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, CommunicationControlRequest)
and request.control_type == self.control_type
)
@property
def sub_function(self) -> int:
return self.control_type
class CommunicationControlRequest(
SubFunctionRequest,
service_id=UDSIsoServices.CommunicationControl,
response_type=CommunicationControlResponse,
minimal_length=3,
maximal_length=3,
):
"""Controls communication of the ECU.
This is an implementation of the UDS request for service CommunicationControl (0x28).
:param control_type: The control type sub-function.
:param communication_type: The communication type.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
def __init__(
self,
control_type: int,
communication_type: int,
suppress_response: bool = False,
) -> None:
self.control_type = control_type
self.communication_type = communication_type
super().__init__(suppress_response)
@property
def pdu(self) -> bytes:
return pack(
"!BBB",
self.SERVICE_ID,
self.sub_function_with_suppress_response_bit,
self.communication_type,
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> CommunicationControlRequest:
control_type, suppress_response = sub_function_split(pdu[1])
communication_type = pdu[2]
return CommunicationControlRequest(control_type, communication_type, suppress_response)
@property
def sub_function(self) -> int:
return self.control_type
class CommunicationControl(UDSService, service_id=UDSIsoServices.CommunicationControl):
Response = CommunicationControlResponse
Request = CommunicationControlRequest
# ******************
# * Tester present *
# ******************
class TesterPresentResponse(
SpecializedSubFunctionResponse,
service_id=UDSIsoServices.TesterPresent,
sub_function_id=0,
minimal_length=2,
maximal_length=2,
):
@property
def pdu(self) -> bytes:
return pack("!BB", self.RESPONSE_SERVICE_ID, self.SUB_FUNCTION_ID)
@classmethod
def _from_pdu(cls, pdu: bytes) -> TesterPresentResponse:
return TesterPresentResponse()
def matches(self, request: UDSRequest) -> bool:
return isinstance(request, TesterPresentRequest)
class TesterPresentRequest(
SpecializedSubFunctionRequest,
service_id=UDSIsoServices.TesterPresent,
sub_function_id=0,
response_type=TesterPresentResponse,
minimal_length=2,
maximal_length=2,
):
"""Signals to the ECU, that the tester is still present.
This is an implementation of the UDS request for service TesterPresent (0x3E).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
def __init__(self, suppress_response: bool = False) -> None:
super().__init__(suppress_response)
@property
def pdu(self) -> bytes:
return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit)
@classmethod
def _from_pdu(cls, pdu: bytes) -> TesterPresentRequest:
return TesterPresentRequest(cls.suppress_response_set(pdu))
class TesterPresent(UDSService, service_id=UDSIsoServices.TesterPresent):
Response = TesterPresentResponse
Request = TesterPresentRequest
# ******************
# * Authentication *
# ******************
# ***************************
# * Access timing parameter *
# ***************************
# *****************************
# * Secured data transmission *
# *****************************
# ***********************
# * Control DTC setting *
# ***********************
class ControlDTCSettingResponse(
SubFunctionResponse,
service_id=UDSIsoServices.ControlDTCSetting,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_setting_type: int) -> None:
self.dtc_setting_type = dtc_setting_type
super().__init__()
@property
def pdu(self) -> bytes:
return pack("!BB", self.RESPONSE_SERVICE_ID, self.dtc_setting_type)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ControlDTCSettingResponse:
dtc_setting_type = pdu[1]
return ControlDTCSettingResponse(dtc_setting_type)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, ControlDTCSettingRequest)
and request.dtc_setting_type == self.dtc_setting_type
)
@property
def sub_function(self) -> int:
return self.dtc_setting_type
class ControlDTCSettingRequest(
SubFunctionRequest,
service_id=UDSIsoServices.ControlDTCSetting,
response_type=ControlDTCSettingResponse,
minimal_length=2,
maximal_length=None,
):
def __init__(
self,
dtc_setting_type: int,
dtc_setting_control_option_record: bytes = b"",
suppress_response: bool = False,
) -> None:
"""Control the setting of DTCs.
This is an implementation of the UDS request for service ControlDTCSetting (0x85).
:param dtc_setting_type: The setting type.
:param dtc_setting_control_option_record: Optional data.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
self.dtc_setting_type = dtc_setting_type
self.dtc_setting_control_option_record = dtc_setting_control_option_record
super().__init__(suppress_response)
@property
def pdu(self) -> bytes:
return (
pack("!BB", self.SERVICE_ID, self.dtc_setting_type)
+ self.dtc_setting_control_option_record
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ControlDTCSettingRequest:
dtc_setting_type, suppress_response = sub_function_split(pdu[1])
dtc_setting_control_option_record = pdu[2:]
return ControlDTCSettingRequest(
dtc_setting_type, dtc_setting_control_option_record, suppress_response
)
@property
def sub_function(self) -> int:
return self.dtc_setting_type
class ControlDTCSetting(UDSService, service_id=UDSIsoServices.ControlDTCSetting):
Response = ControlDTCSettingResponse
Request = ControlDTCSettingRequest
# *********************
# * Response on event *
# *********************
# ****************
# * Link control *
# ****************
# ***************************
# * Read data by identifier *
# ***************************
class ReadDataByIdentifierResponse(
PositiveResponse,
service_id=UDSIsoServices.ReadDataByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(
self,
data_identifiers: int | Sequence[int],
data_records: bytes | Sequence[bytes],
) -> None:
super().__init__()
if not isinstance(data_identifiers, int):
self.data_identifiers = list(data_identifiers)
else:
self.data_identifiers = [data_identifiers]
if not isinstance(data_records, bytes):
self.data_records = list(data_records)
else:
self.data_records = [data_records]
if len(self.data_identifiers) != len(self.data_records):
raise ValueError(
f"The number of data identifiers does not match the number of "
f"data_records: "
f"{len(self.data_identifiers)} != {len(self.data_records)}"
)
for identifier in self.data_identifiers:
check_data_identifier(identifier)
@property
def data_record(self) -> bytes:
return self.data_records[0]
@data_record.setter
def data_record(self, data_record: bytes) -> None:
self.data_records[0] = data_record
@property
def data_identifier(self) -> int:
return self.data_identifiers[0]
@data_identifier.setter
def data_identifier(self, data_identifier: int) -> None:
self.data_identifiers[0] = data_identifier
@property
def pdu(self) -> bytes:
pdu = pack("!B", self.RESPONSE_SERVICE_ID)
for data_identifier, data_record in zip(self.data_identifiers, self.data_records):
pdu = pdu + to_bytes(data_identifier, 2) + data_record
return pdu
@classmethod
def _from_pdu(cls, pdu: bytes) -> ReadDataByIdentifierResponse:
# Without knowing the lengths of the dataRecords in a response with multiple dataIdentifiers
# and dataRecords it's not possible to recover all ids.
# Therefore, only the first identifier is used and the rest is simply attributed to the
# first dataRecord
data_identifier = from_bytes(pdu[1:3])
data_record = pdu[3:]
return ReadDataByIdentifierResponse(data_identifier, data_record)
def matches(self, request: UDSRequest) -> bool:
if not isinstance(request, ReadDataByIdentifierRequest):
return False
# Without knowing the lengths of the dataRecords in a response with multiple dataIdentifiers
# and dataRecords it's not possible to recover all ids. This is respected here, where only
# if this was possible,a complete check is done, while otherwise only the first id of the
# request is taken into account.
if len(self.data_identifiers) > 1:
if len(request.data_identifiers) != len(self.data_identifiers):
return False
return all(
req_id == resp_id
for req_id, resp_id in zip(request.data_identifiers, self.data_identifiers)
)
return request.data_identifiers[0] == self.data_identifiers[0]
@property
def _minimal_length(self) -> int:
return 4
class ReadDataByIdentifierRequest(
UDSRequest,
service_id=UDSIsoServices.ReadDataByIdentifier,
response_type=ReadDataByIdentifierResponse,
minimal_length=3,
maximal_length=None,
):
def __init__(self, data_identifiers: int | Sequence[int]) -> None:
"""Reads data which is identified by a specific dataIdentifier.
This is an implementation of the UDS request for service ReadDataByIdentifier (0x22).
While this implementation supports requesting multiple dataIdentifiers at once, as is
permitted in the standard, it is recommended to request them separately, because the support
is optional on the server side.
Additionally, it is not possible to reliably determine each single dataRecord from a
corresponding response.
:param data_identifiers: One or multiple dataIdentifiers. A dataIdentifier is a max two
bytes integer.
"""
if isinstance(data_identifiers, Sequence):
self.data_identifiers = list(data_identifiers)
else:
self.data_identifiers = [data_identifiers]
for identifier in self.data_identifiers:
check_data_identifier(identifier)
@property
def data_identifier(self) -> int:
return self.data_identifiers[0]
@data_identifier.setter
def data_identifier(self, data_identifier: int) -> None:
self.data_identifiers[0] = data_identifier
@classmethod
def _from_pdu(cls, pdu: bytes) -> ReadDataByIdentifierRequest:
identifiers: list[int] = []
for i in range(1, len(pdu), 2):
identifiers.append(from_bytes(pdu[i : i + 2]))
return ReadDataByIdentifierRequest(identifiers)
@property
def pdu(self) -> bytes:
return pack(
f"!B{len(self.data_identifiers)}H",
UDSIsoServices.ReadDataByIdentifier,
*self.data_identifiers,
)
class ReadDataByIdentifier(UDSService, service_id=UDSIsoServices.ReadDataByIdentifier):
Response = ReadDataByIdentifierResponse
Request = ReadDataByIdentifierRequest
# **************************
# * Read memory by address *
# **************************
class ReadMemoryByAddressResponse(
PositiveResponse,
service_id=UDSIsoServices.ReadMemoryByAddress,
minimal_length=2,
maximal_length=None,
):
@property
def pdu(self) -> bytes:
return pack("!B", self.RESPONSE_SERVICE_ID) + self.data_record
@classmethod
def _from_pdu(cls, pdu: bytes) -> ReadMemoryByAddressResponse:
data_record = pdu[1:]
return ReadMemoryByAddressResponse(data_record)
def __init__(self, data_record: bytes) -> None:
super().__init__()
self.data_record = data_record
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, ReadMemoryByAddressRequest)
and len(self.data_record) == request.memory_size
)
class ReadMemoryByAddressRequest(
UDSRequest,
service_id=UDSIsoServices.ReadMemoryByAddress,
response_type=ReadMemoryByAddressResponse,
minimal_length=4,
maximal_length=32,
):
def __init__(
self,
memory_address: int,
memory_size: int,
address_and_length_format_identifier: int | None = None,
) -> None:
"""Reads data from a specific memory address on the UDS server.
This is an implementation of the UDS request for service ReadMemoryByAddress (0x3d).
While it exposes each parameter of the corresponding specification,
some parameters can be computed from the remaining ones and can therefore be omitted.
:param memory_address: The start address.
:param memory_size: The number of bytes to read.
:param address_and_length_format_identifier: The byte lengths of the memory address and size.
If omitted, this parameter is computed based on
the memory_address and memory_size parameters.
"""
self.memory_address = memory_address
self.memory_size = memory_size
if address_and_length_format_identifier is None:
address_and_length_format_identifier, _, _ = uds_memory_parameters(
memory_address, memory_size, address_and_length_format_identifier
)
self.address_and_length_format_identifier = address_and_length_format_identifier
@property
def pdu(self) -> bytes:
_, address_bytes, size_bytes = uds_memory_parameters(
self.memory_address,
self.memory_size,
self.address_and_length_format_identifier,
)
pdu = pack("!BB", self.SERVICE_ID, self.address_and_length_format_identifier)
pdu += address_bytes + size_bytes
return pdu
@classmethod
def _from_pdu(cls, pdu: bytes) -> ReadMemoryByAddressRequest:
address_and_length_format_identifier = pdu[1]
address_length, size_length = address_and_size_length(address_and_length_format_identifier)
if len(pdu) != 2 + address_length + size_length:
raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size")
return ReadMemoryByAddressRequest(
from_bytes(pdu[2 : 2 + address_length]),
from_bytes(pdu[2 + address_length : 2 + address_length + size_length]),
address_and_length_format_identifier,
)
class ReadMemoryByAddress(UDSService, service_id=UDSIsoServices.ReadMemoryByAddress):
Request = ReadMemoryByAddressRequest
Response = ReadMemoryByAddressResponse
# ***********************************
# * Read scaling data by identifier *
# ***********************************
# ************************************
# * Read data by periodic identifier *
# ************************************
# **************************************
# * Dynamically define data identifier *
# **************************************
# ******************************
# * Write memory by identifier *
# ******************************
class WriteDataByIdentifierResponse(
PositiveResponse,
minimal_length=3,
maximal_length=3,
service_id=UDSIsoServices.WriteDataByIdentifier,
):
def __init__(self, data_identifier: int) -> None:
super().__init__()
check_data_identifier(data_identifier)
self.data_identifier = data_identifier
@property
def pdu(self) -> bytes:
return pack("!BH", self.RESPONSE_SERVICE_ID, self.data_identifier)
@classmethod
def _from_pdu(cls, pdu: bytes) -> WriteDataByIdentifierResponse:
data_identifier = from_bytes(pdu[1:3])
return WriteDataByIdentifierResponse(data_identifier)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, WriteDataByIdentifierRequest)
and request.data_identifier == self.data_identifier
)
class WriteDataByIdentifierRequest(
UDSRequest,
service_id=UDSIsoServices.WriteDataByIdentifier,
response_type=WriteDataByIdentifierResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, data_record: bytes) -> None:
"""Writes data which is identified by a specific dataIdentifier.
This is an implementation of the UDS request for service WriteDataByIdentifier (0x2E).
:param data_identifier: The identifier. A dataIdentifier is a max two bytes integer.
:param data_record: The data to be written.
"""
check_data_identifier(data_identifier)
if len(data_record) < 1:
raise ValueError("The dataRecord must not be empty")
self.data_identifier = data_identifier
self.data_record = data_record
@property
def pdu(self) -> bytes:
return pack("!BH", self.SERVICE_ID, self.data_identifier) + self.data_record
@classmethod
def _from_pdu(cls, pdu: bytes) -> WriteDataByIdentifierRequest:
data_identifier = from_bytes(pdu[1:3])
data_record = pdu[3:]
return WriteDataByIdentifierRequest(data_identifier, data_record)
class WriteDataByIdentifier(UDSService, service_id=UDSIsoServices.WriteDataByIdentifier):
Response = WriteDataByIdentifierResponse
Request = WriteDataByIdentifierRequest
# ***************************
# * Write memory by address *
# ***************************
class WriteMemoryByAddressResponse(
PositiveResponse,
service_id=UDSIsoServices.WriteMemoryByAddress,
minimal_length=4,
maximal_length=32,
):
@property
def pdu(self) -> bytes:
_, address_bytes, size_bytes = uds_memory_parameters(
self.memory_address,
self.memory_size,
self.address_and_length_format_identifier,
)
pdu = pack("!BB", self.RESPONSE_SERVICE_ID, self.address_and_length_format_identifier)
pdu += address_bytes + size_bytes
return pdu
@classmethod
def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressResponse:
address_and_length_format_identifier = pdu[1]
addr_len, size_len = address_and_size_length(address_and_length_format_identifier)
if len(pdu) < 2 + addr_len + size_len:
raise ValueError(
"The PDU is smaller as specified by the addressAndLengthFormatIdentifier"
)
memory_address = from_bytes(pdu[2 : 2 + addr_len])
memory_size = from_bytes(pdu[2 + addr_len : 2 + addr_len + size_len])
return WriteMemoryByAddressResponse(
memory_address, memory_size, address_and_length_format_identifier
)
def __init__(
self,
memory_address: int,
memory_size: int,
address_and_length_format_identifier: int | None = None,
) -> None:
super().__init__()
self.memory_address = memory_address
self.memory_size = memory_size
if address_and_length_format_identifier is None:
address_and_length_format_identifier, _, _ = uds_memory_parameters(
memory_address, memory_size, address_and_length_format_identifier
)
self.address_and_length_format_identifier = address_and_length_format_identifier
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, WriteMemoryByAddressRequest)
and self.address_and_length_format_identifier
== request.address_and_length_format_identifier
and self.memory_address == request.memory_address
and self.memory_size == request.memory_size
)
class WriteMemoryByAddressRequest(
UDSRequest,
service_id=UDSIsoServices.WriteMemoryByAddress,
response_type=WriteMemoryByAddressResponse,
minimal_length=5,
maximal_length=None,
):
def __init__(
self,
memory_address: int,
data_record: bytes,
memory_size: int | None = None,
address_and_length_format_identifier: int | None = None,
) -> None:
"""Writes data to a specific memory on the UDS server.
This is an implementation of the UDS request for service writeMemoryByAddress (0x3d).
While it exposes each parameter of the corresponding specification,
some parameters can be computed from the remaining ones and can therefore be omitted.
:param memory_address: The start address.
:param data_record: The data to be written.
:param memory_size: The number of bytes to write.
If omitted, the byte length of the data is used.
:param address_and_length_format_identifier: The byte lengths of the memory address and
size. If omitted, this parameter is computed
based on the memory_address and memory_size
or data_record parameters.
"""
self.memory_address = memory_address
self.data_record = data_record
# If the size is given explicitly, use it as is, otherwise take the size of the data
if memory_size is None:
memory_size = len(data_record)
self.memory_size = memory_size
if address_and_length_format_identifier is None:
address_and_length_format_identifier, _, _ = uds_memory_parameters(
memory_address, memory_size, address_and_length_format_identifier
)
self.address_and_length_format_identifier = address_and_length_format_identifier
@property
def pdu(self) -> bytes:
_, address_bytes, size_bytes = uds_memory_parameters(
self.memory_address,
self.memory_size,
self.address_and_length_format_identifier,
)
pdu = pack("!BB", self.SERVICE_ID, self.address_and_length_format_identifier)
pdu += address_bytes + size_bytes + self.data_record
return pdu
@classmethod
def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressRequest:
address_and_length_format_identifier = pdu[1]
address_length, size_length = address_and_size_length(address_and_length_format_identifier)
if len(pdu) < 2 + address_length + size_length:
raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size")
return WriteMemoryByAddressRequest(
from_bytes(pdu[2 : 2 + address_length]),
pdu[2 + address_length + size_length :],
from_bytes(pdu[2 + address_length : 2 + address_length + size_length]),
address_and_length_format_identifier,
)
class WriteMemoryByAddress(UDSService, service_id=UDSIsoServices.WriteMemoryByAddress):
Request = WriteMemoryByAddressRequest
Response = WriteMemoryByAddressResponse
# ********************************
# * Clear diagnostic information *
# ********************************
class ClearDiagnosticInformationResponse(
PositiveResponse,
minimal_length=1,
maximal_length=1,
service_id=UDSIsoServices.ClearDiagnosticInformation,
):
@property
def pdu(self) -> bytes:
assert self.RESPONSE_SERVICE_ID is not None
return bytes([self.RESPONSE_SERVICE_ID])
@classmethod
def _from_pdu(cls, pdu: bytes) -> ClearDiagnosticInformationResponse:
return ClearDiagnosticInformationResponse()
def matches(self, request: UDSRequest) -> bool:
return isinstance(request, ClearDiagnosticInformationRequest)
class ClearDiagnosticInformationRequest(
UDSRequest,
minimal_length=4,
maximal_length=4,
service_id=UDSIsoServices.ClearDiagnosticInformation,
response_type=ClearDiagnosticInformationResponse,
):
def __init__(self, group_of_dtc: int) -> None:
"""Clears diagnostic trouble codes according to a given mask.
This is an implementation of the UDS request for service clearDiagnosticInformation (0x14).
:param group_of_dtc: The three byte mask, which determines the DTCs to be cleared.
"""
check_range(group_of_dtc, "groupOfDTC", 0, 0xFFFFFF)
self.group_of_dtc = group_of_dtc
@property
def pdu(self) -> bytes:
assert self.SERVICE_ID is not None
return bytes([self.SERVICE_ID]) + to_bytes(self.group_of_dtc, 3)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ClearDiagnosticInformationRequest:
group_of_dtc = from_bytes(pdu[1:])
return ClearDiagnosticInformationRequest(group_of_dtc)
class ClearDiagnosticInformation(UDSService, service_id=UDSIsoServices.ClearDiagnosticInformation):
Response = ClearDiagnosticInformationResponse
Request = ClearDiagnosticInformationRequest
# ************************
# * Read DTC information *
# ************************
class _ReadDTCResponse(
SpecializedSubFunctionResponse,
ABC,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=0,
minimal_length=None,
maximal_length=None,
):
def matches(self, request: UDSRequest) -> bool:
return isinstance(request, _ReadDTCRequest) and self.sub_function == request.sub_function
class _ReadDTCRequest(
SpecializedSubFunctionRequest,
ABC,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=0,
minimal_length=None,
maximal_length=None,
response_type=_ReadDTCResponse,
):
pass
T_ReadDTCType0Response = TypeVar("T_ReadDTCType0Response", bound="_ReadDTCType0Response")
class _ReadDTCType0Response(
_ReadDTCResponse,
ABC,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=0,
minimal_length=6,
maximal_length=6,
):
def __init__(
self,
dtc_status_availability_mask: int,
dtc_format_identifier: DTCFormatIdentifier,
dtc_count: int,
) -> None:
super().__init__()
check_range(dtc_status_availability_mask, "DTCStatusAvailabilityMask", 0, 0xFF)
check_range(dtc_count, "DTCCount", 0, 0xFFFF)
self.dtc_status_availability_mask = dtc_status_availability_mask
self.dtc_format_identifier = dtc_format_identifier
self.dtc_count = dtc_count
@property
def pdu(self) -> bytes:
return pack(
"!BBBBH",
self.RESPONSE_SERVICE_ID,
self.sub_function,
self.dtc_status_availability_mask,
self.dtc_format_identifier,
self.dtc_count,
)
@classmethod
def _from_pdu(cls: type[T_ReadDTCType0Response], pdu: bytes) -> T_ReadDTCType0Response:
dtc_status_availability_mask = pdu[2]
dtc_format_identifier = DTCFormatIdentifier(pdu[3])
dtc_count = from_bytes(pdu[4:])
return cls(dtc_status_availability_mask, dtc_format_identifier, dtc_count)
T_ReadDTCType1Response = TypeVar("T_ReadDTCType1Response", bound="_ReadDTCType1Response")
class _ReadDTCType1Response(
_ReadDTCResponse,
ABC,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=0,
minimal_length=3,
maximal_length=None,
):
def __init__(
self,
dtc_status_availability_mask: int,
dtc_and_status_record: bytes | dict[int, int],
) -> None:
super().__init__()
check_range(dtc_status_availability_mask, "DTCStatusAvailabilityMask", 0, 0xFF)
if isinstance(dtc_and_status_record, bytes):
if len(dtc_and_status_record) % 4 != 0:
raise ValueError("Not a valid dtc_and_status_record")
self.dtc_and_status_record = {
from_bytes(dtc_and_status_record[i : i + 3]): dtc_and_status_record[i + 3]
for i in range(0, len(dtc_and_status_record), 4)
}
else:
for dtc, status in dtc_and_status_record.items():
check_range(dtc, "DTC", 0, 0xFFFFFF)
check_range(status, "DTC Status", 0, 0xFF)
self.dtc_and_status_record = dtc_and_status_record
self.dtc_status_availability_mask = dtc_status_availability_mask
def dtc_and_status_record_bytes(self) -> bytes:
return bytes(
bytearray().join(
bytearray(to_bytes(dtc, 3)) + to_bytes(status, 1)
for dtc, status in self.dtc_and_status_record.items()
)
)
@property
def pdu(self) -> bytes:
return (
pack(
"!BBB",
self.RESPONSE_SERVICE_ID,
self.sub_function,
self.dtc_status_availability_mask,
)
+ self.dtc_and_status_record_bytes()
)
@classmethod
def _from_pdu(cls: type[T_ReadDTCType1Response], pdu: bytes) -> T_ReadDTCType1Response:
dtc_status_availability_mask = pdu[2]
dtc_and_status_record = pdu[3:]
return cls(dtc_status_availability_mask, dtc_and_status_record)
T_ReadDTCType0Request = TypeVar("T_ReadDTCType0Request", bound="_ReadDTCType0Request")
class _ReadDTCType0Request(
_ReadDTCRequest,
ABC,
response_type=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=0,
minimal_length=3,
maximal_length=3,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
super().__init__(suppress_response)
check_range(dtc_status_mask, "DTCStatusMask", 0, 0xFF)
self.dtc_status_mask = dtc_status_mask
@property
def pdu(self) -> bytes:
return pack(
"!BBB",
self.SERVICE_ID,
self.sub_function_with_suppress_response_bit,
self.dtc_status_mask,
)
@classmethod
def _from_pdu(cls: type[T_ReadDTCType0Request], pdu: bytes) -> T_ReadDTCType0Request:
dtc_status_mask = pdu[2]
return cls(dtc_status_mask, cls.suppress_response_set(pdu))
T_ReadDTCType6Request = TypeVar("T_ReadDTCType6Request", bound="_ReadDTCType6Request")
class _ReadDTCType6Request(
_ReadDTCRequest,
ABC,
response_type=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=0,
minimal_length=2,
maximal_length=2,
):
def __init__(self, suppress_response: bool = False) -> None:
super().__init__(suppress_response)
@property
def pdu(self) -> bytes:
return pack("!BBB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit)
@classmethod
def _from_pdu(cls: type[T_ReadDTCType6Request], pdu: bytes) -> T_ReadDTCType6Request:
return cls(cls.suppress_response_set(pdu))
class ReportNumberOfDTCByStatusMaskResponse(
_ReadDTCType0Response,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfDTCByStatusMask,
minimal_length=6,
maximal_length=6,
):
pass
class ReportNumberOfDTCByStatusMaskRequest(
_ReadDTCType0Request,
minimal_length=3,
maximal_length=3,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfDTCByStatusMask,
response_type=ReportNumberOfDTCByStatusMaskResponse,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the number of DTCs with the specified state from the UDS server.
This is an implementation of the UDS request for the reportNumberOfDTCByStatusMask
sub-function of the service ReadDTCInformation (0x19).
:param dtc_status_mask: Used to select a portion of the DTCs based on their state.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(dtc_status_mask, suppress_response)
class ReportDTCByStatusMaskResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportDTCByStatusMask,
):
pass
class ReportDTCByStatusMaskRequest(
_ReadDTCType0Request,
minimal_length=3,
maximal_length=3,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportDTCByStatusMask,
response_type=ReportDTCByStatusMaskResponse,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read DTCs and their state from the UDS server.
This is an implementation of the UDS request for the reportDTCByStatusMask sub-function of
the service ReadDTCInformation (0x19).
:param dtc_status_mask: Used to select a portion of the DTCs based on their state.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(dtc_status_mask, suppress_response)
class ReportMirrorMemoryDTCByStatusMaskResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportMirrorMemoryDTCByStatusMask,
):
pass
class ReportMirrorMemoryDTCByStatusMaskRequest(
_ReadDTCType0Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportMirrorMemoryDTCByStatusMask,
response_type=ReportMirrorMemoryDTCByStatusMaskResponse,
minimal_length=3,
maximal_length=3,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read DTCs and their state from the UDS server's mirror memory.
This is an implementation of the UDS request for the reportMirrorMemoryDTCByStatusMask
sub-function of the service ReadDTCInformation (0x19).
:param dtc_status_mask: Used to select a portion of the DTCs based on their state.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(dtc_status_mask, suppress_response)
class ReportNumberOfMirrorMemoryDTCByStatusMaskResponse(
_ReadDTCType0Response,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfMirrorMemoryDTCByStatusMask,
minimal_length=6,
maximal_length=6,
):
pass
class ReportNumberOfMirrorMemoryDTCByStatusMaskRequest(
_ReadDTCType0Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfMirrorMemoryDTCByStatusMask,
response_type=ReportNumberOfMirrorMemoryDTCByStatusMaskResponse,
minimal_length=3,
maximal_length=3,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the number of DTCs with the specified state from the UDS server's mirror memory.
This is an implementation of the UDS request for the
reportNumberOfMirrorMemoryDTCByStatusMask sub-function of the service ReadDTCInformation
(0x19).
:param dtc_status_mask: Used to select a portion of the DTCs based on their state.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(dtc_status_mask, suppress_response)
class ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse(
_ReadDTCType0Response,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfEmissionsRelatedOBDDTCByStatusMask,
minimal_length=6,
maximal_length=6,
):
pass
class ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskRequest(
_ReadDTCType0Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfEmissionsRelatedOBDDTCByStatusMask,
response_type=ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse,
minimal_length=3,
maximal_length=3,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the number of emission related DTCs with the specified state from the UDS server.
This is an implementation of the UDS request for the
reportNumberOfEmissionsRelatedOBDDTCByStatusMask sub-function of the service
ReadDTCInformation (0x19).
:param dtc_status_mask: Used to select a portion of the DTCs based on their state.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(dtc_status_mask, suppress_response)
class ReportEmissionsRelatedOBDDTCByStatusMaskResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportEmissionsRelatedOBDDTCByStatusMask,
):
pass
class ReportEmissionsRelatedOBDDTCByStatusMaskRequest(
_ReadDTCType0Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportEmissionsRelatedOBDDTCByStatusMask,
response_type=ReportEmissionsRelatedOBDDTCByStatusMaskResponse,
minimal_length=3,
maximal_length=3,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the number of emission related DTCs with the specified state from the UDS server.
This is an implementation of the UDS request for the
reportEmissionsRelatedOBDDTCByStatusMask sub-function of the service ReadDTCInformation
(0x19).
:param dtc_status_mask: Used to select a portion of the DTCs based on their state.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(dtc_status_mask, suppress_response)
class ReportSupportedDTCResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportSupportedDTC,
):
pass
class ReportSupportedDTCRequest(
_ReadDTCType6Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportSupportedDTC,
response_type=ReportSupportedDTCResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the supported DTCs from the UDS server.
This is an implementation of the UDS request for the
reportSupportedDTC sub-function of the service ReadDTCInformation (0x19).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(suppress_response)
class ReportFirstTestFailedDTCResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=7,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportFirstTestFailedDTC,
):
pass
class ReportFirstTestFailedDTCRequest(
_ReadDTCType6Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportFirstTestFailedDTC,
response_type=ReportFirstTestFailedDTCResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the first failed DTC since last clearance from the UDS server.
This is an implementation of the UDS request for the
reportFirstTestFailedDTC sub-function of the service ReadDTCInformation (0x19).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(suppress_response)
class ReportFirstConfirmedDTCResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=7,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportFirstConfirmedDTC,
):
pass
class ReportFirstConfirmedDTCRequest(
_ReadDTCType6Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportFirstConfirmedDTC,
response_type=ReportFirstConfirmedDTCResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the first confirmed DTC since last clearance from the UDS server.
This is an implementation of the UDS request for the
reportFirstConfirmedDTC sub-function of the service ReadDTCInformation (0x19).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(suppress_response)
class ReportMostRecentTestFailedDTCResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=7,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportMostRecentTestFailedDTC,
):
pass
class ReportMostRecentFirstTestFailedDTCRequest(
_ReadDTCType6Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportMostRecentTestFailedDTC,
response_type=ReportMostRecentTestFailedDTCResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the most recent failed DTC since last clearance from the UDS server.
This is an implementation of the UDS request for the
reportMostRecentTestFailedDTC sub-function of the service ReadDTCInformation (0x19).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(suppress_response)
class ReportMostrecentConfirmedDTCResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=7,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportMostRecentConfirmedDTC,
):
pass
class ReportMostRecentConfirmedDTCRequest(
_ReadDTCType6Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportMostRecentConfirmedDTC,
response_type=ReportMostrecentConfirmedDTCResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the most recent confirmed DTC since last clearance from the UDS server.
This is an implementation of the UDS request for the
reportMostRecentConfirmedDTC sub-function of the service ReadDTCInformation (0x19).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(suppress_response)
class ReportDTCWithPermanentStatusResponse(
_ReadDTCType1Response,
minimal_length=3,
maximal_length=None,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportDTCWithPermanentStatus,
):
pass
class ReportDTCWithPermanentStatusRequest(
_ReadDTCType6Request,
service_id=UDSIsoServices.ReadDTCInformation,
sub_function_id=ReadDTCInformationSubFuncs.reportDTCWithPermanentStatus,
response_type=ReportDTCWithPermanentStatusResponse,
minimal_length=2,
maximal_length=2,
):
def __init__(self, dtc_status_mask: int, suppress_response: bool = False) -> None:
"""Read the DTCs with permanent status from the UDS server.
This is an implementation of the UDS request for the
reportDTCWithPermanentStatus sub-function of the service ReadDTCInformation (0x19).
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(suppress_response)
class ReadDTCInformation(
SpecializedSubFunctionService, service_id=UDSIsoServices.ReadDTCInformation
):
class ReportNumberOfDTCByStatusMask(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfDTCByStatusMask,
):
Response = ReportNumberOfDTCByStatusMaskResponse
Request = ReportNumberOfDTCByStatusMaskRequest
class ReportDTCByStatusMask(
SubFunction, sub_function_id=ReadDTCInformationSubFuncs.reportDTCByStatusMask
):
Response = ReportDTCByStatusMaskResponse
Request = ReportDTCByStatusMaskRequest
class ReportMirrorMemoryDTCByStatusMask(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportMirrorMemoryDTCByStatusMask,
):
Response = ReportMirrorMemoryDTCByStatusMaskResponse
Request = ReportMirrorMemoryDTCByStatusMaskRequest
class ReportNumberOfMirrorMemoryDTCByStatusMask(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfMirrorMemoryDTCByStatusMask,
):
Response = ReportNumberOfMirrorMemoryDTCByStatusMaskResponse
Request = ReportNumberOfMirrorMemoryDTCByStatusMaskRequest
class ReportNumberOfEmissionsRelatedOBDDTCByStatusMask(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportNumberOfEmissionsRelatedOBDDTCByStatusMask,
):
Response = ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse
Request = ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskRequest
class ReportEmissionsRelatedOBDDTCByStatusMask(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportEmissionsRelatedOBDDTCByStatusMask,
):
Response = ReportEmissionsRelatedOBDDTCByStatusMaskResponse
Request = ReportEmissionsRelatedOBDDTCByStatusMaskRequest
class ReportSupportedDTC(
SubFunction, sub_function_id=ReadDTCInformationSubFuncs.reportSupportedDTC
):
Response = ReportSupportedDTCResponse
Request = ReportSupportedDTCRequest
class ReportFirstTestFailedDTC(
SubFunction, sub_function_id=ReadDTCInformationSubFuncs.reportFirstTestFailedDTC
):
Response = ReportFirstTestFailedDTCResponse
Request = ReportFirstTestFailedDTCRequest
class ReportFirstConfirmedDTC(
SubFunction, sub_function_id=ReadDTCInformationSubFuncs.reportFirstConfirmedDTC
):
Response = ReportFirstConfirmedDTCResponse
Request = ReportFirstConfirmedDTCRequest
class ReportMostRecentTestFailedDTC(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportMostRecentTestFailedDTC,
):
Response = ReportMostRecentTestFailedDTCResponse
Request = ReportMostRecentFirstTestFailedDTCRequest
class ReportMostRecentConfirmedDTC(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportMostRecentConfirmedDTC,
):
Response = ReportMostrecentConfirmedDTCResponse
Request = ReportMostRecentConfirmedDTCRequest
class ReportDTCWithPermanentStatus(
SubFunction,
sub_function_id=ReadDTCInformationSubFuncs.reportDTCWithPermanentStatus,
):
Response = ReportDTCWithPermanentStatusResponse
Request = ReportDTCWithPermanentStatusRequest
# **************************************
# * Input output control by identifier *
# **************************************
class InputOutputControlByIdentifierResponse(
PositiveResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_status_record: bytes) -> None:
super().__init__()
check_data_identifier(data_identifier)
if len(control_status_record) < 1:
raise ValueError("The controlStatusRecord must not be empty")
self.data_identifier = data_identifier
self.control_status_record = control_status_record
@property
def pdu(self) -> bytes:
return (
pack("!BH", self.RESPONSE_SERVICE_ID, self.data_identifier) + self.control_status_record
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> InputOutputControlByIdentifierResponse:
data_identifier = from_bytes(pdu[1:3])
control_status_record = pdu[3:]
return InputOutputControlByIdentifierResponse(data_identifier, control_status_record)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, InputOutputControlByIdentifierRequest)
and self.data_identifier == request.data_identifier
)
class InputOutputControlByIdentifierRequest(
UDSRequest,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
response_type=InputOutputControlByIdentifierResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(
self,
data_identifier: int,
control_option_record: bytes,
control_enable_mask_record: bytes = b"",
) -> None:
"""Controls input or output values on the server.
This is an implementation of the UDS request for the service
InputOutputControlByIdentifier (0x2F).
This function exposes the parameters as in the corresponding specification,
hence is suitable for all variants of this service.
For the variants which use an inputOutputControlParameter as the first byte of the
controlOptionRecord, using the corresponding wrappers is recommended.
:param data_identifier: The data identifier of the value(s) to be controlled.
:param control_option_record: The controlStates, which specify the intended values of the
input / output parameters, optionally prefixed with an
inputOutputControlParameter or only an
inputOutputControlParameter.
:param control_enable_mask_record: In cases where the dataIdentifier corresponds to multiple
input / output parameters, this mask specifies which ones
should be affected by this request.
"""
check_data_identifier(data_identifier)
if len(control_option_record) < 1:
raise ValueError("The controlOptionRecord must not be empty")
self.data_identifier = data_identifier
self.control_option_record = control_option_record
self.control_enable_mask_record = control_enable_mask_record
@property
def pdu(self) -> bytes:
return (
pack("!BH", self.SERVICE_ID, self.data_identifier)
+ self.control_option_record
+ self.control_enable_mask_record
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> InputOutputControlByIdentifierRequest:
# Because both the controlOptionRecord as well as the controlEnableMaskRecord are of
# variable size, and there is no field which describes those parameters,
# it is impossible for the server to determine those fields reliably without vendor or ECU
# specific knowledge.
# Therefore, similar to the implementation for ReadDataByIdentifier,
# the first variable parameter consumes all remaining data.
data_identifier = from_bytes(pdu[1:3])
control_option_record = pdu[3:]
control_enable_mask_record = b""
return InputOutputControlByIdentifierRequest(
data_identifier, control_option_record, control_enable_mask_record
)
class ReturnControlToECUResponse(
InputOutputControlByIdentifierResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_states: bytes = b"") -> None:
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.returnControlToECU]) + control_states,
)
def matches(self, request: UDSRequest) -> bool:
return super().matches(request) and isinstance(request, ReturnControlToECURequest)
class ReturnControlToECURequest(
InputOutputControlByIdentifierRequest,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
response_type=ReturnControlToECUResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"") -> None:
"""Gives the control over input / output parameters back to the ECU.
This is a convenience wrapper of the generic request for the case where an
inputOutputControlParameter is used and is set to returnControlToECU. In that case no
further controlState parameters can be submitted.
:param data_identifier: The data identifier of the value(s) for which control should be
returned to the ECU.
:param control_enable_mask_record: In cases where the dataIdentifier corresponds to multiple
input / output parameters, this mask specifies which ones
should be affected by this request.
"""
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.returnControlToECU]),
control_enable_mask_record,
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ReturnControlToECURequest:
data_identifier = from_bytes(pdu[1:3])
control_enable_mask_record = pdu[4:]
return ReturnControlToECURequest(data_identifier, control_enable_mask_record)
class ResetToDefaultResponse(
InputOutputControlByIdentifierResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_states: bytes = b"") -> None:
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.resetToDefault]) + control_states,
)
def matches(self, request: UDSRequest) -> bool:
return super().matches(request) and isinstance(request, ResetToDefaultRequest)
class ResetToDefaultRequest(
InputOutputControlByIdentifierRequest,
response_type=ResetToDefaultResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"") -> None:
"""Sets the input / output parameters to the default value(s).
This is a convenience wrapper of the generic request for the case where an
inputOutputControlParameter is used and is set to resetToDefault.
In that case no further controlState parameters can be submitted.
:param data_identifier: The data identifier of the value(s) for which the values should be
reset.
:param control_enable_mask_record: In cases where the dataIdentifier corresponds to multiple
input / output parameters, this mask specifies which ones
should be affected by this request.
"""
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.resetToDefault]),
control_enable_mask_record,
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> ResetToDefaultRequest:
data_identifier = from_bytes(pdu[1:3])
control_enable_mask_record = pdu[4:]
return ResetToDefaultRequest(data_identifier, control_enable_mask_record)
class FreezeCurrentStateResponse(
InputOutputControlByIdentifierResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_states: bytes = b"") -> None:
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.freezeCurrentState]) + control_states,
)
def matches(self, request: UDSRequest) -> bool:
return super().matches(request) and isinstance(request, FreezeCurrentStateResponse)
class FreezeCurrentStateRequest(
InputOutputControlByIdentifierRequest,
response_type=FreezeCurrentStateResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"") -> None:
"""Freezes the input / output parameters at their current state.
This is a convenience wrapper of the generic request for the case where an
inputOutputControlParameter is used and is set to freezeCurrentState.
In that case no further controlState parameters can be submitted.
:param data_identifier: The data identifier of the value(s) to be frozen.
:param control_enable_mask_record: In cases where the dataIdentifier corresponds to multiple
input / output parameters, this mask specifies which ones
should be affected by this request.
"""
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.freezeCurrentState]),
control_enable_mask_record,
)
@classmethod
def _from_pdu(cls, pdu: bytes) -> FreezeCurrentStateRequest:
data_identifier = from_bytes(pdu[1:3])
control_enable_mask_record = pdu[4:]
return FreezeCurrentStateRequest(data_identifier, control_enable_mask_record)
class ShortTermAdjustmentResponse(
InputOutputControlByIdentifierResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=4,
maximal_length=None,
):
def __init__(self, data_identifier: int, control_states: bytes = b"") -> None:
super().__init__(
data_identifier,
bytes([InputOutputControlParameter.shortTermAdjustment]) + control_states,
)
class ShortTermAdjustmentRequest(
InputOutputControlByIdentifierRequest,
response_type=ShortTermAdjustmentResponse,
service_id=UDSIsoServices.InputOutputControlByIdentifier,
minimal_length=5,
maximal_length=None,
):
def __init__(
self,
data_identifier: int,
control_states: bytes,
control_enable_mask_record: bytes = b"",
) -> None:
"""Sets the input / output parameters as specified in the controlOptionRecord.
This is a convenience wrapper of the generic request for the case
where an inputOutputControlParameter is used and is set to freezeCurrentState.
In that case controlState parameters are required.
:param data_identifier: The data identifier of the value(s) to be adjusted.
:param control_states: The controlStates, which specify the intended values of the input /
output parameters.
:param control_enable_mask_record: In cases where the dataIdentifier corresponds to multiple
input / output parameters, this mask specifies which ones
should be affected by this request.
"""
control_option_record = (
bytes([InputOutputControlParameter.shortTermAdjustment]) + control_states
)
super().__init__(data_identifier, control_option_record, control_enable_mask_record)
class InputOutputControlByIdentifier(
UDSService, service_id=UDSIsoServices.InputOutputControlByIdentifier
):
Response = InputOutputControlByIdentifierResponse
Request = InputOutputControlByIdentifierRequest
class ReturnControlToECU:
Response = ReturnControlToECUResponse
Request = ReturnControlToECURequest
class ResetToDefault:
Response = ResetToDefaultResponse
Request = ResetToDefaultRequest
class FreezeCurrentState:
Response = FreezeCurrentStateResponse
Request = FreezeCurrentStateRequest
class ShortTermAdjustment:
Response = ShortTermAdjustmentResponse
Request = ShortTermAdjustmentRequest
# *******************
# * Routine control *
# *******************
T_RoutineControlResponse = TypeVar("T_RoutineControlResponse", bound="RoutineControlResponse")
class RoutineControlResponse(
SpecializedSubFunctionResponse,
ABC,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=0,
minimal_length=4,
maximal_length=None,
):
@property
def pdu(self) -> bytes:
return (
pack(
"!BBH",
self.RESPONSE_SERVICE_ID,
self.sub_function,
self.routine_identifier,
)
+ self.routine_status_record
)
@classmethod
def _from_pdu(cls: type[T_RoutineControlResponse], pdu: bytes) -> T_RoutineControlResponse:
routine_identifier = from_bytes(pdu[2:4])
routine_status_record = pdu[4:]
return cls(routine_identifier, routine_status_record)
def __init__(self, routine_identifier: int, routine_status_record: bytes = b"") -> None:
super().__init__()
self.routine_control_type = self.sub_function
self.routine_identifier = routine_identifier
self.routine_status_record = routine_status_record
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, RoutineControlRequest)
and self.routine_control_type == request.routine_control_type
and self.routine_identifier == request.routine_identifier
)
T_RoutineControlRequest = TypeVar("T_RoutineControlRequest", bound="RoutineControlRequest")
class RoutineControlRequest(
SpecializedSubFunctionRequest,
ABC,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=0,
response_type=RoutineControlResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(
self,
routine_identifier: int,
routine_control_option_record: bytes = b"",
suppress_response: bool = False,
) -> None:
super().__init__(suppress_response)
check_range(routine_identifier, "routineIdentifier", 0, 0xFFFF)
self.routine_control_type = self.sub_function
self.routine_identifier = routine_identifier
self.routine_control_option_record = routine_control_option_record
@property
def pdu(self) -> bytes:
return (
pack(
"!BBH",
self.SERVICE_ID,
self.sub_function_with_suppress_response_bit,
self.routine_identifier,
)
+ self.routine_control_option_record
)
@classmethod
def _from_pdu(cls: type[T_RoutineControlRequest], pdu: bytes) -> T_RoutineControlRequest:
routine_identifier = from_bytes(pdu[2:4])
routine_control_option_record = pdu[4:]
return cls(
routine_identifier,
routine_control_option_record,
cls.suppress_response_set(pdu),
)
class StartRoutineResponse(
RoutineControlResponse,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=RoutineControlSubFuncs.startRoutine,
minimal_length=4,
maximal_length=None,
):
pass
class StartRoutineRequest(
RoutineControlRequest,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=RoutineControlSubFuncs.startRoutine,
response_type=StartRoutineResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(
self,
routine_identifier: int,
routine_control_option_record: bytes = b"",
suppress_response: bool = False,
) -> None:
"""Starts a specific routine on the server.
This is an implementation of the UDS request for the startRoutine sub-function of the
service routineControl (0x31).
:param routine_identifier: The identifier of the routine.
:param routine_control_option_record: Optional data.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(routine_identifier, routine_control_option_record, suppress_response)
class StopRoutineResponse(
RoutineControlResponse,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=RoutineControlSubFuncs.stopRoutine,
minimal_length=4,
maximal_length=None,
):
pass
class StopRoutineRequest(
RoutineControlRequest,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=RoutineControlSubFuncs.stopRoutine,
response_type=StopRoutineResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(
self,
routine_identifier: int,
routine_control_option_record: bytes = b"",
suppress_response: bool = False,
) -> None:
"""Stops a specific routine on the server.
This is an implementation of the UDS request for the stopRoutine sub-function of the service
routineControl (0x31).
:param routine_identifier: The identifier of the routine.
:param routine_control_option_record: Optional data.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(routine_identifier, routine_control_option_record, suppress_response)
class RequestRoutineResultsResponse(
RoutineControlResponse,
minimal_length=4,
maximal_length=None,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=RoutineControlSubFuncs.requestRoutineResults,
):
pass
class RequestRoutineResultsRequest(
RoutineControlRequest,
service_id=UDSIsoServices.RoutineControl,
sub_function_id=RoutineControlSubFuncs.requestRoutineResults,
response_type=RequestRoutineResultsResponse,
minimal_length=4,
maximal_length=None,
):
def __init__(
self,
routine_identifier: int,
routine_control_option_record: bytes = b"",
suppress_response: bool = False,
) -> None:
"""Requests the results of a specific routine on the server.
This is an implementation of the UDS request for the requestRoutineResults sub-function of
the service routineControl (0x31).
:param routine_identifier: The identifier of the routine.
:param routine_control_option_record: Optional data.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
super().__init__(routine_identifier, routine_control_option_record, suppress_response)
class RoutineControl(SpecializedSubFunctionService, service_id=UDSIsoServices.RoutineControl):
class StartRoutine(SubFunction, sub_function_id=RoutineControlSubFuncs.startRoutine):
Request = StartRoutineRequest
Response = StartRoutineResponse
class StopRoutine(SubFunction, sub_function_id=RoutineControlSubFuncs.stopRoutine):
Request = StopRoutineRequest
Response = StopRoutineResponse
class RequestRoutineResults(
SubFunction, sub_function_id=RoutineControlSubFuncs.requestRoutineResults
):
Request = RequestRoutineResultsRequest
Response = RequestRoutineResultsResponse
# ********************
# * Request download *
# ********************
T_RequestUpOrDownloadResponse = TypeVar(
"T_RequestUpOrDownloadResponse", bound="_RequestUpOrDownloadResponse"
)
class _RequestUpOrDownloadResponse(
PositiveResponse, service_id=None, minimal_length=3, maximal_length=None
):
def __init__(
self,
max_number_of_block_length: int,
length_format_identifier: int | None = None,
) -> None:
super().__init__()
if length_format_identifier is not None:
if not 0 <= length_format_identifier <= 0xF0 or length_format_identifier % 2**4 > 0:
raise ValueError(
f"Invalid value for lengthFormatIdentifier: " f"{length_format_identifier}"
)
uds_memory_parameters(0, max_number_of_block_length, length_format_identifier + 1)
else:
length_format_identifier, _, _ = uds_memory_parameters(0, max_number_of_block_length)
self.max_number_of_block_length = max_number_of_block_length
self.length_format_identifier = length_format_identifier - (length_format_identifier % 2**4)
@property
def pdu(self) -> bytes:
max_number_of_block_length = to_bytes(
self.max_number_of_block_length, self.length_format_identifier // 2**4
)
return (
pack("BB", self.RESPONSE_SERVICE_ID, self.length_format_identifier)
+ max_number_of_block_length
)
@classmethod
def _from_pdu(
cls: type[T_RequestUpOrDownloadResponse], pdu: bytes
) -> T_RequestUpOrDownloadResponse:
length_format_identifier = pdu[1]
max_number_of_block_length = from_bytes(pdu[2:])
return cls(max_number_of_block_length, length_format_identifier)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, _RequestUpOrDownloadRequest)
and request.SERVICE_ID == self.SERVICE_ID
)
T_RequestUpOrDownloadRequest = TypeVar(
"T_RequestUpOrDownloadRequest", bound="_RequestUpOrDownloadRequest"
)
class _RequestUpOrDownloadRequest(
UDSRequest,
service_id=None,
response_type=_RequestUpOrDownloadResponse,
minimal_length=4,
maximal_length=None,
):
def __init__( # noqa: PLR0913
self,
memory_address: int,
memory_size: int,
compression_method: int = 0x0,
encryption_method: int = 0x0,
address_and_length_format_identifier: int | None = None,
) -> None:
check_range(compression_method, "compressionMethod", 0, 0xF)
check_range(encryption_method, "encryptionMethod", 0, 0xF)
if address_and_length_format_identifier is not None:
check_range(
address_and_length_format_identifier,
"addressAndLengthFormatIdentifier",
0,
0xFF,
)
self.memory_address = memory_address
self.memory_size = memory_size
self.compression_method = compression_method
self.encryption_method = encryption_method
self.address_and_length_format_identifier, _, _ = uds_memory_parameters(
memory_address, memory_size, address_and_length_format_identifier
)
@property
def pdu(self) -> bytes:
data_format_identifier = (self.compression_method << 4) | self.encryption_method
addr_and_len_format_id, address_bytes, size_bytes = uds_memory_parameters(
self.memory_address,
self.memory_size,
self.address_and_length_format_identifier,
)
pdu = struct.pack("!BBB", self.SERVICE_ID, data_format_identifier, addr_and_len_format_id)
pdu += address_bytes + size_bytes
return pdu
@classmethod
def _from_pdu(
cls: type[T_RequestUpOrDownloadRequest], pdu: bytes
) -> T_RequestUpOrDownloadRequest:
data_format_identifier = pdu[1]
address_and_length_format_identifier = pdu[2]
address_length, size_length = address_and_size_length(address_and_length_format_identifier)
if len(pdu) != 3 + address_length + size_length:
raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size")
return cls(
from_bytes(pdu[3 : 3 + address_length]),
from_bytes(pdu[3 + address_length : 3 + address_length + size_length]),
data_format_identifier // 2**4,
data_format_identifier % 2**4,
address_and_length_format_identifier,
)
class RequestDownloadResponse(
_RequestUpOrDownloadResponse,
minimal_length=3,
maximal_length=None,
service_id=UDSIsoServices.RequestDownload,
):
pass
class RequestDownloadRequest(
_RequestUpOrDownloadRequest,
service_id=UDSIsoServices.RequestDownload,
response_type=RequestDownloadResponse,
minimal_length=4,
maximal_length=None,
):
def __init__( # noqa: PLR0913
self,
memory_address: int,
memory_size: int,
compression_method: int = 0x0,
encryption_method: int = 0x0,
address_and_length_format_identifier: int | None = None,
) -> None:
"""Requests the download of data, i.e. the possibility to send data from the client to the
server.
This is an implementation of the UDS request for requestDownload (0x34).
:param memory_address: The address at which data should be downloaded.
:param memory_size: The number of bytes to be downloaded.
:param compression_method: Encodes the utilized compressionFormat (0x0 for none)
:param encryption_method: Encodes the utilized encryptionFormat (0x0 for none)
:param address_and_length_format_identifier: The byte lengths of the memory address and
size. If omitted, this parameter is computed
based on the memory_address and
memory_size parameters.
"""
super().__init__(
memory_address,
memory_size,
compression_method,
encryption_method,
address_and_length_format_identifier,
)
class RequestDownload(UDSService, service_id=UDSIsoServices.RequestDownload):
Response = RequestDownloadResponse
Request = RequestDownloadRequest
# ******************
# * Request Upload *
# ******************
class RequestUploadResponse(
_RequestUpOrDownloadResponse,
service_id=UDSIsoServices.RequestUpload,
minimal_length=3,
maximal_length=None,
):
pass
class RequestUploadRequest(
_RequestUpOrDownloadRequest,
service_id=UDSIsoServices.RequestUpload,
response_type=RequestUploadResponse,
minimal_length=4,
maximal_length=None,
):
def __init__( # noqa: PLR0913
self,
memory_address: int,
memory_size: int,
compression_method: int = 0x0,
encryption_method: int = 0x0,
address_and_length_format_identifier: int | None = None,
) -> None:
"""Requests the upload of data, i.e. the possibility to receive data from the server.
This is an implementation of the UDS request for requestUpload (0x35).
:param memory_address: The address at which data should be uploaded.
:param memory_size: The number of bytes to be uploaded.
:param compression_method: Encodes the utilized compressionFormat (0x0 for none)
:param encryption_method: Encodes the utilized encryptionFormat (0x0 for none)
:param address_and_length_format_identifier: The byte lengths of the memory address and
size. If omitted, this parameter is computed
based on the memory_address and memory_size
parameters.
"""
super().__init__(
memory_address,
memory_size,
compression_method,
encryption_method,
address_and_length_format_identifier,
)
class RequestUpload(UDSService, service_id=UDSIsoServices.RequestUpload):
Response = RequestUploadResponse
Request = RequestUploadRequest
# *****************
# * Transfer data *
# *****************
class TransferDataResponse(
PositiveResponse,
service_id=UDSIsoServices.TransferData,
minimal_length=2,
maximal_length=None,
):
def __init__(
self,
block_sequence_counter: int,
transfer_response_parameter_record: bytes = b"",
) -> None:
super().__init__()
check_range(block_sequence_counter, "blockSequenceCounter", 0, 0xFF)
self.block_sequence_counter = block_sequence_counter
self.transfer_response_parameter_record = transfer_response_parameter_record
@classmethod
def _from_pdu(cls, pdu: bytes) -> TransferDataResponse:
block_sequence_counter = pdu[1]
transfer_response_parameter_record = pdu[2:]
return TransferDataResponse(block_sequence_counter, transfer_response_parameter_record)
@property
def pdu(self) -> bytes:
return (
pack("!BB", self.RESPONSE_SERVICE_ID, self.block_sequence_counter)
+ self.transfer_response_parameter_record
)
def matches(self, request: UDSRequest) -> bool:
return (
isinstance(request, TransferDataRequest)
and self.block_sequence_counter == request.block_sequence_counter
)
class TransferDataRequest(
UDSRequest,
service_id=UDSIsoServices.TransferData,
response_type=TransferDataResponse,
minimal_length=2,
maximal_length=None,
):
def __init__(
self,
block_sequence_counter: int,
transfer_request_parameter_record: bytes = b"",
) -> None:
"""Transfers data to the server or requests the next data from the server.
This is an implementation of the UDS request for transferData (0x36).
:param block_sequence_counter: The current block sequence counter.
Initialized with one and incremented for each new data.
After 0xff, the counter is resumed at 0
:param transfer_request_parameter_record: Contains the data to be transferred if downloading
to the server.
"""
check_range(block_sequence_counter, "blockSequenceCounter", 0, 0xFF)
self.block_sequence_counter = block_sequence_counter
self.transfer_request_parameter_record = transfer_request_parameter_record
@classmethod
def _from_pdu(cls, pdu: bytes) -> TransferDataRequest:
block_sequence_counter = pdu[1]
transfer_request_parameter_record = pdu[2:]
return TransferDataRequest(block_sequence_counter, transfer_request_parameter_record)
@property
def pdu(self) -> bytes:
return (
pack("!BB", self.SERVICE_ID, self.block_sequence_counter)
+ self.transfer_request_parameter_record
)
class TransferData(UDSService, service_id=UDSIsoServices.TransferData):
Response = TransferDataResponse
Request = TransferDataRequest
# *************************
# * Request transfer exit *
# *************************
class RequestTransferExitResponse(
PositiveResponse,
service_id=UDSIsoServices.RequestTransferExit,
minimal_length=1,
maximal_length=None,
):
def __init__(self, transfer_response_parameter_record: bytes = b"") -> None:
super().__init__()
self.transfer_response_parameter_record = transfer_response_parameter_record
@property
def pdu(self) -> bytes:
assert self.RESPONSE_SERVICE_ID is not None
return bytes([self.RESPONSE_SERVICE_ID]) + self.transfer_response_parameter_record
@classmethod
def _from_pdu(cls, pdu: bytes) -> RequestTransferExitResponse:
transfer_response_parameter_record = pdu[1:]
return RequestTransferExitResponse(transfer_response_parameter_record)
def matches(self, request: UDSRequest) -> bool:
return isinstance(request, RequestTransferExitRequest)
class RequestTransferExitRequest(
UDSRequest,
service_id=UDSIsoServices.RequestTransferExit,
response_type=RequestTransferExitResponse,
minimal_length=1,
maximal_length=None,
):
def __init__(self, transfer_request_parameter_record: bytes = b"") -> None:
"""Ends the transfer of data.
This is an implementation of the UDS request for requestTransferExit (0x77).
:param transfer_request_parameter_record: Optional data.
"""
self.transfer_request_parameter_record = transfer_request_parameter_record
@property
def pdu(self) -> bytes:
assert self.SERVICE_ID is not None
return bytes([self.SERVICE_ID]) + self.transfer_request_parameter_record
@classmethod
def _from_pdu(cls, pdu: bytes) -> RequestTransferExitRequest:
transfer_request_parameter_record = pdu[1:]
return RequestTransferExitRequest(transfer_request_parameter_record)
class RequestTransferExit(UDSService, service_id=UDSIsoServices.RequestTransferExit):
Response = RequestTransferExitResponse
Request = RequestTransferExitRequest
# *************************
# * Request file transfer *
# *************************