# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0
import json
from argparse import ArgumentParser, BooleanOptionalAction, Namespace
import aiofiles
from gallia.command.base import FileNames, Scanner
from gallia.config import Config
from gallia.log import get_logger
from gallia.plugins import load_ecu, load_ecu_plugins
from gallia.services.uds.core.service import NegativeResponse, UDSResponse
from gallia.services.uds.ecu import ECU
from gallia.services.uds.helpers import raise_for_error
logger = get_logger("gallia.base.udsscan")
[docs]
class UDSScanner(Scanner):
"""UDSScanner is a baseclass, particularly for scanning tasks
related to the UDS protocol. The differences to Scanner are:
- `self.ecu` contains a OEM specific UDS client object.
- A background tasks sends TesterPresent regularly to avoid timeouts.
"""
GROUP = "scan"
SUBGROUP: str | None = "uds"
def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None:
super().__init__(parser, config)
self.ecu: ECU
self._implicit_logging = True
def configure_class_parser(self) -> None:
super().configure_class_parser()
group = self.parser.add_argument_group("UDS scanner related arguments")
choices = ["default"] + [x.OEM for x in load_ecu_plugins()]
group.add_argument(
"--ecu-reset",
const=0x01,
nargs="?",
default=self.config.get_value("gallia.protocols.uds.ecu_reset"),
help="Trigger an initial ecu_reset via UDS; reset level is optional",
)
group.add_argument(
"--oem",
default=self.config.get_value("gallia.protocols.uds.oem", "default"),
choices=choices,
metavar="OEM",
help="The OEM of the ECU, used to choose a OEM specific ECU implementation",
)
group.add_argument(
"--timeout",
default=self.config.get_value("gallia.protocols.uds.timeout", 2),
type=float,
metavar="SECONDS",
help="Timeout value to wait for a response from the ECU",
)
group.add_argument(
"--max-retries",
default=self.config.get_value("gallia.protocols.uds.max_retries", 3),
type=int,
metavar="INT",
help="Number of maximum retries while sending UDS requests",
)
group.add_argument(
"--ping",
action=BooleanOptionalAction,
default=self.config.get_value("gallia.protocols.uds.ping", True),
help="Enable/Disable initial TesterPresent request",
)
group.add_argument(
"--tester-present-interval",
default=self.config.get_value("gallia.protocols.uds.tester_present_interval", 0.5),
type=float,
metavar="SECONDS",
help="Modify the interval of the cyclic tester present packets",
)
group.add_argument(
"--tester-present",
action=BooleanOptionalAction,
default=self.config.get_value("gallia.protocols.uds.tester_present", True),
help="Enable/Disable tester present background worker",
)
group.add_argument(
"--properties",
default=self.config.get_value("gallia.protocols.uds.properties", True),
action=BooleanOptionalAction,
help="Read and store the ECU proporties prior and after scan",
)
group.add_argument(
"--compare-properties",
default=self.config.get_value("gallia.protocols.uds.compare_properties", True),
action=BooleanOptionalAction,
help="Compare properties before and after the scan",
)
@property
def implicit_logging(self) -> bool:
return self._implicit_logging
@implicit_logging.setter
def implicit_logging(self, value: bool) -> None:
self._implicit_logging = value
if self.db_handler is not None:
self._apply_implicit_logging_setting()
def _apply_implicit_logging_setting(self) -> None:
self.ecu.implicit_logging = self._implicit_logging
async def setup(self, args: Namespace) -> None:
await super().setup(args)
self.ecu = load_ecu(args.oem)(
self.transport,
timeout=args.timeout,
max_retry=args.max_retries,
power_supply=self.power_supply,
)
self.ecu.db_handler = self.db_handler
if self.db_handler is not None:
try:
# No idea, but str(args.target) fails with a strange traceback.
# Lets use the attribute directly…
await self.db_handler.insert_scan_run(args.target.raw)
self._apply_implicit_logging_setting()
except Exception as e:
logger.warning(f"Could not write the scan run to the database: {e:!r}")
if args.ecu_reset is not None:
resp: UDSResponse = await self.ecu.ecu_reset(args.ecu_reset)
if isinstance(resp, NegativeResponse):
logger.warning(f"ECUReset failed: {resp}")
logger.warning("Switching to default session")
raise_for_error(await self.ecu.set_session(0x01))
resp = await self.ecu.ecu_reset(args.ecu_reset)
if isinstance(resp, NegativeResponse):
logger.warning(f"ECUReset in session 0x01 failed: {resp}")
# Handles connecting to the target and waits
# until it is ready.
if args.ping:
await self.ecu.wait_for_ecu()
await self.ecu.connect()
if args.tester_present:
await self.ecu.start_cyclic_tester_present(args.tester_present_interval)
if args.properties is True:
path = self.artifacts_dir.joinpath(FileNames.PROPERTIES_PRE.value)
async with aiofiles.open(path, "w") as file:
await file.write(json.dumps(await self.ecu.properties(True), indent=4))
await file.write("\n")
if self.db_handler is not None:
try:
await self.db_handler.insert_scan_run_properties_pre(await self.ecu.properties())
self._apply_implicit_logging_setting()
except Exception as e:
logger.warning(f"Could not write the properties_pre to the database: {e!r}")
async def teardown(self, args: Namespace) -> None:
if args.properties is True and not self.ecu.transport.is_closed:
path = self.artifacts_dir.joinpath(FileNames.PROPERTIES_POST.value)
async with aiofiles.open(path, "w") as file:
await file.write(json.dumps(await self.ecu.properties(True), indent=4))
await file.write("\n")
path_pre = self.artifacts_dir.joinpath(FileNames.PROPERTIES_PRE.value)
async with aiofiles.open(path_pre, "r") as file:
prop_pre = json.loads(await file.read())
if args.compare_properties and await self.ecu.properties(False) != prop_pre:
logger.warning("ecu properties differ, please investigate!")
if self.db_handler is not None:
try:
await self.db_handler.complete_scan_run(await self.ecu.properties(False))
except Exception as e:
logger.warning(f"Could not write the scan run to the database: {e!r}")
if args.tester_present:
await self.ecu.stop_cyclic_tester_present()
# This must be the last one.
await super().teardown(args)
[docs]
class UDSDiscoveryScanner(Scanner):
GROUP = "discover"
def configure_class_parser(self) -> None:
super().configure_class_parser()
self.parser.add_argument(
"--timeout",
type=float,
default=self.config.get_value("gallia.scanner.timeout", 0.5),
help="timeout value for request",
)
async def setup(self, args: Namespace) -> None:
await super().setup(args)
if self.db_handler is not None:
try:
await self.db_handler.insert_discovery_run(args.target.url.scheme)
except Exception as e:
logger.warning(f"Could not write the discovery run to the database: {e!r}")