From eab607bfcc7e24947bfa2570d27d418ee260a902 Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Sat, 5 Apr 2025 13:15:20 -0500 Subject: [PATCH 1/8] iterativerobotpy, timedrobotpy, and tests. First version of iterativerobotpy.py Signed-off-by: Mike Stitt Simulates, but does not update robot position in simulation. Signed-off-by: Mike Stitt Simulates, but does not update robot position in simulation. Signed-off-by: Mike Stitt Only call the inits on a transition. Signed-off-by: Mike Stitt This version does simulate in autonomous and teleop okay. Need to test timer math. Signed-off-by: Mike Stitt Cleaning Up Signed-off-by: Mike Stitt Cleaning up. Signed-off-by: Mike Stitt Fix typos Signed-off-by: Mike Stitt Correct periodS to be period, use from . to import Signed-off-by: Mike Stitt Fix up watchdog interface. Signed-off-by: Mike Stitt Cleaning up. Signed-off-by: Mike Stitt Cleaning up code. Signed-off-by: Mike Stitt Running through black Signed-off-by: Mike Stitt Added docstrings, new todo's based upon docstring comparison. Signed-off-by: Mike Stitt Added missing wpimath.units.second Signed-off-by: Mike Stitt Fix typo, second to seconds Signed-off-by: Mike Stitt use if rather than match for python 3.9 Signed-off-by: Mike Stitt simulationPeriodic becomes _simulationPeriodic Signed-off-by: Mike Stitt Get the physics engine to work. Signed-off-by: Mike Stitt Fix units bug for getLoopStartTime. now returns microseconds. Signed-off-by: Mike Stitt Make work with python 3.9 Signed-off-by: Mike Stitt Add test of period math, and improve comments. Signed-off-by: Mike Stitt fix black formatting Signed-off-by: Mike Stitt Proof of Concept for TimedRobot Functional Tests Signed-off-by: Mike Stitt WIP still sims. Signed-off-by: Mike Stitt Improving simulation. Signed-off-by: Mike Stitt Getting new tests to not break existing tests. Signed-off-by: Mike Stitt Progress on tests. Signed-off-by: Mike Stitt WIP - tests should pass. Signed-off-by: Mike Stitt Debugging Signed-off-by: Mike Stitt Run black, add to test requirements. Signed-off-by: Mike Stitt Only call endCompetition once during tests. Signed-off-by: Mike Stitt Adjusting tests. Signed-off-by: Mike Stitt Add tests for addPeriodic, fix Override me text. Signed-off-by: Mike Stitt --- subprojects/robotpy-wpilib/tests/conftest.py | 6 +- .../robotpy-wpilib/tests/requirements.txt | 1 + .../tests/test_poc_timedrobot.py | 986 ++++++++++++++++++ .../robotpy-wpilib/tests/test_timedrobot.py | 41 + subprojects/robotpy-wpilib/wpilib/__init__.py | 4 +- .../robotpy-wpilib/wpilib/iterativerobotpy.py | 476 +++++++++ .../robotpy-wpilib/wpilib/timedrobotpy.py | 272 +++++ 7 files changed, 1784 insertions(+), 2 deletions(-) create mode 100644 subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py create mode 100644 subprojects/robotpy-wpilib/tests/test_timedrobot.py create mode 100644 subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py create mode 100644 subprojects/robotpy-wpilib/wpilib/timedrobotpy.py diff --git a/subprojects/robotpy-wpilib/tests/conftest.py b/subprojects/robotpy-wpilib/tests/conftest.py index 88728aa91..f7d7f9655 100644 --- a/subprojects/robotpy-wpilib/tests/conftest.py +++ b/subprojects/robotpy-wpilib/tests/conftest.py @@ -2,9 +2,13 @@ import pytest import ntcore -import wpilib from wpilib.simulation._simulation import _resetWpilibSimulationData +try: + import commands2 +except ImportError: + commands2 = None + @pytest.fixture def cfg_logging(caplog): diff --git a/subprojects/robotpy-wpilib/tests/requirements.txt b/subprojects/robotpy-wpilib/tests/requirements.txt index e079f8a60..30e882850 100644 --- a/subprojects/robotpy-wpilib/tests/requirements.txt +++ b/subprojects/robotpy-wpilib/tests/requirements.txt @@ -1 +1,2 @@ pytest +pytest-reraise diff --git a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py new file mode 100644 index 000000000..9f6ff6d64 --- /dev/null +++ b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py @@ -0,0 +1,986 @@ +""" +Proof of concept to test TimedRobotPy using PyTest + +This POC was made by deconstructing pytest_plugin.py so that it is no longer a plugin but a class that provides +fixtures. + +To run / debug this: + +pytest subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py --no-header -vvv -s + +""" + +import contextlib +import copy +from enum import Enum +import gc +import inspect +import pytest +import threading +import typing +import weakref + +import ntcore +import hal +import hal.simulation +import wpilib +from wpilib import RobotController +import wpilib.shuffleboard +from wpilib.simulation._simulation import _resetWpilibSimulationData +import wpilib.simulation +from wpilib.simulation import pauseTiming, restartTiming +from wpilib.simulation import DriverStationSim, stepTiming, stepTimingAsync +from wpilib.timedrobotpy import TimedRobotPy +from wpilib import TimedRobot + + +def nottest(obj): + obj.__test__ = False + return obj + + +@nottest +class TestController: + """ + Use this object to control the robot's state during tests + """ + + def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool): + self._reraise = reraise + + self._thread: typing.Optional[threading.Thread] = None + self._robot = robot + self._expectFinished = expectFinished + + self._cond = threading.Condition() + self._robot_started = False + self._robot_init_started = False + self._robot_finished = False + + def _on_robot__init_started(self): + with self._cond: + self._robot_init_started = True + self._cond.notify_all() + + def _robot_thread(self, robot): + with self._cond: + self._robot_started = True + self._cond.notify_all() + + with self._reraise(catch=True): + assert robot is not None # shouldn't happen... + + robot._TestRobot__robotInitStarted = self._on_robot__init_started + + try: + robot.startCompetition() + assert self._expectFinished == self._robot_finished + finally: + del robot + + @contextlib.contextmanager + def run_robot(self): + """ + Use this in a "with" statement to start your robot code at the + beginning of the with block, and end your robot code at the end + of the with block. + + Your `robotInit` function will not be called until this function + is called. + """ + + # remove robot reference so it gets cleaned up when gc.collect() is called + robot = self._robot + self._robot = None + + self._thread = th = threading.Thread( + target=self._robot_thread, args=(robot,), daemon=True + ) + th.start() + + with self._cond: + # make sure the thread didn't die + assert self._cond.wait_for(lambda: self._robot_started, timeout=1) + + # If your robotInit is taking more than 2 seconds in simulation, you're + # probably doing something wrong... but if not, please report a bug! + assert self._cond.wait_for(lambda: self._robot_init_started, timeout=2) + + try: + # in this block you should tell the sim to do sim things + yield + finally: + self._robot_finished = True + robot.endCompetition() + + if isinstance(self._reraise.exception, RuntimeError): + if str(self._reraise.exception).startswith( + "HAL: A handle parameter was passed incorrectly" + ): + msg = ( + "Do not reuse HAL objects in tests! This error may occur if you" + " stored a motor/sensor as a global or as a class variable" + " outside of a method." + ) + if hasattr(Exception, "add_note"): + self._reraise.exception.add_note(f"*** {msg}") + else: + e = self._reraise.exception + self._reraise.reset() + raise RuntimeError(msg) from e + + # Increment time by 1 second to ensure that any notifiers fire + stepTimingAsync(1.0) + + # the robot thread should exit quickly + th.join(timeout=1) + if th.is_alive(): + pytest.fail("robot did not exit within 2 seconds") + + self._thread = None + + @property + def robot_is_alive(self) -> bool: + """ + True if the robot code is alive + """ + th = self._thread + if not th: + return False + + return th.is_alive() + + def step_timing( + self, + *, + seconds: float, + autonomous: bool = False, + test: bool = False, + enabled: bool = False, + assert_alive: bool = True, + ) -> float: + """ + This utility will increment simulated time, while pretending that + there's a driver station connected and delivering new packets + every 0.2 seconds. + + :param seconds: Number of seconds to run (will step in increments of 0.2) + :param autonomous: Tell the robot that it is in autonomous mode + :param enabled: Tell the robot that it is enabled + + :returns: Number of seconds time was incremented + """ + + if self._expectFinished: + assert self.robot_is_alive, "did you call control.run_robot()?" + + assert seconds > 0 + + DriverStationSim.setDsAttached(True) + DriverStationSim.setAutonomous(autonomous) + DriverStationSim.setTest(test) + DriverStationSim.setEnabled(enabled) + + tm = 0.0 + + while tm < seconds: + DriverStationSim.notifyNewData() + stepTiming(0.001) + if assert_alive and self._expectFinished: + assert self.robot_is_alive + tm += 0.001 + + return tm + + +@pytest.fixture(scope="function") +def decorated_robot_class(myrobot_class) -> tuple: + # attach physics + + robotClass = myrobot_class + + # Tests need to know when robotInit is called, so override the robot + # to do that + class TestRobot(robotClass): + def robotInit(self): + self.__robotInitStarted() + super().robotInit() + + TestRobot.__name__ = robotClass.__name__ + TestRobot.__module__ = robotClass.__module__ + TestRobot.__qualname__ = robotClass.__qualname__ + + return TestRobot + + +@pytest.fixture(scope="function") +def robot_with_sim_setup_teardown(decorated_robot_class): + """ + Your robot instance + + .. note:: RobotPy/WPILib testing infrastructure is really sensitive + to ensuring that things get cleaned up properly. Make sure + that you don't store references to your robot or other + WPILib objects in a global or static context. + """ + + # + # This function needs to do the same things that RobotBase.main does + # plus some extra things needed for testing + # + # Previously this was separate from robot fixture, but we need to + # ensure that the robot cleanup happens deterministically relative to + # when handle cleanup/etc happens, otherwise unnecessary HAL errors will + # bubble up to the user + # + + nt_inst = ntcore.NetworkTableInstance.getDefault() + nt_inst.startLocal() + + pauseTiming() + restartTiming() + + wpilib.DriverStation.silenceJoystickConnectionWarning(True) + DriverStationSim.setAutonomous(False) + DriverStationSim.setEnabled(False) + DriverStationSim.notifyNewData() + + robot = decorated_robot_class() + + # Tests only get a proxy to ensure cleanup is more reliable + yield weakref.proxy(robot) + + # HACK: avoid motor safety deadlock + wpilib.simulation._simulation._resetMotorSafety() + + del robot + + # Double-check all objects are destroyed so that HAL handles are released + gc.collect() + + # shutdown networktables before other kinds of global cleanup + # -> some reset functions will re-register listeners, so it's important + # to do this before so that the listeners are active on the current + # NetworkTables instance + nt_inst.stopLocal() + nt_inst._reset() + + # Cleanup WPILib globals + # -> preferences, SmartDashboard, Shuffleboard, LiveWindow, MotorSafety + wpilib.simulation._simulation._resetWpilibSimulationData() + wpilib._wpilib._clearSmartDashboardData() + wpilib.shuffleboard._shuffleboard._clearShuffleboardData() + + # Cancel all periodic callbacks + hal.simulation.cancelAllSimPeriodicCallbacks() + + # Reset the HAL handles + hal.simulation.resetGlobalHandles() + + # Reset the HAL data + hal.simulation.resetAllSimData() + + # Don't call HAL shutdown! This is only used to cleanup HAL extensions, + # and functions will only be called the first time (unless re-registered) + # hal.shutdown() + + +@pytest.fixture(scope="function") +def getTestController( + reraise, robot_with_sim_setup_teardown: wpilib.RobotBase, expectFinished: bool +) -> TestController: + """ + A pytest fixture that provides control over your robot_with_sim_setup_teardown + """ + return TestController(reraise, robot_with_sim_setup_teardown, expectFinished) + + +class TimedRobotPyExpectsException(TimedRobotPy): + + def __init__(self, period=TimedRobotPy.kDefaultPeriod): + super().__init__(period=period) + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert hasAssertionError + + +class TimedRobotPyDoNotExpectException(TimedRobotPy): + + def __init__(self, period=TimedRobotPy.kDefaultPeriod): + super().__init__(period=period) + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert not hasAssertionError + + +def getFPGATimeInSecondsAsStr(): + return f"{RobotController.getFPGATime()/1000_000.0:.3f}s" + + +def printEntryAndExit(func): + def wrapper(*args, **kwargs): + name = func.__name__ + args[0]._callOrder += name + "+:" + print(f"{getFPGATimeInSecondsAsStr()}:Enter:{name}") + result = func(*args, **kwargs) + args[0]._callOrder += name + "-:" + print(f"{getFPGATimeInSecondsAsStr()}:Exit_:{name}") + return result + + return wrapper + + +class MyRobotDefaultPass: + + @printEntryAndExit + def robotInit(self): + pass + + @printEntryAndExit + def robotPeriodic(self): + pass + + @printEntryAndExit + def autonomousInit(self): + pass + + @printEntryAndExit + def autonomousPeriodic(self): + pass + + @printEntryAndExit + def autonomousExit(self): + pass + + @printEntryAndExit + def teleopInit(self): + pass + + @printEntryAndExit + def teleopPeriodic(self): + pass + + @printEntryAndExit + def teleopExit(self): + pass + + @printEntryAndExit + def testInit(self): + pass + + @printEntryAndExit + def testPeriodic(self): + pass + + @printEntryAndExit + def testExit(self): + pass + + @printEntryAndExit + def disabledInit(self): + pass + + @printEntryAndExit + def disabledPeriodic(self): + pass + + @printEntryAndExit + def disabledExit(self): + pass + + @printEntryAndExit + def _simulationInit(self): + pass + + @printEntryAndExit + def _simulationPeriodic(self): + pass + + +class MyRobotRobotInitFails(MyRobotDefaultPass): + @printEntryAndExit + def robotInit(self): + assert False + + +class MyRobotRobotPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def robotPeriodic(self): + assert False + + +class MyRobotAutonomousInitFails(MyRobotDefaultPass): + @printEntryAndExit + def autonomousInit(self): + assert False + + +class MyRobotAutonomousPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def autonomousPeriodic(self): + assert False + + +class MyRobotAutonomousExitFails(MyRobotDefaultPass): + @printEntryAndExit + def autonomousExit(self): + assert False + + +class MyRobotTeleopInitFails(MyRobotDefaultPass): + @printEntryAndExit + def teleopInit(self): + assert False + + +class MyRobotTeleopPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def teleopPeriodic(self): + assert False + + +class MyRobotTeleopExitFails(MyRobotDefaultPass): + @printEntryAndExit + def teleopExit(self): + assert False + + +class MyRobotDisabledInitFails(MyRobotDefaultPass): + @printEntryAndExit + def disabledInit(self): + assert False + + +class MyRobotDisabledPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def disabledPeriodic(self): + assert False + + +class MyRobotDisabledExitFails(MyRobotDefaultPass): + @printEntryAndExit + def disabledExit(self): + assert False + + +class MyRobotTestInitFails(MyRobotDefaultPass): + @printEntryAndExit + def testInit(self): + assert False + + +class MyRobotTestPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def testPeriodic(self): + assert False + + +class MyRobotTestExitFails(MyRobotDefaultPass): + @printEntryAndExit + def testExit(self): + assert False + + +class ExpectFinished(Enum): + kNotFinished = 0 + kFinished = 1 + + +class RobotMode(Enum): + kNone = 0 + kDisabled = 1 + kAutonomous = 2 + kTeleop = 3 + kTest = 4 + + def __init__(self, code): + self._code = code + + @property + def autonomous(self): + return self is RobotMode.kAutonomous + + @property + def test(self): + return self is RobotMode.kTest + + @property + def enabled(self): + return (self is not RobotMode.kDisabled) and (self is not RobotMode.kNone) + + +@pytest.fixture(scope="function") +def myrobot_class( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> type[TimedRobotPy]: + class MyRobot(myRobotAddMethods, timedRobotExpectation): + + @printEntryAndExit + def startCompetition(self): + super().startCompetition() + + @printEntryAndExit + def endCompetition(self): + super().endCompetition() + + return MyRobot + + +@pytest.fixture(scope="function") +def expectFinished( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> bool: + return _expectFinished is ExpectFinished.kFinished + + +@pytest.fixture(scope="function") +def robotModeFixture( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> RobotMode: + return _robotMode + + +@pytest.fixture(scope="function") +def callSequenceStr( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> str: + return _callSequenceStr + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:autonomousExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:teleopExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:testExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotRobotInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:startCompetition-:", + ), + ( + MyRobotAutonomousInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:startCompetition-:", + ), + ( + MyRobotAutonomousPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:startCompetition-:", + ), + ( + MyRobotAutonomousExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:startCompetition-:", + ), + ( + MyRobotTeleopInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:startCompetition-:", + ), + ( + MyRobotTeleopPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:startCompetition-:", + ), + ( + MyRobotTeleopExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:startCompetition-:", + ), + ( + MyRobotTestInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:startCompetition-:", + ), + ( + MyRobotTestPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:startCompetition-:", + ), + ( + MyRobotTestExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:startCompetition-:", + ), + ], +) +class TestCanThrowExceptions: + + def test_robot_mode_with_exceptions( + self, + getTestController, + robot_with_sim_setup_teardown, + robotModeFixture, + callSequenceStr, + ): + with getTestController.run_robot(): + rmf = robotModeFixture + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + print( + f"periodS={periodS} or {periodS*1.5} a={rmf.autonomous} t={rmf.test} e={rmf.enabled}" + ) + getTestController.step_timing( + seconds=periodS * 1.5, + autonomous=rmf.autonomous, + test=rmf.test, + enabled=False, + ) + + # Run in desired mode for 1 period + getTestController.step_timing( + seconds=periodS, + autonomous=rmf.autonomous, + test=rmf.test, + enabled=rmf.enabled, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=rmf.autonomous, test=rmf.test, enabled=False + ) + print(f"result={robot_with_sim_setup_teardown._callOrder}") + assert robot_with_sim_setup_teardown._callOrder == callSequenceStr + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + None, + None, + ), + ], +) +class TestSequenceThroughModes: + + def test_robot_mode_sequence( + self, + getTestController, + robot_with_sim_setup_teardown, + ): + callSequenceStr = ( + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-" + ":autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:autonomousExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-" + ":teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:teleopExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-" + ":testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:testExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:" + ) + + with getTestController.run_robot(): + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + getTestController.step_timing( + seconds=periodS * 1.5, + autonomous=True, + test=False, + enabled=False, + ) + + # Run in autonomous mode for 2 periods + getTestController.step_timing( + seconds=periodS * 2, + autonomous=True, + test=False, + enabled=True, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + # Run in teleop mode for 2 periods + getTestController.step_timing( + seconds=periodS * 2, + autonomous=False, + test=False, + enabled=True, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + # Run in test mode for 2 periods + getTestController.step_timing( + seconds=periodS * 2, + autonomous=False, + test=True, + enabled=True, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + print(f"result={robot_with_sim_setup_teardown._callOrder}") + assert robot_with_sim_setup_teardown._callOrder == callSequenceStr + + +class TimedRobotDoNotExpectException(TimedRobot): + + def __init__(self, period=TimedRobotPy.kDefaultPeriod): + super().__init__(period=period) + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert not hasAssertionError + + +class TimedRobotDoNotExpectException1msPeriod(TimedRobotDoNotExpectException): + + def __init__(self, period=0.010): + super().__init__(period=period) + + +class TimedRobotPyDoNotExpectException1msPeriod(TimedRobotPyDoNotExpectException): + + def __init__(self, period=0.010): + super().__init__(period=period) + + +class MyRobotAddPeriodic: + + def addCallLog(self, name: str): + self._calls.append({"name": name, "time": RobotController.getFPGATime()}) + if name not in self._callCount: + self._callCount[name] = 0 + self._callCount[name] += 1 + + @printEntryAndExit + def _periodicN(self, name: str): + print( + f"{getFPGATimeInSecondsAsStr()}:Function {name} executed count={self.count}" + ) + self.addCallLog(name) + + @printEntryAndExit + def robotInit(self): + self.count = 0 + + self._calls = [] + self._callCount = {} + + self.addPeriodic(lambda: self._periodicN("periodic_0_0"), 0.010, 0.001) + self.addPeriodic(lambda: self._periodicN("periodic_0_1"), 0.010, 0.001) + + @printEntryAndExit + def robotPeriodic(self): + self.count += 1 + self.addCallLog("robotPeriodic") + + name = f"periodic_{self.count}" + + def periodic_N(): + self._periodicN(name) + + self.addPeriodic(periodic_N, 0.020, 0.002) + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotAddPeriodic, + TimedRobotDoNotExpectException1msPeriod, + # TimedRobotPyDoNotExpectException1msPeriod, + ExpectFinished.kFinished, + None, + None, + ), + ], +) +class TestSequenceAddPeriodics: + + def test_robot_add_periodic( + self, + getTestController, + robot_with_sim_setup_teardown, + ): + + with getTestController.run_robot(): + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + getTestController.step_timing( + seconds=periodS * 1.5, + autonomous=True, + test=False, + enabled=False, + ) + + # Run disabled mode for periods + getTestController.step_timing( + seconds=periodS * 6, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 7 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 7 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 7 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_3"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_4"] == 1 + assert robot_with_sim_setup_teardown._callCount["periodic_5"] == 1 diff --git a/subprojects/robotpy-wpilib/tests/test_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_timedrobot.py new file mode 100644 index 000000000..168d5b878 --- /dev/null +++ b/subprojects/robotpy-wpilib/tests/test_timedrobot.py @@ -0,0 +1,41 @@ +import pytest + +from wpilib.timedrobotpy import _Callback + + +def test_calcFutureExpirationUs() -> None: + cb = _Callback(func=None, periodUs=20_000, expirationUs=100) + assert cb.calcFutureExpirationUs(100) == 20_100 + assert cb.calcFutureExpirationUs(101) == 20_100 + assert cb.calcFutureExpirationUs(20_099) == 20_100 + assert cb.calcFutureExpirationUs(20_100) == 40_100 + assert cb.calcFutureExpirationUs(20_101) == 40_100 + + cb = _Callback(func=None, periodUs=40_000, expirationUs=500) + assert cb.calcFutureExpirationUs(500) == 40_500 + assert cb.calcFutureExpirationUs(501) == 40_500 + assert cb.calcFutureExpirationUs(40_499) == 40_500 + assert cb.calcFutureExpirationUs(40_500) == 80_500 + assert cb.calcFutureExpirationUs(40_501) == 80_500 + + cb = _Callback(func=None, periodUs=1_000, expirationUs=0) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_000) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_001) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_999) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_001_000) + == 1_000_000_000_000_002_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_001_001) + == 1_000_000_000_000_002_000 + ) diff --git a/subprojects/robotpy-wpilib/wpilib/__init__.py b/subprojects/robotpy-wpilib/wpilib/__init__.py index fd50fc34f..10a2409a4 100644 --- a/subprojects/robotpy-wpilib/wpilib/__init__.py +++ b/subprojects/robotpy-wpilib/wpilib/__init__.py @@ -233,6 +233,8 @@ from .cameraserver import CameraServer from .deployinfo import getDeployData +from .iterativerobotpy import IterativeRobotPy +from .timedrobotpy import TimedRobotPy try: from .version import version as __version__ @@ -241,4 +243,4 @@ from ._impl.main import run -__all__ += ["CameraServer", "run"] +__all__ += ["CameraServer", "run", "IterativeRobotPy", "TimedRobotPy"] diff --git a/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py b/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py new file mode 100644 index 000000000..679afc468 --- /dev/null +++ b/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py @@ -0,0 +1,476 @@ +from enum import Enum + +from hal import ( + report, + tResourceType, + tInstances, + observeUserProgramDisabled, + observeUserProgramTest, + observeUserProgramAutonomous, + observeUserProgramTeleop, + simPeriodicBefore, + simPeriodicAfter, +) +from ntcore import NetworkTableInstance +from wpilib import ( + DriverStation, + DSControlWord, + Watchdog, + LiveWindow, + RobotBase, + SmartDashboard, + reportWarning, +) +from wpilib.shuffleboard import Shuffleboard +import wpimath.units + +_kResourceType_SmartDashboard = tResourceType.kResourceType_SmartDashboard +_kSmartDashboard_LiveWindow = tInstances.kSmartDashboard_LiveWindow + + +class IterativeRobotMode(Enum): + kNone = 0 + kDisabled = 1 + kAutonomous = 2 + kTeleop = 3 + kTest = 4 + + +# todo should this be IterativeRobotPy or IterativeRobotBase (replacing) or IterativeRobotBasePy +class IterativeRobotPy(RobotBase): + """ + IterativeRobotPy implements a specific type of robot program framework, + extending the RobotBase class. It provides similar functionality as + IterativeRobotBase does in a python wrapper of C++, but in pure python. + + The IterativeRobotPy class does not implement StartCompetition(), so it + should not be used by teams directly. + + This class provides the following functions which are called by the main + loop, StartCompetition(), at the appropriate times: + + robotInit() -- provide for initialization at robot power-on + + DriverStationConnected() -- provide for initialization the first time the DS + is connected + + Init() functions -- each of the following functions is called once when the + appropriate mode is entered: + + - disabledInit() -- called each and every time disabled is entered from another mode + - autonomousInit() -- called each and every time autonomous is entered from another mode + - teleopInit() -- called each and every time teleop is entered from another mode + - testInit() -- called each and every time test is entered from another mode + + Periodic() functions -- each of these functions is called on an interval: + + - robotPeriodic() + - disabledPeriodic() + - autonomousPeriodic() + - teleopPeriodic() + - testPeriodic() + + Exit() functions -- each of the following functions is called once when the + appropriate mode is exited: + + - disabledExit() -- called each and every time disabled is exited + - autonomousExit() -- called each and every time autonomous is exited + - teleopExit() -- called each and every time teleop is exited + - testExit() -- called each and every time test is exited + """ + + def __init__(self, period: wpimath.units.seconds) -> None: + """ + Constructor for IterativeRobotPy. + + :param period: period of the main robot periodic loop in seconds. + """ + super().__init__() + self._periodS = period + self.watchdog = Watchdog(self._periodS, self.printLoopOverrunMessage) + self._networkTableInstanceDefault = NetworkTableInstance.getDefault() + self._mode: IterativeRobotMode = IterativeRobotMode.kNone + self._lastMode: IterativeRobotMode = IterativeRobotMode.kNone + self._ntFlushEnabled: bool = True + self._lwEnabledInTest: bool = False + self._calledDsConnected: bool = False + self._reportedLw: bool = False + self._robotPeriodicHasRun: bool = False + self._simulationPeriodicHasRun: bool = False + self._disabledPeriodicHasRun: bool = False + self._autonomousPeriodicHasRun: bool = False + self._teleopPeriodicHasRun: bool = False + self._testPeriodicHasRun: bool = False + + def robotInit(self) -> None: + """ + Robot-wide initialization code should go here. + + Users should override this method for default Robot-wide initialization + which will be called when the robot is first powered on. It will be called + exactly one time. + + Note: This method is functionally identical to the class constructor so + that should be used instead. + """ + pass + + def driverStationConnected(self) -> None: + """ + Code that needs to know the DS state should go here. + + Users should override this method for initialization that needs to occur + after the DS is connected, such as needing the alliance information. + """ + pass + + def _simulationInit(self) -> None: + """ + Robot-wide simulation initialization code should go here. + + Users should override this method for default Robot-wide simulation + related initialization which will be called when the robot is first + started. It will be called exactly one time after robotInit is called + only when the robot is in simulation. + """ + pass + + def disabledInit(self) -> None: + """ + Initialization code for disabled mode should go here. + + Users should override this method for initialization code which will be + called each time + the robot enters disabled mode. + """ + pass + + def autonomousInit(self) -> None: + """ + Initialization code for autonomous mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters autonomous mode. + """ + pass + + def teleopInit(self) -> None: + """ + Initialization code for teleop mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters teleop mode. + """ + pass + + def testInit(self) -> None: + """ + Initialization code for test mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters test mode. + """ + pass + + def robotPeriodic(self) -> None: + """ + Periodic code for all modes should go here. + + This function is called each time a new packet is received from the driver + station. + """ + if not self._robotPeriodicHasRun: + print(f"Default robotPeriodic() method... Override me!") + self._robotPeriodicHasRun = True + + def _simulationPeriodic(self) -> None: + """ + Periodic simulation code should go here. + + This function is called in a simulated robot after user code executes. + """ + if not self._simulationPeriodicHasRun: + print(f"Default _simulationPeriodic() method... Override me!") + self._simulationPeriodicHasRun = True + + def disabledPeriodic(self) -> None: + """ + Periodic code for disabled mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in disabled + mode. + """ + if not self._disabledPeriodicHasRun: + print(f"Default disabledPeriodic() method... Override me!") + self._disabledPeriodicHasRun = True + + def autonomousPeriodic(self) -> None: + """ + Periodic code for autonomous mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in + autonomous mode. + """ + if not self._autonomousPeriodicHasRun: + print(f"Default autonomousPeriodic() method... Override me!") + self._autonomousPeriodicHasRun = True + + def teleopPeriodic(self) -> None: + """ + Periodic code for teleop mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in teleop + mode. + """ + if not self._teleopPeriodicHasRun: + print(f"Default teleopPeriodic() method... Override me!") + self._teleopPeriodicHasRun = True + + def testPeriodic(self) -> None: + """ + Periodic code for test mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in test + mode. + """ + if not self._testPeriodicHasRun: + print(f"Default testPeriodic() method... Override me!") + self._testPeriodicHasRun = True + + def disabledExit(self) -> None: + """ + Exit code for disabled mode should go here. + + Users should override this method for code which will be called each time + the robot exits disabled mode. + """ + pass + + def autonomousExit(self) -> None: + """ + Exit code for autonomous mode should go here. + + Users should override this method for code which will be called each time + the robot exits autonomous mode. + """ + pass + + def teleopExit(self) -> None: + """ + Exit code for teleop mode should go here. + + Users should override this method for code which will be called each time + the robot exits teleop mode. + """ + pass + + def testExit(self) -> None: + """ + Exit code for test mode should go here. + + Users should override this method for code which will be called each time + the robot exits test mode. + """ + pass + + def setNetworkTablesFlushEnabled(self, enabled: bool) -> None: + """ + Enables or disables flushing NetworkTables every loop iteration. + By default, this is enabled. + + :deprecated: Deprecated without replacement. + + :param enabled: True to enable, false to disable. + """ + + self._ntFlushEnabled = enabled + + def enableLiveWindowInTest(self, testLW: bool) -> None: + """ + Sets whether LiveWindow operation is enabled during test mode. + + :param testLW: True to enable, false to disable. Defaults to false. + @throws if called in test mode. + """ + if self.isTestEnabled(): + raise RuntimeError("Can't configure test mode while in test mode!") + if not self._reportedLw and testLW: + report(_kResourceType_SmartDashboard, _kSmartDashboard_LiveWindow) + self._reportedLw = True + self._lwEnabledInTest = testLW + + def isLiveWindowEnabledInTest(self) -> bool: + """ + Whether LiveWindow operation is enabled during test mode. + """ + return self._lwEnabledInTest + + def getPeriod(self) -> wpimath.units.seconds: + """ + Gets time period between calls to Periodic() functions. + """ + return self._periodS + + def _loopFunc(self) -> None: + """ + Loop function. + """ + DriverStation.refreshData() + self.watchdog.reset() + + isEnabled, isAutonomous, isTest = self.getControlState() + if not isEnabled: + self._mode = IterativeRobotMode.kDisabled + elif isAutonomous: + self._mode = IterativeRobotMode.kAutonomous + elif isTest: + self._mode = IterativeRobotMode.kTest + else: + self._mode = IterativeRobotMode.kTeleop + + if not self._calledDsConnected and DSControlWord().isDSAttached(): + self._calledDsConnected = True + self.driverStationConnected() + + # If self._mode changed, call self._mode exit and entry functions + if self._lastMode is not self._mode: + + if self._lastMode is IterativeRobotMode.kDisabled: + self.disabledExit() + elif self._lastMode is IterativeRobotMode.kAutonomous: + self.autonomousExit() + elif self._lastMode is IterativeRobotMode.kTeleop: + self.teleopExit() + elif self._lastMode is IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(False) + Shuffleboard.disableActuatorWidgets() + self.testExit() + """ + todo switch to match statements when we don't build with python 3.9 + match self._lastMode: + case IterativeRobotMode.kDisabled: + self.disabledExit() + case IterativeRobotMode.kAutonomous: + self.autonomousExit() + case IterativeRobotMode.kTeleop: + self.teleopExit() + case IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(False) + Shuffleboard.disableActuatorWidgets() + self.testExit() + """ + + if self._mode is IterativeRobotMode.kDisabled: + self.disabledInit() + self.watchdog.addEpoch("disabledInit()") + elif self._mode is IterativeRobotMode.kAutonomous: + self.autonomousInit() + self.watchdog.addEpoch("autonomousInit()") + elif self._mode is IterativeRobotMode.kTeleop: + self.teleopInit() + self.watchdog.addEpoch("teleopInit()") + elif self._mode is IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(True) + Shuffleboard.enableActuatorWidgets() + self.testInit() + self.watchdog.addEpoch("testInit()") + """ + match self._mode: + case IterativeRobotMode.kDisabled: + self.disabledInit() + self._watchdog.addEpoch("disabledInit()") + case IterativeRobotMode.kAutonomous: + self.autonomousInit() + self._watchdog.addEpoch("autonomousInit()") + case IterativeRobotMode.kTeleop: + self.teleopInit() + self._watchdog.addEpoch("teleopInit()") + case IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(True) + Shuffleboard.enableActuatorWidgets() + self.testInit() + self._watchdog.addEpoch("testInit()") + """ + self._lastMode = self._mode + + # Call the appropriate function depending upon the current robot mode + if self._mode is IterativeRobotMode.kDisabled: + observeUserProgramDisabled() + self.disabledPeriodic() + self.watchdog.addEpoch("disabledPeriodic()") + elif self._mode is IterativeRobotMode.kAutonomous: + observeUserProgramAutonomous() + self.autonomousPeriodic() + self.watchdog.addEpoch("autonomousPeriodic()") + elif self._mode is IterativeRobotMode.kTeleop: + observeUserProgramTeleop() + self.teleopPeriodic() + self.watchdog.addEpoch("teleopPeriodic()") + elif self._mode is IterativeRobotMode.kTest: + observeUserProgramTest() + self.testPeriodic() + self.watchdog.addEpoch("testPeriodic()") + """ + match self._mode: + case IterativeRobotMode.kDisabled: + observeUserProgramDisabled() + self.disabledPeriodic() + self.watchdog.addEpoch("disabledPeriodic()") + case IterativeRobotMode.kAutonomous: + observeUserProgramAutonomous() + self.autonomousPeriodic() + self.watchdog.addEpoch("autonomousPeriodic()") + case IterativeRobotMode.kTeleop: + observeUserProgramTeleop() + self.teleopPeriodic() + self.watchdog.addEpoch("teleopPeriodic()") + case IterativeRobotMode.kTest: + observeUserProgramTest() + self.testPeriodic() + self.watchdog.addEpoch("testPeriodic()") + """ + + self.robotPeriodic() + self.watchdog.addEpoch("robotPeriodic()") + + SmartDashboard.updateValues() + self.watchdog.addEpoch("SmartDashboard.updateValues()") + + LiveWindow.updateValues() + self.watchdog.addEpoch("LiveWindow.updateValues()") + + Shuffleboard.update() + self.watchdog.addEpoch("Shuffleboard.update()") + + if self.isSimulation(): + simPeriodicBefore() + self._simulationPeriodic() + simPeriodicAfter() + self.watchdog.addEpoch("_simulationPeriodic()") + + self.watchdog.disable() + + # Flush NetworkTables + if self._ntFlushEnabled: + self._networkTableInstanceDefault.flushLocal() + + # Warn on loop time overruns + if self.watchdog.isExpired(): + self.printWatchdogEpochs() + + def printLoopOverrunMessage(self) -> None: + reportWarning(f"Loop time of {self.watchdog.getTimeout()}s overrun", False) + + def printWatchdogEpochs(self) -> None: + """ + Prints list of epochs added so far and their times. + """ + self.watchdog.printEpochs() diff --git a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py new file mode 100644 index 000000000..713fae6e6 --- /dev/null +++ b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py @@ -0,0 +1,272 @@ +from typing import Any, Callable, Iterable, ClassVar +from heapq import heappush, heappop +from hal import ( + report, + initializeNotifier, + setNotifierName, + observeUserProgramStarting, + updateNotifierAlarm, + waitForNotifierAlarm, + stopNotifier, + tResourceType, + tInstances, +) +from wpilib import RobotController +import wpimath.units + +from .iterativerobotpy import IterativeRobotPy + +_getFPGATime = RobotController.getFPGATime +_kResourceType_Framework = tResourceType.kResourceType_Framework +_kFramework_Timed = tInstances.kFramework_Timed + +microsecondsAsInt = int + + +class _Callback: + def __init__( + self, + func: Callable[[], None], + periodUs: microsecondsAsInt, + expirationUs: microsecondsAsInt, + ) -> None: + self.func = func + self._periodUs = periodUs + self.expirationUs = expirationUs + + @classmethod + def makeCallBack( + cls, + func: Callable[[], None], + startTimeUs: microsecondsAsInt, + periodUs: microsecondsAsInt, + offsetUs: microsecondsAsInt, + ) -> "_Callback": + + callback = _Callback(func=func, periodUs=periodUs, expirationUs=startTimeUs) + + currentTimeUs = _getFPGATime() + callback.expirationUs = offsetUs + callback.calcFutureExpirationUs( + currentTimeUs + ) + return callback + + def calcFutureExpirationUs( + self, currentTimeUs: microsecondsAsInt + ) -> microsecondsAsInt: + # increment the expiration time by the number of full periods it's behind + # plus one to avoid rapid repeat fires from a large loop overrun. + # + # This routine is called when either: + # this callback has never ran and self.expirationUs is + # TimedRobot._starttimeUs and we are calculating where the first + # expiration should be relative to TimedRobot._starttimeUs + # or: + # this call back has ran and self.expirationUs is when it was scheduled to + # expire and we are calculating when the next expirations should be relative + # to the last scheduled expiration + # + # We assume currentTime ≥ self.expirationUs rather than checking for it since the + # callback wouldn't be running otherwise. + # + # We take when we previously expired or when we started: self.expirationUs + # add + self._periodUs to get at least one period in the future + # then calculate how many whole periods we are behind: + # ((currentTimeUs - self.expirationUs) // self._periodUs) + # and multiply that by self._periodUs to calculate how much time in full + # periods we need to skip to catch up, and add that to the sum to calculate + # when we should run again. + return ( + self.expirationUs + + self._periodUs + + ((currentTimeUs - self.expirationUs) // self._periodUs) * self._periodUs + ) + + def setNextStartTimeUs(self, currentTimeUs: microsecondsAsInt) -> None: + self.expirationUs = self.calcFutureExpirationUs(currentTimeUs) + + def __lt__(self, other) -> bool: + return self.expirationUs < other.expirationUs + + def __bool__(self) -> bool: + return True + + +class _OrderedList: + def __init__(self) -> None: + self._data: list[Any] = [] + + def add(self, item: Any) -> None: + heappush(self._data, item) + + def pop(self) -> Any: + return heappop(self._data) + + def peek( + self, + ) -> Any: # todo change to Any | None when we don't build with python 3.9 + if self._data: + return self._data[0] + else: + return None + + def __len__(self) -> int: + return len(self._data) + + def __iter__(self) -> Iterable[Any]: + return iter(sorted(self._data)) + + def __contains__(self, item) -> bool: + return item in self._data + + def __str__(self) -> str: + return str(sorted(self._data)) + + +# todo what should the name of this class be? +class TimedRobotPy(IterativeRobotPy): + """ + TimedRobotPy implements the IterativeRobotBase robot program framework. + + The TimedRobotPy class is intended to be subclassed by a user creating a + robot program. + + Periodic() functions from the base class are called on an interval by a + Notifier instance. + """ + + kDefaultPeriod: ClassVar[wpimath.units.seconds] = ( + 0.020 # todo this is a change to keep consistent units in the API + ) + + def __init__(self, period: wpimath.units.seconds = kDefaultPeriod) -> None: + """ + Constructor for TimedRobotPy. + + :param period: period of the main robot periodic loop in seconds. + """ + super().__init__(period) + + # All periodic functions created by addPeriodic are relative + # to this self._startTimeUs + self._startTimeUs = _getFPGATime() + self._callbacks = _OrderedList() + self._loopStartTimeUs = 0 + self.addPeriodic(self._loopFunc, period=self._periodS) + + self._notifier, status = initializeNotifier() + print(f"{self._notifier}, {status} = initializeNotifier()") + if status != 0: + raise RuntimeError( + f"initializeNotifier() returned {self._notifier}, {status}" + ) + + status = setNotifierName(self._notifier, "TimedRobotPy") + if status != 0: + raise RuntimeError(f"setNotifierName() returned {status}") + + report(_kResourceType_Framework, _kFramework_Timed) + + def startCompetition(self) -> None: + """ + Provide an alternate "main loop" via startCompetition(). + """ + try: + self.robotInit() + + if self.isSimulation(): + self._simulationInit() + + # Tell the DS that the robot is ready to be enabled + print("********** Robot program startup complete **********", flush=True) + observeUserProgramStarting() + + # Loop forever, calling the appropriate mode-dependent function + # (really not forever, there is a check for a break) + while True: + # We don't have to check there's an element in the queue first because + # there's always at least one (the constructor adds one). It's re-enqueued + # at the end of the loop. + callback = self._callbacks.pop() + + status = updateNotifierAlarm(self._notifier, callback.expirationUs) + if status != 0: + raise RuntimeError(f"updateNotifierAlarm() returned {status}") + + currentTimeUs, status = waitForNotifierAlarm(self._notifier) + if status != 0: + raise RuntimeError( + f"waitForNotifierAlarm() returned currentTimeUs={currentTimeUs} status={status}" + ) + + if currentTimeUs == 0: + # when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm + # will return a currentTimeUs==0 and the API requires robots to stop any loops. + # See the API for waitForNotifierAlarm + break + + self._loopStartTimeUs = _getFPGATime() + self._runCallbackAndReschedule(callback, currentTimeUs) + + # Process all other callbacks that are ready to run + while self._callbacks.peek().expirationUs <= currentTimeUs: + callback = self._callbacks.pop() + self._runCallbackAndReschedule(callback, currentTimeUs) + finally: + self._stopNotifier() + + def _runCallbackAndReschedule( + self, callback: _Callback, currentTimeUs: microsecondsAsInt + ) -> None: + callback.func() + callback.setNextStartTimeUs(currentTimeUs) + self._callbacks.add(callback) + + def _stopNotifier(self): + stopNotifier(self._notifier) + + def endCompetition(self) -> None: + """ + Ends the main loop in startCompetition(). + """ + self._stopNotifier() + + def getLoopStartTime(self) -> microsecondsAsInt: + """ + Return the system clock time in microseconds + for the start of the current + periodic loop. This is in the same time base as Timer.getFPGATimestamp(), + but is stable through a loop. It is updated at the beginning of every + periodic callback (including the normal periodic loop). + + :returns: Robot running time in microseconds, + as of the start of the current + periodic function. + """ + + return self._loopStartTimeUs + + def addPeriodic( + self, + callback: Callable[[], None], + period: wpimath.units.seconds, + offset: wpimath.units.seconds = 0.0, + ) -> None: + """ + Add a callback to run at a specific period with a starting time offset. + + This is scheduled on TimedRobotPy's Notifier, so TimedRobotPy and the callback + run synchronously. Interactions between them are thread-safe. + + :param callback: The callback to run. + :param period: The period at which to run the callback. + :param offset: The offset from the common starting time. This is useful + for scheduling a callback in a different timeslot relative + to TimedRobotPy. + """ + + self._callbacks.add( + _Callback.makeCallBack( + callback, self._startTimeUs, int(period * 1e6), int(offset * 1e6) + ) + ) From 14c4096c144db46aaf4f1e6901703c511e0c6b3b Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Wed, 21 May 2025 13:49:40 -0400 Subject: [PATCH 2/8] Don't add the commans2 test. Signed-off-by: Mike Stitt --- subprojects/robotpy-wpilib/tests/conftest.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/subprojects/robotpy-wpilib/tests/conftest.py b/subprojects/robotpy-wpilib/tests/conftest.py index f7d7f9655..274f25e51 100644 --- a/subprojects/robotpy-wpilib/tests/conftest.py +++ b/subprojects/robotpy-wpilib/tests/conftest.py @@ -4,11 +4,6 @@ import ntcore from wpilib.simulation._simulation import _resetWpilibSimulationData -try: - import commands2 -except ImportError: - commands2 = None - @pytest.fixture def cfg_logging(caplog): From 7d8d7c199f5fa6878e8aed19a7960fb956104bbf Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Thu, 22 May 2025 17:42:49 -0400 Subject: [PATCH 3/8] waitForNotifierAlarm provides loopStartTime, _getFPGATime for setNextStartTime Signed-off-by: Mike Stitt --- .../robotpy-wpilib/wpilib/timedrobotpy.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py index 713fae6e6..e1fe39e24 100644 --- a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py +++ b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py @@ -91,6 +91,9 @@ def __lt__(self, other) -> bool: def __bool__(self) -> bool: return True + def __repr__(self) -> str: + return f"{{func={self.func.__name__}, _periodUs={self._periodUs}, expirationUs={self.expirationUs}}}" + class _OrderedList: def __init__(self) -> None: @@ -119,7 +122,7 @@ def __iter__(self) -> Iterable[Any]: def __contains__(self, item) -> bool: return item in self._data - def __str__(self) -> str: + def __repr__(self) -> str: return str(sorted(self._data)) @@ -155,7 +158,6 @@ def __init__(self, period: wpimath.units.seconds = kDefaultPeriod) -> None: self.addPeriodic(self._loopFunc, period=self._periodS) self._notifier, status = initializeNotifier() - print(f"{self._notifier}, {status} = initializeNotifier()") if status != 0: raise RuntimeError( f"initializeNotifier() returned {self._notifier}, {status}" @@ -193,33 +195,35 @@ def startCompetition(self) -> None: if status != 0: raise RuntimeError(f"updateNotifierAlarm() returned {status}") - currentTimeUs, status = waitForNotifierAlarm(self._notifier) + self._loopStartTimeUs, status = waitForNotifierAlarm(self._notifier) if status != 0: raise RuntimeError( - f"waitForNotifierAlarm() returned currentTimeUs={currentTimeUs} status={status}" + f"waitForNotifierAlarm() returned _loopStartTimeUs={self._loopStartTimeUs} status={status}" ) - if currentTimeUs == 0: + if self._loopStartTimeUs == 0: # when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm - # will return a currentTimeUs==0 and the API requires robots to stop any loops. + # will return a _loopStartTimeUs==0 and the API requires robots to stop any loops. # See the API for waitForNotifierAlarm break - self._loopStartTimeUs = _getFPGATime() - self._runCallbackAndReschedule(callback, currentTimeUs) + self._runCallbackAndReschedule(callback) # Process all other callbacks that are ready to run - while self._callbacks.peek().expirationUs <= currentTimeUs: + while self._callbacks.peek().expirationUs <= self._loopStartTimeUs: callback = self._callbacks.pop() - self._runCallbackAndReschedule(callback, currentTimeUs) + self._runCallbackAndReschedule(callback) finally: + # pytests hang on PC when we don't force a call to self._stopNotifier() self._stopNotifier() - def _runCallbackAndReschedule( - self, callback: _Callback, currentTimeUs: microsecondsAsInt - ) -> None: + def _runCallbackAndReschedule(self, callback: _Callback) -> None: callback.func() - callback.setNextStartTimeUs(currentTimeUs) + # The c++ implementation used the current time before the callback ran, + # to reschedule. By using _getFPGATime(), on a callback.func() + # that ran long we immediately push the next invocation to the + # following period. + callback.setNextStartTimeUs(_getFPGATime()) self._callbacks.add(callback) def _stopNotifier(self): From 109313829e1b1eb73e3d16cd7e8f8a5459ef41fb Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Thu, 22 May 2025 20:45:19 -0400 Subject: [PATCH 4/8] Cleaning up tests. Signed-off-by: Mike Stitt --- .../tests/test_poc_timedrobot.py | 190 +++++++++++++----- 1 file changed, 141 insertions(+), 49 deletions(-) diff --git a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py index 9f6ff6d64..cc7c9eaa0 100644 --- a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py +++ b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py @@ -45,7 +45,7 @@ class TestController: Use this object to control the robot's state during tests """ - def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool): + def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool) -> None: self._reraise = reraise self._thread: typing.Optional[threading.Thread] = None @@ -53,33 +53,33 @@ def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool): self._expectFinished = expectFinished self._cond = threading.Condition() - self._robot_started = False - self._robot_init_started = False - self._robot_finished = False + self._robotStarted = False + self._robotInitStarted = False + self._robotFinished = False - def _on_robot__init_started(self): + def _onRobotInitStarted(self) -> None: with self._cond: - self._robot_init_started = True + self._robotInitStarted = True self._cond.notify_all() - def _robot_thread(self, robot): + def _robotThread(self, robot: TimedRobot | TimedRobotPy) -> None: with self._cond: - self._robot_started = True + self._robotStarted = True self._cond.notify_all() with self._reraise(catch=True): assert robot is not None # shouldn't happen... - robot._TestRobot__robotInitStarted = self._on_robot__init_started + robot._TestRobot__robotInitStarted = self._onRobotInitStarted try: robot.startCompetition() - assert self._expectFinished == self._robot_finished + assert self._expectFinished == self._robotFinished finally: del robot @contextlib.contextmanager - def run_robot(self): + def runRobot(self) -> None: """ Use this in a "with" statement to start your robot code at the beginning of the with block, and end your robot code at the end @@ -94,23 +94,23 @@ def run_robot(self): self._robot = None self._thread = th = threading.Thread( - target=self._robot_thread, args=(robot,), daemon=True + target=self._robotThread, args=(robot,), daemon=True ) th.start() with self._cond: # make sure the thread didn't die - assert self._cond.wait_for(lambda: self._robot_started, timeout=1) + assert self._cond.wait_for(lambda: self._robotStarted, timeout=1) # If your robotInit is taking more than 2 seconds in simulation, you're # probably doing something wrong... but if not, please report a bug! - assert self._cond.wait_for(lambda: self._robot_init_started, timeout=2) + assert self._cond.wait_for(lambda: self._robotInitStarted, timeout=2) try: # in this block you should tell the sim to do sim things yield finally: - self._robot_finished = True + self._robotFinished = True robot.endCompetition() if isinstance(self._reraise.exception, RuntimeError): @@ -140,7 +140,7 @@ def run_robot(self): self._thread = None @property - def robot_is_alive(self) -> bool: + def robotIsAlive(self) -> bool: """ True if the robot code is alive """ @@ -150,7 +150,7 @@ def robot_is_alive(self) -> bool: return th.is_alive() - def step_timing( + def stepTiming( self, *, seconds: float, @@ -172,7 +172,7 @@ def step_timing( """ if self._expectFinished: - assert self.robot_is_alive, "did you call control.run_robot()?" + assert self.robotIsAlive, "did you call control.run_robot()?" assert seconds > 0 @@ -187,7 +187,7 @@ def step_timing( DriverStationSim.notifyNewData() stepTiming(0.001) if assert_alive and self._expectFinished: - assert self.robot_is_alive + assert self.robotIsAlive tm += 0.001 return tm @@ -286,13 +286,13 @@ def robot_with_sim_setup_teardown(decorated_robot_class): @pytest.fixture(scope="function") -def getTestController( - reraise, robot_with_sim_setup_teardown: wpilib.RobotBase, expectFinished: bool +def get_test_controller( + reraise, robot_with_sim_setup_teardown: wpilib.RobotBase, expect_finished: bool ) -> TestController: """ A pytest fixture that provides control over your robot_with_sim_setup_teardown """ - return TestController(reraise, robot_with_sim_setup_teardown, expectFinished) + return TestController(reraise, robot_with_sim_setup_teardown, expect_finished) class TimedRobotPyExpectsException(TimedRobotPy): @@ -545,7 +545,7 @@ def endCompetition(self): @pytest.fixture(scope="function") -def expectFinished( +def expect_finished( myRobotAddMethods, timedRobotExpectation, _expectFinished, @@ -556,7 +556,7 @@ def expectFinished( @pytest.fixture(scope="function") -def robotModeFixture( +def robot_mode_fixture( myRobotAddMethods, timedRobotExpectation, _expectFinished, @@ -567,7 +567,7 @@ def robotModeFixture( @pytest.fixture(scope="function") -def callSequenceStr( +def call_sequence_str( myRobotAddMethods, timedRobotExpectation, _expectFinished, @@ -731,19 +731,19 @@ class TestCanThrowExceptions: def test_robot_mode_with_exceptions( self, - getTestController, + get_test_controller, robot_with_sim_setup_teardown, - robotModeFixture, - callSequenceStr, + robot_mode_fixture, + call_sequence_str, ): - with getTestController.run_robot(): - rmf = robotModeFixture + with get_test_controller.runRobot(): + rmf = robot_mode_fixture periodS = robot_with_sim_setup_teardown.getPeriod() # Run disabled for a short period print( f"periodS={periodS} or {periodS*1.5} a={rmf.autonomous} t={rmf.test} e={rmf.enabled}" ) - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS * 1.5, autonomous=rmf.autonomous, test=rmf.test, @@ -751,7 +751,7 @@ def test_robot_mode_with_exceptions( ) # Run in desired mode for 1 period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS, autonomous=rmf.autonomous, test=rmf.test, @@ -759,11 +759,11 @@ def test_robot_mode_with_exceptions( ) # Disabled for 1 period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS, autonomous=rmf.autonomous, test=rmf.test, enabled=False ) print(f"result={robot_with_sim_setup_teardown._callOrder}") - assert robot_with_sim_setup_teardown._callOrder == callSequenceStr + assert robot_with_sim_setup_teardown._callOrder == call_sequence_str @pytest.mark.parametrize( @@ -782,7 +782,7 @@ class TestSequenceThroughModes: def test_robot_mode_sequence( self, - getTestController, + get_test_controller, robot_with_sim_setup_teardown, ): callSequenceStr = ( @@ -815,10 +815,10 @@ def test_robot_mode_sequence( ":_simulationPeriodic+:_simulationPeriodic-:" ) - with getTestController.run_robot(): + with get_test_controller.runRobot(): periodS = robot_with_sim_setup_teardown.getPeriod() # Run disabled for a short period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS * 1.5, autonomous=True, test=False, @@ -826,7 +826,7 @@ def test_robot_mode_sequence( ) # Run in autonomous mode for 2 periods - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS * 2, autonomous=True, test=False, @@ -834,12 +834,12 @@ def test_robot_mode_sequence( ) # Disabled for 1 period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS, autonomous=False, test=False, enabled=False ) # Run in teleop mode for 2 periods - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS * 2, autonomous=False, test=False, @@ -847,12 +847,12 @@ def test_robot_mode_sequence( ) # Disabled for 1 period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS, autonomous=False, test=False, enabled=False ) # Run in test mode for 2 periods - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS * 2, autonomous=False, test=True, @@ -860,7 +860,7 @@ def test_robot_mode_sequence( ) # Disabled for 1 period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS, autonomous=False, test=False, enabled=False ) @@ -951,23 +951,115 @@ class TestSequenceAddPeriodics: def test_robot_add_periodic( self, - getTestController, + get_test_controller, robot_with_sim_setup_teardown, ): - with getTestController.run_robot(): + with get_test_controller.runRobot(): periodS = robot_with_sim_setup_teardown.getPeriod() # Run disabled for a short period - getTestController.step_timing( + get_test_controller.stepTiming( seconds=periodS * 1.5, autonomous=True, test=False, enabled=False, ) - # Run disabled mode for periods - getTestController.step_timing( - seconds=periodS * 6, + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 1 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 1 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 4 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 4 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 4 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 5 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 5 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 5 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 6 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 6 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 6 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_3"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_4"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, autonomous=False, test=False, enabled=False, From 63bba96f38b5b77f72b63a4a526fa4d272ea047a Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Thu, 22 May 2025 20:54:21 -0400 Subject: [PATCH 5/8] Consolidate tests in test_timedrobot.py Signed-off-by: Mike Stitt --- .../tests/test_poc_timedrobot.py | 1078 ----------------- 1 file changed, 1078 deletions(-) delete mode 100644 subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py diff --git a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py deleted file mode 100644 index cc7c9eaa0..000000000 --- a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py +++ /dev/null @@ -1,1078 +0,0 @@ -""" -Proof of concept to test TimedRobotPy using PyTest - -This POC was made by deconstructing pytest_plugin.py so that it is no longer a plugin but a class that provides -fixtures. - -To run / debug this: - -pytest subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py --no-header -vvv -s - -""" - -import contextlib -import copy -from enum import Enum -import gc -import inspect -import pytest -import threading -import typing -import weakref - -import ntcore -import hal -import hal.simulation -import wpilib -from wpilib import RobotController -import wpilib.shuffleboard -from wpilib.simulation._simulation import _resetWpilibSimulationData -import wpilib.simulation -from wpilib.simulation import pauseTiming, restartTiming -from wpilib.simulation import DriverStationSim, stepTiming, stepTimingAsync -from wpilib.timedrobotpy import TimedRobotPy -from wpilib import TimedRobot - - -def nottest(obj): - obj.__test__ = False - return obj - - -@nottest -class TestController: - """ - Use this object to control the robot's state during tests - """ - - def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool) -> None: - self._reraise = reraise - - self._thread: typing.Optional[threading.Thread] = None - self._robot = robot - self._expectFinished = expectFinished - - self._cond = threading.Condition() - self._robotStarted = False - self._robotInitStarted = False - self._robotFinished = False - - def _onRobotInitStarted(self) -> None: - with self._cond: - self._robotInitStarted = True - self._cond.notify_all() - - def _robotThread(self, robot: TimedRobot | TimedRobotPy) -> None: - with self._cond: - self._robotStarted = True - self._cond.notify_all() - - with self._reraise(catch=True): - assert robot is not None # shouldn't happen... - - robot._TestRobot__robotInitStarted = self._onRobotInitStarted - - try: - robot.startCompetition() - assert self._expectFinished == self._robotFinished - finally: - del robot - - @contextlib.contextmanager - def runRobot(self) -> None: - """ - Use this in a "with" statement to start your robot code at the - beginning of the with block, and end your robot code at the end - of the with block. - - Your `robotInit` function will not be called until this function - is called. - """ - - # remove robot reference so it gets cleaned up when gc.collect() is called - robot = self._robot - self._robot = None - - self._thread = th = threading.Thread( - target=self._robotThread, args=(robot,), daemon=True - ) - th.start() - - with self._cond: - # make sure the thread didn't die - assert self._cond.wait_for(lambda: self._robotStarted, timeout=1) - - # If your robotInit is taking more than 2 seconds in simulation, you're - # probably doing something wrong... but if not, please report a bug! - assert self._cond.wait_for(lambda: self._robotInitStarted, timeout=2) - - try: - # in this block you should tell the sim to do sim things - yield - finally: - self._robotFinished = True - robot.endCompetition() - - if isinstance(self._reraise.exception, RuntimeError): - if str(self._reraise.exception).startswith( - "HAL: A handle parameter was passed incorrectly" - ): - msg = ( - "Do not reuse HAL objects in tests! This error may occur if you" - " stored a motor/sensor as a global or as a class variable" - " outside of a method." - ) - if hasattr(Exception, "add_note"): - self._reraise.exception.add_note(f"*** {msg}") - else: - e = self._reraise.exception - self._reraise.reset() - raise RuntimeError(msg) from e - - # Increment time by 1 second to ensure that any notifiers fire - stepTimingAsync(1.0) - - # the robot thread should exit quickly - th.join(timeout=1) - if th.is_alive(): - pytest.fail("robot did not exit within 2 seconds") - - self._thread = None - - @property - def robotIsAlive(self) -> bool: - """ - True if the robot code is alive - """ - th = self._thread - if not th: - return False - - return th.is_alive() - - def stepTiming( - self, - *, - seconds: float, - autonomous: bool = False, - test: bool = False, - enabled: bool = False, - assert_alive: bool = True, - ) -> float: - """ - This utility will increment simulated time, while pretending that - there's a driver station connected and delivering new packets - every 0.2 seconds. - - :param seconds: Number of seconds to run (will step in increments of 0.2) - :param autonomous: Tell the robot that it is in autonomous mode - :param enabled: Tell the robot that it is enabled - - :returns: Number of seconds time was incremented - """ - - if self._expectFinished: - assert self.robotIsAlive, "did you call control.run_robot()?" - - assert seconds > 0 - - DriverStationSim.setDsAttached(True) - DriverStationSim.setAutonomous(autonomous) - DriverStationSim.setTest(test) - DriverStationSim.setEnabled(enabled) - - tm = 0.0 - - while tm < seconds: - DriverStationSim.notifyNewData() - stepTiming(0.001) - if assert_alive and self._expectFinished: - assert self.robotIsAlive - tm += 0.001 - - return tm - - -@pytest.fixture(scope="function") -def decorated_robot_class(myrobot_class) -> tuple: - # attach physics - - robotClass = myrobot_class - - # Tests need to know when robotInit is called, so override the robot - # to do that - class TestRobot(robotClass): - def robotInit(self): - self.__robotInitStarted() - super().robotInit() - - TestRobot.__name__ = robotClass.__name__ - TestRobot.__module__ = robotClass.__module__ - TestRobot.__qualname__ = robotClass.__qualname__ - - return TestRobot - - -@pytest.fixture(scope="function") -def robot_with_sim_setup_teardown(decorated_robot_class): - """ - Your robot instance - - .. note:: RobotPy/WPILib testing infrastructure is really sensitive - to ensuring that things get cleaned up properly. Make sure - that you don't store references to your robot or other - WPILib objects in a global or static context. - """ - - # - # This function needs to do the same things that RobotBase.main does - # plus some extra things needed for testing - # - # Previously this was separate from robot fixture, but we need to - # ensure that the robot cleanup happens deterministically relative to - # when handle cleanup/etc happens, otherwise unnecessary HAL errors will - # bubble up to the user - # - - nt_inst = ntcore.NetworkTableInstance.getDefault() - nt_inst.startLocal() - - pauseTiming() - restartTiming() - - wpilib.DriverStation.silenceJoystickConnectionWarning(True) - DriverStationSim.setAutonomous(False) - DriverStationSim.setEnabled(False) - DriverStationSim.notifyNewData() - - robot = decorated_robot_class() - - # Tests only get a proxy to ensure cleanup is more reliable - yield weakref.proxy(robot) - - # HACK: avoid motor safety deadlock - wpilib.simulation._simulation._resetMotorSafety() - - del robot - - # Double-check all objects are destroyed so that HAL handles are released - gc.collect() - - # shutdown networktables before other kinds of global cleanup - # -> some reset functions will re-register listeners, so it's important - # to do this before so that the listeners are active on the current - # NetworkTables instance - nt_inst.stopLocal() - nt_inst._reset() - - # Cleanup WPILib globals - # -> preferences, SmartDashboard, Shuffleboard, LiveWindow, MotorSafety - wpilib.simulation._simulation._resetWpilibSimulationData() - wpilib._wpilib._clearSmartDashboardData() - wpilib.shuffleboard._shuffleboard._clearShuffleboardData() - - # Cancel all periodic callbacks - hal.simulation.cancelAllSimPeriodicCallbacks() - - # Reset the HAL handles - hal.simulation.resetGlobalHandles() - - # Reset the HAL data - hal.simulation.resetAllSimData() - - # Don't call HAL shutdown! This is only used to cleanup HAL extensions, - # and functions will only be called the first time (unless re-registered) - # hal.shutdown() - - -@pytest.fixture(scope="function") -def get_test_controller( - reraise, robot_with_sim_setup_teardown: wpilib.RobotBase, expect_finished: bool -) -> TestController: - """ - A pytest fixture that provides control over your robot_with_sim_setup_teardown - """ - return TestController(reraise, robot_with_sim_setup_teardown, expect_finished) - - -class TimedRobotPyExpectsException(TimedRobotPy): - - def __init__(self, period=TimedRobotPy.kDefaultPeriod): - super().__init__(period=period) - self._callOrder = ":" - - def startCompetition(self) -> None: - hasAssertionError = False - try: - super().startCompetition() - except AssertionError: - hasAssertionError = True - print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") - assert hasAssertionError - - -class TimedRobotPyDoNotExpectException(TimedRobotPy): - - def __init__(self, period=TimedRobotPy.kDefaultPeriod): - super().__init__(period=period) - self._callOrder = ":" - - def startCompetition(self) -> None: - hasAssertionError = False - try: - super().startCompetition() - except AssertionError: - hasAssertionError = True - print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") - assert not hasAssertionError - - -def getFPGATimeInSecondsAsStr(): - return f"{RobotController.getFPGATime()/1000_000.0:.3f}s" - - -def printEntryAndExit(func): - def wrapper(*args, **kwargs): - name = func.__name__ - args[0]._callOrder += name + "+:" - print(f"{getFPGATimeInSecondsAsStr()}:Enter:{name}") - result = func(*args, **kwargs) - args[0]._callOrder += name + "-:" - print(f"{getFPGATimeInSecondsAsStr()}:Exit_:{name}") - return result - - return wrapper - - -class MyRobotDefaultPass: - - @printEntryAndExit - def robotInit(self): - pass - - @printEntryAndExit - def robotPeriodic(self): - pass - - @printEntryAndExit - def autonomousInit(self): - pass - - @printEntryAndExit - def autonomousPeriodic(self): - pass - - @printEntryAndExit - def autonomousExit(self): - pass - - @printEntryAndExit - def teleopInit(self): - pass - - @printEntryAndExit - def teleopPeriodic(self): - pass - - @printEntryAndExit - def teleopExit(self): - pass - - @printEntryAndExit - def testInit(self): - pass - - @printEntryAndExit - def testPeriodic(self): - pass - - @printEntryAndExit - def testExit(self): - pass - - @printEntryAndExit - def disabledInit(self): - pass - - @printEntryAndExit - def disabledPeriodic(self): - pass - - @printEntryAndExit - def disabledExit(self): - pass - - @printEntryAndExit - def _simulationInit(self): - pass - - @printEntryAndExit - def _simulationPeriodic(self): - pass - - -class MyRobotRobotInitFails(MyRobotDefaultPass): - @printEntryAndExit - def robotInit(self): - assert False - - -class MyRobotRobotPeriodicFails(MyRobotDefaultPass): - @printEntryAndExit - def robotPeriodic(self): - assert False - - -class MyRobotAutonomousInitFails(MyRobotDefaultPass): - @printEntryAndExit - def autonomousInit(self): - assert False - - -class MyRobotAutonomousPeriodicFails(MyRobotDefaultPass): - @printEntryAndExit - def autonomousPeriodic(self): - assert False - - -class MyRobotAutonomousExitFails(MyRobotDefaultPass): - @printEntryAndExit - def autonomousExit(self): - assert False - - -class MyRobotTeleopInitFails(MyRobotDefaultPass): - @printEntryAndExit - def teleopInit(self): - assert False - - -class MyRobotTeleopPeriodicFails(MyRobotDefaultPass): - @printEntryAndExit - def teleopPeriodic(self): - assert False - - -class MyRobotTeleopExitFails(MyRobotDefaultPass): - @printEntryAndExit - def teleopExit(self): - assert False - - -class MyRobotDisabledInitFails(MyRobotDefaultPass): - @printEntryAndExit - def disabledInit(self): - assert False - - -class MyRobotDisabledPeriodicFails(MyRobotDefaultPass): - @printEntryAndExit - def disabledPeriodic(self): - assert False - - -class MyRobotDisabledExitFails(MyRobotDefaultPass): - @printEntryAndExit - def disabledExit(self): - assert False - - -class MyRobotTestInitFails(MyRobotDefaultPass): - @printEntryAndExit - def testInit(self): - assert False - - -class MyRobotTestPeriodicFails(MyRobotDefaultPass): - @printEntryAndExit - def testPeriodic(self): - assert False - - -class MyRobotTestExitFails(MyRobotDefaultPass): - @printEntryAndExit - def testExit(self): - assert False - - -class ExpectFinished(Enum): - kNotFinished = 0 - kFinished = 1 - - -class RobotMode(Enum): - kNone = 0 - kDisabled = 1 - kAutonomous = 2 - kTeleop = 3 - kTest = 4 - - def __init__(self, code): - self._code = code - - @property - def autonomous(self): - return self is RobotMode.kAutonomous - - @property - def test(self): - return self is RobotMode.kTest - - @property - def enabled(self): - return (self is not RobotMode.kDisabled) and (self is not RobotMode.kNone) - - -@pytest.fixture(scope="function") -def myrobot_class( - myRobotAddMethods, - timedRobotExpectation, - _expectFinished, - _robotMode, - _callSequenceStr, -) -> type[TimedRobotPy]: - class MyRobot(myRobotAddMethods, timedRobotExpectation): - - @printEntryAndExit - def startCompetition(self): - super().startCompetition() - - @printEntryAndExit - def endCompetition(self): - super().endCompetition() - - return MyRobot - - -@pytest.fixture(scope="function") -def expect_finished( - myRobotAddMethods, - timedRobotExpectation, - _expectFinished, - _robotMode, - _callSequenceStr, -) -> bool: - return _expectFinished is ExpectFinished.kFinished - - -@pytest.fixture(scope="function") -def robot_mode_fixture( - myRobotAddMethods, - timedRobotExpectation, - _expectFinished, - _robotMode, - _callSequenceStr, -) -> RobotMode: - return _robotMode - - -@pytest.fixture(scope="function") -def call_sequence_str( - myRobotAddMethods, - timedRobotExpectation, - _expectFinished, - _robotMode, - _callSequenceStr, -) -> str: - return _callSequenceStr - - -@pytest.mark.parametrize( - "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", - [ - ( - MyRobotDefaultPass, - TimedRobotPyDoNotExpectException, - ExpectFinished.kFinished, - RobotMode.kAutonomous, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":autonomousExit+:autonomousExit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:", - ), - ( - MyRobotDefaultPass, - TimedRobotPyDoNotExpectException, - ExpectFinished.kFinished, - RobotMode.kTeleop, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":teleopExit+:teleopExit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:", - ), - ( - MyRobotDefaultPass, - TimedRobotPyDoNotExpectException, - ExpectFinished.kFinished, - RobotMode.kTest, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":testInit+:testInit-:testPeriodic+:testPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":testExit+:testExit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:", - ), - ( - MyRobotRobotInitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kAutonomous, - ":startCompetition+:robotInit+:startCompetition-:", - ), - ( - MyRobotAutonomousInitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kAutonomous, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":autonomousInit+:startCompetition-:", - ), - ( - MyRobotAutonomousPeriodicFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kAutonomous, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":autonomousInit+:autonomousInit-:autonomousPeriodic+:startCompetition-:", - ), - ( - MyRobotAutonomousExitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kAutonomous, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":autonomousExit+:startCompetition-:", - ), - ( - MyRobotTeleopInitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kTeleop, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":teleopInit+:startCompetition-:", - ), - ( - MyRobotTeleopPeriodicFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kTeleop, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":teleopInit+:teleopInit-:teleopPeriodic+:startCompetition-:", - ), - ( - MyRobotTeleopExitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kTeleop, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":teleopExit+:startCompetition-:", - ), - ( - MyRobotTestInitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kTest, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":testInit+:startCompetition-:", - ), - ( - MyRobotTestPeriodicFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kTest, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":testInit+:testInit-:testPeriodic+:startCompetition-:", - ), - ( - MyRobotTestExitFails, - TimedRobotPyExpectsException, - ExpectFinished.kNotFinished, - RobotMode.kTest, - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":testInit+:testInit-:testPeriodic+:testPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":testExit+:startCompetition-:", - ), - ], -) -class TestCanThrowExceptions: - - def test_robot_mode_with_exceptions( - self, - get_test_controller, - robot_with_sim_setup_teardown, - robot_mode_fixture, - call_sequence_str, - ): - with get_test_controller.runRobot(): - rmf = robot_mode_fixture - periodS = robot_with_sim_setup_teardown.getPeriod() - # Run disabled for a short period - print( - f"periodS={periodS} or {periodS*1.5} a={rmf.autonomous} t={rmf.test} e={rmf.enabled}" - ) - get_test_controller.stepTiming( - seconds=periodS * 1.5, - autonomous=rmf.autonomous, - test=rmf.test, - enabled=False, - ) - - # Run in desired mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=rmf.autonomous, - test=rmf.test, - enabled=rmf.enabled, - ) - - # Disabled for 1 period - get_test_controller.stepTiming( - seconds=periodS, autonomous=rmf.autonomous, test=rmf.test, enabled=False - ) - print(f"result={robot_with_sim_setup_teardown._callOrder}") - assert robot_with_sim_setup_teardown._callOrder == call_sequence_str - - -@pytest.mark.parametrize( - "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", - [ - ( - MyRobotDefaultPass, - TimedRobotPyDoNotExpectException, - ExpectFinished.kFinished, - None, - None, - ), - ], -) -class TestSequenceThroughModes: - - def test_robot_mode_sequence( - self, - get_test_controller, - robot_with_sim_setup_teardown, - ): - callSequenceStr = ( - ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":autonomousInit+:autonomousInit-" - ":autonomousPeriodic+:autonomousPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":autonomousPeriodic+:autonomousPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":autonomousExit+:autonomousExit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":teleopInit+:teleopInit-" - ":teleopPeriodic+:teleopPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":teleopPeriodic+:teleopPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":teleopExit+:teleopExit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" - ":testInit+:testInit-" - ":testPeriodic+:testPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":testPeriodic+:testPeriodic-" - ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" - ":testExit+:testExit-" - ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" - ":_simulationPeriodic+:_simulationPeriodic-:" - ) - - with get_test_controller.runRobot(): - periodS = robot_with_sim_setup_teardown.getPeriod() - # Run disabled for a short period - get_test_controller.stepTiming( - seconds=periodS * 1.5, - autonomous=True, - test=False, - enabled=False, - ) - - # Run in autonomous mode for 2 periods - get_test_controller.stepTiming( - seconds=periodS * 2, - autonomous=True, - test=False, - enabled=True, - ) - - # Disabled for 1 period - get_test_controller.stepTiming( - seconds=periodS, autonomous=False, test=False, enabled=False - ) - - # Run in teleop mode for 2 periods - get_test_controller.stepTiming( - seconds=periodS * 2, - autonomous=False, - test=False, - enabled=True, - ) - - # Disabled for 1 period - get_test_controller.stepTiming( - seconds=periodS, autonomous=False, test=False, enabled=False - ) - - # Run in test mode for 2 periods - get_test_controller.stepTiming( - seconds=periodS * 2, - autonomous=False, - test=True, - enabled=True, - ) - - # Disabled for 1 period - get_test_controller.stepTiming( - seconds=periodS, autonomous=False, test=False, enabled=False - ) - - print(f"result={robot_with_sim_setup_teardown._callOrder}") - assert robot_with_sim_setup_teardown._callOrder == callSequenceStr - - -class TimedRobotDoNotExpectException(TimedRobot): - - def __init__(self, period=TimedRobotPy.kDefaultPeriod): - super().__init__(period=period) - self._callOrder = ":" - - def startCompetition(self) -> None: - hasAssertionError = False - try: - super().startCompetition() - except AssertionError: - hasAssertionError = True - print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") - assert not hasAssertionError - - -class TimedRobotDoNotExpectException1msPeriod(TimedRobotDoNotExpectException): - - def __init__(self, period=0.010): - super().__init__(period=period) - - -class TimedRobotPyDoNotExpectException1msPeriod(TimedRobotPyDoNotExpectException): - - def __init__(self, period=0.010): - super().__init__(period=period) - - -class MyRobotAddPeriodic: - - def addCallLog(self, name: str): - self._calls.append({"name": name, "time": RobotController.getFPGATime()}) - if name not in self._callCount: - self._callCount[name] = 0 - self._callCount[name] += 1 - - @printEntryAndExit - def _periodicN(self, name: str): - print( - f"{getFPGATimeInSecondsAsStr()}:Function {name} executed count={self.count}" - ) - self.addCallLog(name) - - @printEntryAndExit - def robotInit(self): - self.count = 0 - - self._calls = [] - self._callCount = {} - - self.addPeriodic(lambda: self._periodicN("periodic_0_0"), 0.010, 0.001) - self.addPeriodic(lambda: self._periodicN("periodic_0_1"), 0.010, 0.001) - - @printEntryAndExit - def robotPeriodic(self): - self.count += 1 - self.addCallLog("robotPeriodic") - - name = f"periodic_{self.count}" - - def periodic_N(): - self._periodicN(name) - - self.addPeriodic(periodic_N, 0.020, 0.002) - - -@pytest.mark.parametrize( - "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", - [ - ( - MyRobotAddPeriodic, - TimedRobotDoNotExpectException1msPeriod, - # TimedRobotPyDoNotExpectException1msPeriod, - ExpectFinished.kFinished, - None, - None, - ), - ], -) -class TestSequenceAddPeriodics: - - def test_robot_add_periodic( - self, - get_test_controller, - robot_with_sim_setup_teardown, - ): - - with get_test_controller.runRobot(): - periodS = robot_with_sim_setup_teardown.getPeriod() - # Run disabled for a short period - get_test_controller.stepTiming( - seconds=periodS * 1.5, - autonomous=True, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 1 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 1 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 1 - - # Run disabled mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=False, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 1 - - # Run disabled mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=False, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 3 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 3 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 3 - assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 1 - - # Run disabled mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=False, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 4 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 4 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 4 - assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 1 - - # Run disabled mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=False, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 5 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 5 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 5 - assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 1 - - # Run disabled mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=False, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 6 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 6 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 6 - assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 3 - assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_3"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_4"] == 1 - - # Run disabled mode for 1 period - get_test_controller.stepTiming( - seconds=periodS, - autonomous=False, - test=False, - enabled=False, - ) - - for key, value in robot_with_sim_setup_teardown._callCount.items(): - print(f"{key}:callCount={value}") - - assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 7 - assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 7 - assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 7 - assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 3 - assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_3"] == 2 - assert robot_with_sim_setup_teardown._callCount["periodic_4"] == 1 - assert robot_with_sim_setup_teardown._callCount["periodic_5"] == 1 From 1dd067edddbb5c964a8f5c2029aabced1e5cefbc Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Thu, 22 May 2025 20:54:37 -0400 Subject: [PATCH 6/8] Consolidate tests in test_timedrobot.py Signed-off-by: Mike Stitt --- .../robotpy-wpilib/tests/test_timedrobot.py | 1073 ++++++++++++++++- 1 file changed, 1072 insertions(+), 1 deletion(-) diff --git a/subprojects/robotpy-wpilib/tests/test_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_timedrobot.py index 168d5b878..4ef7570c7 100644 --- a/subprojects/robotpy-wpilib/tests/test_timedrobot.py +++ b/subprojects/robotpy-wpilib/tests/test_timedrobot.py @@ -1,6 +1,33 @@ +""" " + +Test TimedRobotPy and IterativeRobotPy + +To run / debug this: + +pytest subprojects/robotpy-wpilib/tests/test_timedrobot.py --no-header -vvv -s + +""" + +import contextlib +from enum import Enum +import gc import pytest +import threading +import typing +import weakref -from wpilib.timedrobotpy import _Callback +import ntcore +import hal +import hal.simulation +import wpilib +from wpilib import RobotController +import wpilib.shuffleboard +from wpilib.simulation._simulation import _resetWpilibSimulationData +import wpilib.simulation +from wpilib.simulation import pauseTiming, restartTiming +from wpilib.simulation import DriverStationSim, stepTiming, stepTimingAsync +from wpilib.timedrobotpy import TimedRobotPy, _Callback +from wpilib import TimedRobot def test_calcFutureExpirationUs() -> None: @@ -39,3 +66,1047 @@ def test_calcFutureExpirationUs() -> None: cb.calcFutureExpirationUs(1_000_000_000_000_001_001) == 1_000_000_000_000_002_000 ) + + +def nottest(obj): + obj.__test__ = False + return obj + + +@nottest +class TestController: + """ + Use this object to control the robot's state during tests + """ + + def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool) -> None: + self._reraise = reraise + + self._thread: typing.Optional[threading.Thread] = None + self._robot = robot + self._expectFinished = expectFinished + + self._cond = threading.Condition() + self._robotStarted = False + self._robotInitStarted = False + self._robotFinished = False + + def _onRobotInitStarted(self) -> None: + with self._cond: + self._robotInitStarted = True + self._cond.notify_all() + + def _robotThread(self, robot: TimedRobot | TimedRobotPy) -> None: + with self._cond: + self._robotStarted = True + self._cond.notify_all() + + with self._reraise(catch=True): + assert robot is not None # shouldn't happen... + + robot._TestRobot__robotInitStarted = self._onRobotInitStarted + + try: + robot.startCompetition() + assert self._expectFinished == self._robotFinished + finally: + del robot + + @contextlib.contextmanager + def runRobot(self) -> None: + """ + Use this in a "with" statement to start your robot code at the + beginning of the with block, and end your robot code at the end + of the with block. + + Your `robotInit` function will not be called until this function + is called. + """ + + # remove robot reference so it gets cleaned up when gc.collect() is called + robot = self._robot + self._robot = None + + self._thread = th = threading.Thread( + target=self._robotThread, args=(robot,), daemon=True + ) + th.start() + + with self._cond: + # make sure the thread didn't die + assert self._cond.wait_for(lambda: self._robotStarted, timeout=1) + + # If your robotInit is taking more than 2 seconds in simulation, you're + # probably doing something wrong... but if not, please report a bug! + assert self._cond.wait_for(lambda: self._robotInitStarted, timeout=2) + + try: + # in this block you should tell the sim to do sim things + yield + finally: + self._robotFinished = True + robot.endCompetition() + + if isinstance(self._reraise.exception, RuntimeError): + if str(self._reraise.exception).startswith( + "HAL: A handle parameter was passed incorrectly" + ): + msg = ( + "Do not reuse HAL objects in tests! This error may occur if you" + " stored a motor/sensor as a global or as a class variable" + " outside of a method." + ) + if hasattr(Exception, "add_note"): + self._reraise.exception.add_note(f"*** {msg}") + else: + e = self._reraise.exception + self._reraise.reset() + raise RuntimeError(msg) from e + + # Increment time by 1 second to ensure that any notifiers fire + stepTimingAsync(1.0) + + # the robot thread should exit quickly + th.join(timeout=1) + if th.is_alive(): + pytest.fail("robot did not exit within 2 seconds") + + self._thread = None + + @property + def robotIsAlive(self) -> bool: + """ + True if the robot code is alive + """ + th = self._thread + if not th: + return False + + return th.is_alive() + + def stepTiming( + self, + *, + seconds: float, + autonomous: bool = False, + test: bool = False, + enabled: bool = False, + assert_alive: bool = True, + ) -> float: + """ + This utility will increment simulated time, while pretending that + there's a driver station connected and delivering new packets + every 0.2 seconds. + + :param seconds: Number of seconds to run (will step in increments of 0.2) + :param autonomous: Tell the robot that it is in autonomous mode + :param enabled: Tell the robot that it is enabled + + :returns: Number of seconds time was incremented + """ + + if self._expectFinished: + assert self.robotIsAlive, "did you call control.run_robot()?" + + assert seconds > 0 + + DriverStationSim.setDsAttached(True) + DriverStationSim.setAutonomous(autonomous) + DriverStationSim.setTest(test) + DriverStationSim.setEnabled(enabled) + + tm = 0.0 + + while tm < seconds: + DriverStationSim.notifyNewData() + stepTiming(0.001) + if assert_alive and self._expectFinished: + assert self.robotIsAlive + tm += 0.001 + + return tm + + +@pytest.fixture(scope="function") +def decorated_robot_class(myrobot_class) -> tuple: + # attach physics + + robotClass = myrobot_class + + # Tests need to know when robotInit is called, so override the robot + # to do that + class TestRobot(robotClass): + def robotInit(self): + self.__robotInitStarted() + super().robotInit() + + TestRobot.__name__ = robotClass.__name__ + TestRobot.__module__ = robotClass.__module__ + TestRobot.__qualname__ = robotClass.__qualname__ + + return TestRobot + + +@pytest.fixture(scope="function") +def robot_with_sim_setup_teardown(decorated_robot_class): + """ + Your robot instance + + .. note:: RobotPy/WPILib testing infrastructure is really sensitive + to ensuring that things get cleaned up properly. Make sure + that you don't store references to your robot or other + WPILib objects in a global or static context. + """ + + # + # This function needs to do the same things that RobotBase.main does + # plus some extra things needed for testing + # + # Previously this was separate from robot fixture, but we need to + # ensure that the robot cleanup happens deterministically relative to + # when handle cleanup/etc happens, otherwise unnecessary HAL errors will + # bubble up to the user + # + + nt_inst = ntcore.NetworkTableInstance.getDefault() + nt_inst.startLocal() + + pauseTiming() + restartTiming() + + wpilib.DriverStation.silenceJoystickConnectionWarning(True) + DriverStationSim.setAutonomous(False) + DriverStationSim.setEnabled(False) + DriverStationSim.notifyNewData() + + robot = decorated_robot_class() + + # Tests only get a proxy to ensure cleanup is more reliable + yield weakref.proxy(robot) + + # HACK: avoid motor safety deadlock + wpilib.simulation._simulation._resetMotorSafety() + + del robot + + # Double-check all objects are destroyed so that HAL handles are released + gc.collect() + + # shutdown networktables before other kinds of global cleanup + # -> some reset functions will re-register listeners, so it's important + # to do this before so that the listeners are active on the current + # NetworkTables instance + nt_inst.stopLocal() + nt_inst._reset() + + # Cleanup WPILib globals + # -> preferences, SmartDashboard, Shuffleboard, LiveWindow, MotorSafety + wpilib.simulation._simulation._resetWpilibSimulationData() + wpilib._wpilib._clearSmartDashboardData() + wpilib.shuffleboard._shuffleboard._clearShuffleboardData() + + # Cancel all periodic callbacks + hal.simulation.cancelAllSimPeriodicCallbacks() + + # Reset the HAL handles + hal.simulation.resetGlobalHandles() + + # Reset the HAL data + hal.simulation.resetAllSimData() + + # Don't call HAL shutdown! This is only used to cleanup HAL extensions, + # and functions will only be called the first time (unless re-registered) + # hal.shutdown() + + +@pytest.fixture(scope="function") +def get_test_controller( + reraise, robot_with_sim_setup_teardown: wpilib.RobotBase, expect_finished: bool +) -> TestController: + """ + A pytest fixture that provides control over your robot_with_sim_setup_teardown + """ + return TestController(reraise, robot_with_sim_setup_teardown, expect_finished) + + +class TimedRobotPyExpectsException(TimedRobotPy): + + def __init__(self, period=TimedRobotPy.kDefaultPeriod): + super().__init__(period=period) + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert hasAssertionError + + +class TimedRobotPyDoNotExpectException(TimedRobotPy): + + def __init__(self, period=TimedRobotPy.kDefaultPeriod): + super().__init__(period=period) + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert not hasAssertionError + + +def getFPGATimeInSecondsAsStr(): + return f"{RobotController.getFPGATime()/1000_000.0:.3f}s" + + +def printEntryAndExit(func): + def wrapper(*args, **kwargs): + name = func.__name__ + args[0]._callOrder += name + "+:" + print(f"{getFPGATimeInSecondsAsStr()}:Enter:{name}") + result = func(*args, **kwargs) + args[0]._callOrder += name + "-:" + print(f"{getFPGATimeInSecondsAsStr()}:Exit_:{name}") + return result + + return wrapper + + +class MyRobotDefaultPass: + + @printEntryAndExit + def robotInit(self): + pass + + @printEntryAndExit + def robotPeriodic(self): + pass + + @printEntryAndExit + def autonomousInit(self): + pass + + @printEntryAndExit + def autonomousPeriodic(self): + pass + + @printEntryAndExit + def autonomousExit(self): + pass + + @printEntryAndExit + def teleopInit(self): + pass + + @printEntryAndExit + def teleopPeriodic(self): + pass + + @printEntryAndExit + def teleopExit(self): + pass + + @printEntryAndExit + def testInit(self): + pass + + @printEntryAndExit + def testPeriodic(self): + pass + + @printEntryAndExit + def testExit(self): + pass + + @printEntryAndExit + def disabledInit(self): + pass + + @printEntryAndExit + def disabledPeriodic(self): + pass + + @printEntryAndExit + def disabledExit(self): + pass + + @printEntryAndExit + def _simulationInit(self): + pass + + @printEntryAndExit + def _simulationPeriodic(self): + pass + + +class MyRobotRobotInitFails(MyRobotDefaultPass): + @printEntryAndExit + def robotInit(self): + assert False + + +class MyRobotRobotPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def robotPeriodic(self): + assert False + + +class MyRobotAutonomousInitFails(MyRobotDefaultPass): + @printEntryAndExit + def autonomousInit(self): + assert False + + +class MyRobotAutonomousPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def autonomousPeriodic(self): + assert False + + +class MyRobotAutonomousExitFails(MyRobotDefaultPass): + @printEntryAndExit + def autonomousExit(self): + assert False + + +class MyRobotTeleopInitFails(MyRobotDefaultPass): + @printEntryAndExit + def teleopInit(self): + assert False + + +class MyRobotTeleopPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def teleopPeriodic(self): + assert False + + +class MyRobotTeleopExitFails(MyRobotDefaultPass): + @printEntryAndExit + def teleopExit(self): + assert False + + +class MyRobotDisabledInitFails(MyRobotDefaultPass): + @printEntryAndExit + def disabledInit(self): + assert False + + +class MyRobotDisabledPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def disabledPeriodic(self): + assert False + + +class MyRobotDisabledExitFails(MyRobotDefaultPass): + @printEntryAndExit + def disabledExit(self): + assert False + + +class MyRobotTestInitFails(MyRobotDefaultPass): + @printEntryAndExit + def testInit(self): + assert False + + +class MyRobotTestPeriodicFails(MyRobotDefaultPass): + @printEntryAndExit + def testPeriodic(self): + assert False + + +class MyRobotTestExitFails(MyRobotDefaultPass): + @printEntryAndExit + def testExit(self): + assert False + + +class ExpectFinished(Enum): + kNotFinished = 0 + kFinished = 1 + + +class RobotMode(Enum): + kNone = 0 + kDisabled = 1 + kAutonomous = 2 + kTeleop = 3 + kTest = 4 + + def __init__(self, code): + self._code = code + + @property + def autonomous(self): + return self is RobotMode.kAutonomous + + @property + def test(self): + return self is RobotMode.kTest + + @property + def enabled(self): + return (self is not RobotMode.kDisabled) and (self is not RobotMode.kNone) + + +@pytest.fixture(scope="function") +def myrobot_class( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> type[TimedRobotPy]: + class MyRobot(myRobotAddMethods, timedRobotExpectation): + + @printEntryAndExit + def startCompetition(self): + super().startCompetition() + + @printEntryAndExit + def endCompetition(self): + super().endCompetition() + + return MyRobot + + +@pytest.fixture(scope="function") +def expect_finished( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> bool: + return _expectFinished is ExpectFinished.kFinished + + +@pytest.fixture(scope="function") +def robot_mode_fixture( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> RobotMode: + return _robotMode + + +@pytest.fixture(scope="function") +def call_sequence_str( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> str: + return _callSequenceStr + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:autonomousExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:teleopExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:testExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotRobotInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:startCompetition-:", + ), + ( + MyRobotAutonomousInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:startCompetition-:", + ), + ( + MyRobotAutonomousPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:startCompetition-:", + ), + ( + MyRobotAutonomousExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:startCompetition-:", + ), + ( + MyRobotTeleopInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:startCompetition-:", + ), + ( + MyRobotTeleopPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:startCompetition-:", + ), + ( + MyRobotTeleopExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:startCompetition-:", + ), + ( + MyRobotTestInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:startCompetition-:", + ), + ( + MyRobotTestPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:startCompetition-:", + ), + ( + MyRobotTestExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:startCompetition-:", + ), + ], +) +class TestCanThrowExceptions: + + def test_robot_mode_with_exceptions( + self, + get_test_controller, + robot_with_sim_setup_teardown, + robot_mode_fixture, + call_sequence_str, + ): + with get_test_controller.runRobot(): + rmf = robot_mode_fixture + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + print( + f"periodS={periodS} or {periodS*1.5} a={rmf.autonomous} t={rmf.test} e={rmf.enabled}" + ) + get_test_controller.stepTiming( + seconds=periodS * 1.5, + autonomous=rmf.autonomous, + test=rmf.test, + enabled=False, + ) + + # Run in desired mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=rmf.autonomous, + test=rmf.test, + enabled=rmf.enabled, + ) + + # Disabled for 1 period + get_test_controller.stepTiming( + seconds=periodS, autonomous=rmf.autonomous, test=rmf.test, enabled=False + ) + print(f"result={robot_with_sim_setup_teardown._callOrder}") + assert robot_with_sim_setup_teardown._callOrder == call_sequence_str + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + None, + None, + ), + ], +) +class TestSequenceThroughModes: + + def test_robot_mode_sequence( + self, + get_test_controller, + robot_with_sim_setup_teardown, + ): + callSequenceStr = ( + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-" + ":autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:autonomousExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-" + ":teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:teleopExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-" + ":testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:testExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:" + ) + + with get_test_controller.runRobot(): + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + get_test_controller.stepTiming( + seconds=periodS * 1.5, + autonomous=True, + test=False, + enabled=False, + ) + + # Run in autonomous mode for 2 periods + get_test_controller.stepTiming( + seconds=periodS * 2, + autonomous=True, + test=False, + enabled=True, + ) + + # Disabled for 1 period + get_test_controller.stepTiming( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + # Run in teleop mode for 2 periods + get_test_controller.stepTiming( + seconds=periodS * 2, + autonomous=False, + test=False, + enabled=True, + ) + + # Disabled for 1 period + get_test_controller.stepTiming( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + # Run in test mode for 2 periods + get_test_controller.stepTiming( + seconds=periodS * 2, + autonomous=False, + test=True, + enabled=True, + ) + + # Disabled for 1 period + get_test_controller.stepTiming( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + print(f"result={robot_with_sim_setup_teardown._callOrder}") + assert robot_with_sim_setup_teardown._callOrder == callSequenceStr + + +class TimedRobotDoNotExpectException(TimedRobot): + + def __init__(self, period=TimedRobotPy.kDefaultPeriod): + super().__init__(period=period) + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert not hasAssertionError + + +class TimedRobotDoNotExpectException1msPeriod(TimedRobotDoNotExpectException): + + def __init__(self, period=0.010): + super().__init__(period=period) + + +class TimedRobotPyDoNotExpectException1msPeriod(TimedRobotPyDoNotExpectException): + + def __init__(self, period=0.010): + super().__init__(period=period) + + +class MyRobotAddPeriodic: + + def addCallLog(self, name: str): + self._calls.append({"name": name, "time": RobotController.getFPGATime()}) + if name not in self._callCount: + self._callCount[name] = 0 + self._callCount[name] += 1 + + @printEntryAndExit + def _periodicN(self, name: str): + print( + f"{getFPGATimeInSecondsAsStr()}:Function {name} executed count={self.count}" + ) + self.addCallLog(name) + + @printEntryAndExit + def robotInit(self): + self.count = 0 + + self._calls = [] + self._callCount = {} + + self.addPeriodic(lambda: self._periodicN("periodic_0_0"), 0.010, 0.001) + self.addPeriodic(lambda: self._periodicN("periodic_0_1"), 0.010, 0.001) + + @printEntryAndExit + def robotPeriodic(self): + self.count += 1 + self.addCallLog("robotPeriodic") + + name = f"periodic_{self.count}" + + def periodic_N(): + self._periodicN(name) + + self.addPeriodic(periodic_N, 0.020, 0.002) + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotAddPeriodic, + TimedRobotDoNotExpectException1msPeriod, + # TimedRobotPyDoNotExpectException1msPeriod, + ExpectFinished.kFinished, + None, + None, + ), + ], +) +class TestSequenceAddPeriodics: + + def test_robot_add_periodic( + self, + get_test_controller, + robot_with_sim_setup_teardown, + ): + + with get_test_controller.runRobot(): + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + get_test_controller.stepTiming( + seconds=periodS * 1.5, + autonomous=True, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 1 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 1 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 4 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 4 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 4 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 5 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 5 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 5 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 6 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 6 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 6 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_3"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_4"] == 1 + + # Run disabled mode for 1 period + get_test_controller.stepTiming( + seconds=periodS, + autonomous=False, + test=False, + enabled=False, + ) + + for key, value in robot_with_sim_setup_teardown._callCount.items(): + print(f"{key}:callCount={value}") + + assert robot_with_sim_setup_teardown._callCount["robotPeriodic"] == 7 + assert robot_with_sim_setup_teardown._callCount["periodic_0_0"] == 7 + assert robot_with_sim_setup_teardown._callCount["periodic_0_1"] == 7 + assert robot_with_sim_setup_teardown._callCount["periodic_1"] == 3 + assert robot_with_sim_setup_teardown._callCount["periodic_2"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_3"] == 2 + assert robot_with_sim_setup_teardown._callCount["periodic_4"] == 1 + assert robot_with_sim_setup_teardown._callCount["periodic_5"] == 1 From 0f33e6824b0e60d053b5d1a6772f268c539e4b49 Mon Sep 17 00:00:00 2001 From: Mike Stitt Date: Fri, 23 May 2025 08:11:40 -0400 Subject: [PATCH 7/8] Fix build for python 3.9 Signed-off-by: Mike Stitt --- subprojects/robotpy-wpilib/tests/test_timedrobot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subprojects/robotpy-wpilib/tests/test_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_timedrobot.py index 4ef7570c7..94d246d82 100644 --- a/subprojects/robotpy-wpilib/tests/test_timedrobot.py +++ b/subprojects/robotpy-wpilib/tests/test_timedrobot.py @@ -96,7 +96,7 @@ def _onRobotInitStarted(self) -> None: self._robotInitStarted = True self._cond.notify_all() - def _robotThread(self, robot: TimedRobot | TimedRobotPy) -> None: + def _robotThread(self, robot: TimedRobotPy) -> None: with self._cond: self._robotStarted = True self._cond.notify_all() From 9b71587ad10902446388b4a7b51843091e2a87d5 Mon Sep 17 00:00:00 2001 From: spiresFrc9106 Date: Fri, 23 May 2025 12:43:59 -0400 Subject: [PATCH 8/8] Improve comments, siftup rather than pop and add. Signed-off-by: spiresFrc9106 --- .../robotpy-wpilib/wpilib/timedrobotpy.py | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py index e1fe39e24..39dc3a9c0 100644 --- a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py +++ b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py @@ -1,5 +1,5 @@ from typing import Any, Callable, Iterable, ClassVar -from heapq import heappush, heappop +from heapq import heappush, heappop, _siftup from hal import ( report, initializeNotifier, @@ -24,6 +24,9 @@ class _Callback: + + __slots__ = "func", "_periodUs", "expirationUs" + def __init__( self, func: Callable[[], None], @@ -96,6 +99,9 @@ def __repr__(self) -> str: class _OrderedList: + + __slots__ = "_data" + def __init__(self) -> None: self._data: list[Any] = [] @@ -113,6 +119,9 @@ def peek( else: return None + def siftupRoot(self): + _siftup(self._data, 0) + def __len__(self) -> int: return len(self._data) @@ -189,13 +198,20 @@ def startCompetition(self) -> None: # We don't have to check there's an element in the queue first because # there's always at least one (the constructor adds one). It's re-enqueued # at the end of the loop. - callback = self._callbacks.pop() + callback = self._callbacks.peek() status = updateNotifierAlarm(self._notifier, callback.expirationUs) if status != 0: raise RuntimeError(f"updateNotifierAlarm() returned {status}") self._loopStartTimeUs, status = waitForNotifierAlarm(self._notifier) + + # The C++ code that this was based upon used the following line to establish + # the loopStart time. Uncomment it and + # the "self._loopStartTimeUs = startTimeUs" further below to emulate the + # legacy behavior. + # startTimeUs = _getFPGATime() # uncomment this for legacy behavior + if status != 0: raise RuntimeError( f"waitForNotifierAlarm() returned _loopStartTimeUs={self._loopStartTimeUs} status={status}" @@ -207,11 +223,24 @@ def startCompetition(self) -> None: # See the API for waitForNotifierAlarm break + # On a RoboRio 2, the following print statement results in values like: + # print(f"expUs={callback.expirationUs} current={self._loopStartTimeUs}, legacy={startTimeUs}") + # [2.27] expUs=3418017 current=3418078, legacy=3418152 + # [2.29] expUs=3438017 current=3438075, legacy=3438149 + # This indicates that there is about 60 microseconds of skid from + # callback.expirationUs to self._loopStartTimeUs + # and there is about 70 microseconds of skid from self._loopStartTimeUs to startTimeUs. + # Consequently, this code uses "self._loopStartTimeUs, status = waitForNotifierAlarm" + # to establish loopStartTime, rather than slowing down the code by adding an extra call to + # "startTimeUs = _getFPGATime()". + + # self._loopStartTimeUs = startTimeUs # Uncomment this line for legacy behavior. + self._runCallbackAndReschedule(callback) # Process all other callbacks that are ready to run while self._callbacks.peek().expirationUs <= self._loopStartTimeUs: - callback = self._callbacks.pop() + callback = self._callbacks.peek() self._runCallbackAndReschedule(callback) finally: # pytests hang on PC when we don't force a call to self._stopNotifier() @@ -224,7 +253,8 @@ def _runCallbackAndReschedule(self, callback: _Callback) -> None: # that ran long we immediately push the next invocation to the # following period. callback.setNextStartTimeUs(_getFPGATime()) - self._callbacks.add(callback) + # assert callback is self._callbacks.peek() + self._callbacks.siftupRoot() def _stopNotifier(self): stopNotifier(self._notifier) @@ -269,8 +299,9 @@ def addPeriodic( to TimedRobotPy. """ - self._callbacks.add( - _Callback.makeCallBack( - callback, self._startTimeUs, int(period * 1e6), int(offset * 1e6) - ) + cb = _Callback.makeCallBack( + callback, self._startTimeUs, int(period * 1e6), int(offset * 1e6) ) + if len(self._callbacks): + assert cb > self._callbacks.peek() + self._callbacks.add(cb)