Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
some llm trash and some ssh magic
  • Loading branch information
sshane committed Jul 26, 2025
commit 88024a107660dc259509b792695766ca0f7a5563
55 changes: 55 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,60 @@
from opendbc.car import *
from opendbc.car.structs import car
import pytest
# pytest attempts to execute shell scripts while collecting

collect_ignore_glob = [
"opendbc/safety/tests/misra/*.sh",
"opendbc/safety/tests/misra/cppcheck/",
]

from concurrent.futures import ThreadPoolExecutor
import xdist.workermanage as wm

# def setup_nodes_concurrent(self, putevent):
# self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
# self.trace("setting up nodes (concurrent)")
# t = time.monotonic()
# with ThreadPoolExecutor(max_workers=len(self.specs)) as pool:
# futs = [pool.submit(self.setup_node, spec, putevent) for spec in self.specs]
# ret = [f.result() for f in futs]
# print(f"setup_nodes_concurrent took {time.monotonic() - t:.3f} seconds")
# return ret

# wm.NodeManager.setup_nodes = setup_nodes_concurrent

# conftest.py (fixed)
import time, pytest

_T0 = None # will hold the perf‑counter at startup
_PHASES = {} # elapsed durations we actually care about


# ---------- bootstrap ----------
def pytest_cmdline_main(config):
global _T0
_T0 = time.perf_counter() # mark very first moment


# ---------- collection ----------
def pytest_sessionstart(session):
now = time.perf_counter()
_PHASES['collection'] = now - _T0
_PHASES['_run_start'] = now # internal: start of test run


# ---------- tests done ----------
def pytest_sessionfinish(session, exitstatus):
end = time.perf_counter()
_PHASES['tests'] = end - _PHASES['_run_start']
_PHASES['total'] = end - _T0
_PHASES.pop('_run_start', None) # remove internal key



@pytest.hookimpl(trylast=True)
def pytest_terminal_summary(terminalreporter, exitstatus, config):
tr = terminalreporter
tr.section("pytest overhead")
for k, v in _PHASES.items():
tr.write_line(f"{k:>10}: {v:8.3f} s")
100 changes: 58 additions & 42 deletions opendbc/car/tests/test_can_fingerprint.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,71 @@
import pytest
import unittest
from parameterized import parameterized

from opendbc.car.can_definitions import CanData
from opendbc.car.car_helpers import FRAME_FINGERPRINT, can_fingerprint
from opendbc.car.fingerprints import _FINGERPRINTS as FINGERPRINTS


class TestCanFingerprint:
@pytest.mark.parametrize("car_model, fingerprints", FINGERPRINTS.items())
def test_can_fingerprint(self, car_model, fingerprints):
"""Tests online fingerprinting function on offline fingerprints"""

for fingerprint in fingerprints: # can have multiple fingerprints for each platform
can = [CanData(address=address, dat=b'\x00' * length, src=src)
for address, length in fingerprint.items() for src in (0, 1)]

fingerprint_iter = iter([can])
car_fingerprint, finger = can_fingerprint(lambda **kwargs: [next(fingerprint_iter, [])]) # noqa: B023

assert car_fingerprint == car_model
assert finger[0] == fingerprint
assert finger[1] == fingerprint
assert finger[2] == {}
class TestCanFingerprint(unittest.TestCase):

def test_timing(self, subtests):
# just pick any CAN fingerprinting car
car_model = "CHEVROLET_BOLT_EUV"
fingerprint = FINGERPRINTS[car_model][0]
@parameterized.expand([
(car_model, fingerprint)
for car_model, fps in FINGERPRINTS.items()
for fingerprint in fps
])
def test_can_fingerprint(self, car_model, fingerprint):
"""Tests online fingerprinting function on offline fingerprints"""
can = [
CanData(address=addr, dat=b'\x00' * length, src=src)
for addr, length in fingerprint.items()
for src in (0, 1)
]
fingerprint_iter = iter([can])
car_fp, finger = can_fingerprint(
lambda **kwargs: [next(fingerprint_iter, [])] # noqa: B023
)
self.assertEqual(car_fp, car_model)
self.assertEqual(finger[0], fingerprint)
self.assertEqual(finger[1], fingerprint)
self.assertEqual(finger[2], {})

cases = []
def test_timing(self):
"""Ensure we iterate the correct number of frames before deciding"""
# pick a known model fingerprint
model = "CHEVROLET_BOLT_EUV"
fp = FINGERPRINTS[model][0]

# case 1 - one match, make sure we keep going for 100 frames
can = [CanData(address=address, dat=b'\x00' * length, src=src)
for address, length in fingerprint.items() for src in (0, 1)]
cases.append((FRAME_FINGERPRINT, car_model, can))
cases = [
# (frames_to_run, expected_car, can_frame)
(FRAME_FINGERPRINT, model, [
CanData(address=addr, dat=b'\x00' * length, src=src)
for addr, length in fp.items()
for src in (0, 1)
]),
(FRAME_FINGERPRINT, None, [
CanData(address=1, dat=b'\x00', src=src)
for src in (0, 1)
]),
(FRAME_FINGERPRINT * 2, None, [
CanData(address=2016, dat=b'\x00' * 8, src=src)
for src in (0, 1)
]),
]

# case 2 - no matches, make sure we keep going for 100 frames
can = [CanData(address=1, dat=b'\x00' * 1, src=src) for src in (0, 1)] # uncommon address
cases.append((FRAME_FINGERPRINT, None, can))
for expected_frames, expected_car, can in cases:
with self.subTest(expected_frames=expected_frames, car_model=expected_car):
frames = 0

# case 3 - multiple matches, make sure we keep going for 200 frames to try to eliminate some
can = [CanData(address=2016, dat=b'\x00' * 8, src=src) for src in (0, 1)] # common address
cases.append((FRAME_FINGERPRINT * 2, None, can))
def can_recv(**kwargs):
nonlocal frames
frames += 1
return [can]

for expected_frames, car_model, can in cases:
with subtests.test(expected_frames=expected_frames, car_model=car_model):
frames = 0
car_fp, _ = can_fingerprint(can_recv)
self.assertEqual(car_fp, expected_car)
# implementation currently does +2 extra frames
self.assertEqual(frames, expected_frames + 2)

def can_recv(**kwargs):
nonlocal frames
frames += 1
return [can] # noqa: B023

car_fingerprint, _ = can_fingerprint(can_recv)
assert car_fingerprint == car_model
assert frames == expected_frames + 2 # TODO: fix extra frames
if __name__ == "__main__":
unittest.main()
Loading
Loading