diff --git a/commands2/__init__.py b/commands2/__init__.py index 39e92022..fa39862c 100644 --- a/commands2/__init__.py +++ b/commands2/__init__.py @@ -1,4 +1,6 @@ from . import _init_impl +from .trigger import Trigger +from .coroutinecommand import CoroutineCommand, commandify from .version import version as __version__ @@ -40,7 +42,7 @@ TrapezoidProfileCommandRadians, TrapezoidProfileSubsystem, TrapezoidProfileSubsystemRadians, - Trigger, + # Trigger, WaitCommand, WaitUntilCommand, # button, @@ -85,10 +87,13 @@ "TrapezoidProfileCommandRadians", "TrapezoidProfileSubsystem", "TrapezoidProfileSubsystemRadians", - "Trigger", + # "_Trigger", "WaitCommand", "WaitUntilCommand", # "button", # "cmd", "requirementsDisjoint", + "commandify", + "CoroutineCommand", + "Trigger", ] diff --git a/commands2/button/__init__.py b/commands2/button/__init__.py index 651dc05b..1cfdff38 100644 --- a/commands2/button/__init__.py +++ b/commands2/button/__init__.py @@ -1,13 +1,13 @@ -# autogenerated by 'robotpy-build create-imports commands2.button commands2._impl.button' +from .button import Button +from .joystickbutton import JoystickButton +from .networkbutton import NetworkButton +from .povbutton import POVButton + from .._impl.button import ( - Button, CommandGenericHID, CommandJoystick, CommandPS4Controller, CommandXboxController, - JoystickButton, - NetworkButton, - POVButton, ) __all__ = [ diff --git a/commands2/button/button.py b/commands2/button/button.py new file mode 100644 index 00000000..d0cafb86 --- /dev/null +++ b/commands2/button/button.py @@ -0,0 +1,17 @@ +from ..trigger import Trigger + + +class Button(Trigger): + """ + A class used to bind command scheduling to button presses. + Can be composed with other buttons with the operators in Trigger. + + @see Trigger + """ + + whenPressed = Trigger.whenActive + whenReleased = Trigger.whenInactive + whileHeld = Trigger.whileActiveContinous + whenHeld = Trigger.whileActiveOnce + toggleWhenPressed = Trigger.toggleWhenActive + cancelWhenPressed = Trigger.cancelWhenActive diff --git a/commands2/button/joystickbutton.py b/commands2/button/joystickbutton.py new file mode 100644 index 00000000..f34a9ad2 --- /dev/null +++ b/commands2/button/joystickbutton.py @@ -0,0 +1,27 @@ +from wpilib import Joystick + +from .button import Button + + +class JoystickButton(Button): + """ + A class used to bind command scheduling to joystick button presses. + Can be composed with other buttons with the operators in Trigger. + + @see Trigger + """ + + def __init__(self, joystick: Joystick, button: int) -> None: + """ + Creates a JoystickButton that commands can be bound to. + + :param joystick: The joystick on which the button is located. + :param button: The number of the button on the joystick. + """ + if not isinstance(joystick, Joystick) or not isinstance(button, int): + raise TypeError( + "JoystickButton.__init__(): incompatible constructor arguments. The following argument types are supported:\n" + "\t1. commands2.button.JoystickButton(joystick: Joystick, button: int)\n" + f"Invoked with: {joystick}, {button}" + ) + super().__init__(lambda: joystick.getRawButton(button)) diff --git a/commands2/button/networkbutton.py b/commands2/button/networkbutton.py new file mode 100644 index 00000000..b0b9302d --- /dev/null +++ b/commands2/button/networkbutton.py @@ -0,0 +1,63 @@ +from ntcore import NetworkTable, NetworkTableEntry + +from typing import Union, overload + +from .button import Button + + +class NetworkButton(Button): + """ + A class used to bind command scheduling to a NetworkTable boolean fields. + Can be composed with other buttons with the operators in Trigger. + + @see Trigger + """ + + @overload + def __init__(self, entry: NetworkTableEntry) -> None: + """ + Creates a NetworkButton that commands can be bound to. + + :param entry: The entry that is the value. + """ + + @overload + def __init__(self, table: Union[NetworkTable, str], field: str) -> None: + """ + Creates a NetworkButton that commands can be bound to. + + :param table: The table where the networktable value is located. + :param field: The field that is the value. + """ + + def __init__(self, *args, **kwargs) -> None: + num_args = len(args) + len(kwargs) + if num_args == 1: + entry: NetworkTableEntry = kwargs.get("entry") or args[0] + + if not isinstance(entry, NetworkTableEntry): + raise self._type_error(entry) + + super().__init__( + lambda: NetworkTables.isConnected() and entry.getBoolean(False) + ) + elif num_args == 2: + table = kwargs.get("table") or args[0] + field = kwargs.get("field") or args[-1] + + if isinstance(table, str): + table = NetworkTables.getTable(table) + + entry = table.getEntry(field) + self.__init__(entry) + else: + raise self._type_error(args) + + def _type_error(self, *args): + return TypeError( + "NetworkButton.__init__(): incompatible constructor arguments. The following argument types are supported:\n" + "\t1. commands2.button.NetworkButton(entry: NetworkTableEntry)\n" + "\t2. commands2.button.NetworkButton(table: str, field: str)\n" + "\t3. commands2.button.NetworkButton(table: NetworkTable, field: str)\n" + f"Invoked with: {', '.join(map(str, args))}" + ) diff --git a/commands2/button/povbutton.py b/commands2/button/povbutton.py new file mode 100644 index 00000000..a029fc44 --- /dev/null +++ b/commands2/button/povbutton.py @@ -0,0 +1,32 @@ +from wpilib import Joystick + +from .button import Button + + +class POVButton(Button): + """ + A class used to bind command scheduling to joystick POV presses. + Can be composed with other buttons with the operators in Trigger. + + @see Trigger + """ + + def __init__(self, joystick: Joystick, angle: int, povNumber: int = 0) -> None: + """ + Creates a POVButton that commands can be bound to. + + :param joystick: The joystick on which the button is located. + :param angle: The angle of the POV corresponding to a button press. + :param povNumber: The number of the POV on the joystick. + """ + if ( + not isinstance(joystick, Joystick) + or not isinstance(angle, int) + or not isinstance(povNumber, int) + ): + raise TypeError( + "POVButton.__init__(): incompatible constructor arguments. The following argument types are supported:\n" + "\t1. commands2.button.POVButton(joystick: Joystick, angle: int, povNumber: int)\n" + f"Invoked with: {joystick}, {angle}, {povNumber}" + ) + super().__init__(lambda: joystick.getPOV(povNumber) == angle) diff --git a/commands2/coroutinecommand.py b/commands2/coroutinecommand.py new file mode 100644 index 00000000..f91fc9ce --- /dev/null +++ b/commands2/coroutinecommand.py @@ -0,0 +1,141 @@ +from functools import wraps +from typing import Any, Callable, Generator, List, Union, Optional, overload +from ._impl import CommandBase, Subsystem +import inspect +from typing_extensions import TypeGuard + +Coroutine = Generator[None, None, None] +CoroutineFunction = Callable[[], Generator[None, None, None]] +Coroutineable = Union[Callable[[], None], CoroutineFunction] + + +def is_coroutine(func: Any) -> TypeGuard[Coroutine]: + return inspect.isgenerator(func) + + +def is_coroutine_function(func: Any) -> TypeGuard[CoroutineFunction]: + return inspect.isgeneratorfunction(func) + + +def is_coroutineable(func: Any) -> TypeGuard[Coroutineable]: + return is_coroutine_function(func) or callable(func) + + +def ensure_generator_function(func: Coroutineable) -> Callable[..., Coroutine]: + if is_coroutine_function(func): + return func + + @wraps(func) + def wrapper(*args, **kwargs): + func(*args, **kwargs) + return + yield + + return wrapper + + +class CoroutineCommand(CommandBase): + """ + A class that wraps a coroutine function into a command. + """ + + coroutine: Optional[Coroutine] + coroutine_function: Optional[Coroutineable] + is_finished: bool + + def __init__( + self, + coroutine: Union[Coroutine, Coroutineable], + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> None: + """ + Creates a CoroutineCommand than can be used as a command. + + :param coroutine: The coroutine or coroutine function to bind. + :param requirements: The subsystems that this command requires. + :param runs_when_disabled: Whether or not this command runs when the robot is disabled. + """ + super().__init__() + self.coroutine = None + self.coroutine_function = None + self.runsWhenDisabled = lambda: runs_when_disabled + + if is_coroutine(coroutine): + self.coroutine = coroutine + elif is_coroutineable(coroutine): + self.coroutine_function = coroutine + else: + raise TypeError("The coroutine must be a coroutine or a coroutine function") + + if requirements is not None: + self.addRequirements(requirements) + + self.is_finished = False + + def initialize(self) -> None: + if self.coroutine_function: + self.coroutine = ensure_generator_function(self.coroutine_function)() + elif self.coroutine and self.is_finished: + raise RuntimeError("Generator objects cannot be reused.") + + self.is_finished = False + + def execute(self): + print("r") + try: + if not self.is_finished: + if not self.coroutine: + raise TypeError("This command was not properly initialized") + next(self.coroutine) + except StopIteration: + self.is_finished = True + + def isFinished(self): + return self.is_finished + + +@overload +def commandify( + *, requirements: Optional[List[Subsystem]] = None, runs_when_disabled: bool = False +) -> Callable[[Coroutineable], Callable[..., CoroutineCommand]]: + """ + A decorator that turns a coroutine function into a command. + A def should be under this. + + :param requirements: The subsystems that this command requires. + :param runs_when_disabled: Whether or not this command runs when the robot is disabled. + """ + + +@overload +def commandify(coroutine: Coroutineable) -> Callable[..., CoroutineCommand]: + """ + A decorator that turns a coroutine function into a command. + A def should be under this. + """ + + +def commandify( + coroutine: Optional[Coroutineable] = None, + *, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, +) -> Union[ + Callable[[Coroutineable], Callable[..., CoroutineCommand]], + Callable[..., CoroutineCommand], +]: + def wrapper(func: Coroutineable) -> Callable[..., CoroutineCommand]: + @wraps(func) + def arg_accepter(*args, **kwargs) -> CoroutineCommand: + return CoroutineCommand( + lambda: ensure_generator_function(func)(*args, **kwargs), + requirements, + ) + + return arg_accepter + + if coroutine is None: + return wrapper + + return wrapper(coroutine) diff --git a/commands2/src/Command.cpp.inl b/commands2/src/Command.cpp.inl index 8bc1ab1c..055a24ec 100644 --- a/commands2/src/Command.cpp.inl +++ b/commands2/src/Command.cpp.inl @@ -159,5 +159,17 @@ cls_Command "\n" ":returns: the command with the timeout added\n" DECORATOR_NOTE) + .def("__iter__", + [](std::shared_ptr self) { + return py::make_iterator(CommandIterator(self), CommandIteratorSentinel()); + }, + py::keep_alive<0, 1>(), + "Creates an Iterator for this command. The iterator will run the command and\n" + "will only exhaust when the command is finished.\n" + "Note that the iterator will not run the command in the background. It must be\n" + "explicitly be iterated over.\n" + "\n" + ":returns: an iterator for this command\n" + ) ; diff --git a/commands2/src/helpers.cpp b/commands2/src/helpers.cpp index e5d8319e..6b337e3f 100644 --- a/commands2/src/helpers.cpp +++ b/commands2/src/helpers.cpp @@ -23,4 +23,20 @@ std::vector pyargs2SubsystemList(py::args subs) { subsystems.emplace_back(py::cast(sub)); } return subsystems; -} \ No newline at end of file +} + +CommandIterator::CommandIterator(std::shared_ptr cmd) : cmd(cmd) {} +std::shared_ptr CommandIterator::operator*() const { return cmd; } +CommandIterator& CommandIterator::operator++() { + if (!called_initialize) { + cmd->Initialize(); + called_initialize = true; + return *this; + } + cmd->Execute(); + return *this; +} + +bool operator==(const CommandIterator& it, const CommandIteratorSentinel&) { + return it.called_initialize && it.cmd->IsFinished(); +} diff --git a/commands2/src/helpers.h b/commands2/src/helpers.h index c3e3649a..e5c42199 100644 --- a/commands2/src/helpers.h +++ b/commands2/src/helpers.h @@ -16,4 +16,17 @@ std::shared_ptr convertToSharedPtrHack(T *orig) { py::object pyo = py::cast(orig); return py::cast>(pyo); -} \ No newline at end of file +} + +class CommandIterator { + public: + std::shared_ptr cmd; + bool called_initialize = false; + explicit CommandIterator(std::shared_ptr cmd); + std::shared_ptr operator*() const; + CommandIterator& operator++(); +}; + +class CommandIteratorSentinel {}; + +bool operator==(const CommandIterator& it, const CommandIteratorSentinel&); diff --git a/commands2/trigger.py b/commands2/trigger.py new file mode 100644 index 00000000..42ab3dcd --- /dev/null +++ b/commands2/trigger.py @@ -0,0 +1,508 @@ +from typing import Callable, Optional, overload, List, Union + +from ._impl import Command, Subsystem, _Trigger + +from .coroutinecommand import CoroutineCommand, Coroutineable, Coroutine + +from wpimath.filter import Debouncer + + +class Trigger: + """ + A class used to bind command scheduling to events. The + Trigger class is a base for all command-event-binding classes, and so the + methods are named fairly abstractly; for purpose-specific wrappers, see + Button. + + @see Button + """ + + @overload + def __init__(self, is_active: Callable[[], bool] = lambda: False) -> None: + """ + Create a new trigger that is active when the given condition is true. + + :param is_active: Whether the trigger is active. + + Create a new trigger that is never active (default constructor) - activity + can be further determined by subclass code. + """ + + @overload + def __init__(self, is_active: _Trigger) -> None: + """ + Create a new trigger from an existing c++ trigger. + Robot code does not need to use this constructor. + + :param is_active: The c++ trigger to wrap. + """ + + def __init__( + self, is_active: Union[Callable[[], bool], _Trigger] = lambda: False + ) -> None: + if isinstance(is_active, _Trigger): + self._trigger = is_active + else: + self._trigger = _Trigger(is_active) + + def __bool__(self) -> bool: + """ + Returns whether or not the trigger is currently active + """ + return bool(self._trigger) + + def get(self) -> bool: + """ + Returns whether or not the trigger is currently active + """ + return bool(self) + + def __call__(self) -> bool: + """ + Returns whether or not the trigger is currently active + """ + return bool(self) + + def __and__(self, other: "Trigger") -> "Trigger": + """ + Composes this trigger with another trigger, returning a new trigger that is active when both + triggers are active. + + :param trigger: the trigger to compose with + + :returns: the trigger that is active when both triggers are active + """ + return Trigger(lambda: self() and other()) + + def __or__(self, other: "Trigger") -> "Trigger": + """ + Composes this trigger with another trigger, returning a new trigger that is active when either + triggers are active. + + :param trigger: the trigger to compose with + + :returns: the trigger that is active when both triggers are active + """ + return Trigger(lambda: self() or other()) + + def __invert__(self) -> "Trigger": + """ + Creates a new trigger that is active when this trigger is inactive, i.e. that acts as the + negation of this trigger. + + :param trigger: the trigger to compose with + + :returns: the trigger that is active when both triggers are active + """ + return Trigger(lambda: not self()) + + and_ = __and__ + or_ = __or__ + not_ = __invert__ + + def debounce(self, debounceTime: float, type: Debouncer.DebounceType) -> "Trigger": + """ + Creates a new debounced trigger from this trigger - it will become active + when this trigger has been active for longer than the specified period. + + :param debounceTime: The debounce period. + :param type: The debounce type. + + :returns: The debounced trigger. + """ + return Trigger(self._trigger.debounce(debounceTime, type)) + + def cancelWhenActive(self, command: Command) -> None: + """ + Binds a command to be canceled when the trigger becomes active. Takes a + raw pointer, and so is non-owning; users are responsible for the lifespan + and scheduling of the command. + + :param command: The command to bind. + """ + self._trigger.cancelWhenActive(command) + + @overload + def whenActive( + self, command_or_coroutine: Command, interruptible: bool = True + ) -> None: + """ + Binds a command to start when the trigger becomes active. + + :param command: The command to bind. + :param interruptible: Whether the command should be interruptible. + """ + + @overload + def whenActive( + self, + command_or_coroutine: Union[Coroutine, Coroutineable], + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> None: + """ + Binds a coroutine to start when the trigger becomes active. + + :param coroutine: The coroutine or coroutine function to bind. + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + @overload + def whenActive( + self, + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Callable[[Coroutineable], None]: + """ + Binds a coroutine to start when the trigger becomes active (Decorator Form). + A def should be under this. + + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + def whenActive( + self, + command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]] = None, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Union[None, Callable[[Coroutineable], None]]: + if command_or_coroutine is None: + + def wrapper(coroutine: Coroutineable) -> None: + self.whenActive( + coroutine, + interruptible=interruptible, + requirements=requirements, + runs_when_disabled=runs_when_disabled, + ) + + return wrapper + + if isinstance(command_or_coroutine, Command): + self._trigger.whenActive(command_or_coroutine, interruptible) + return + + self._trigger.whenActive( + CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), + interruptible, + ) + return + + @overload + def whenInactive( + self, command_or_coroutine: Command, interruptible: bool = True + ) -> None: + """ + Binds a command to start when the trigger becomes inactive. + + :param command: The command to bind. + :param interruptible: Whether the command should be interruptible. + """ + + @overload + def whenInactive( + self, + command_or_coroutine: Union[Coroutine, Coroutineable], + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> None: + """ + Binds a coroutine to start when the trigger becomes inactive. + + :param coroutine: The coroutine or coroutine function to bind. + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + @overload + def whenInactive( + self, + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Callable[[Coroutineable], None]: + """ + Binds a coroutine to start when the trigger becomes active (Decorator Form). + A def should be under this. + + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + def whenInactive( + self, + command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]] = None, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Union[None, Callable[[Coroutineable], None]]: + if command_or_coroutine is None: + + def wrapper(coroutine: Coroutineable) -> None: + self.whenInactive( + coroutine, + interruptible=interruptible, + requirements=requirements, + runs_when_disabled=runs_when_disabled, + ) + + return wrapper + + if isinstance(command_or_coroutine, Command): + self._trigger.whenInactive(command_or_coroutine, interruptible) + return + + self._trigger.whenInactive( + CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), + interruptible, + ) + return + + @overload + def whileActiveContinous( + self, command_or_coroutine: Command, interruptible: bool = True + ) -> None: + """ + Binds a command to be started repeatedly while the trigger is active, and + canceled when it becomes inactive. + + :param command: The command to bind. + :param interruptible: Whether the command should be interruptible. + """ + + @overload + def whileActiveContinous( + self, + command_or_coroutine: Union[Coroutine, Coroutineable], + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> None: + """ + Binds a command to be started repeatedly while the trigger is active, and + canceled when it becomes inactive. + + :param coroutine: The coroutine or coroutine function to bind. + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + @overload + def whileActiveContinous( + self, + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Callable[[Coroutineable], None]: + """ + Binds a command to be started repeatedly while the trigger is active, and + canceled when it becomes inactive (Decorator Form). + A def should be under this. + + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + def whileActiveContinous( + self, + command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]] = None, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Union[None, Callable[[Coroutineable], None]]: + if command_or_coroutine is None: + + def wrapper(coroutine: Coroutineable) -> None: + self.whileActiveContinous( + coroutine, + interruptible=interruptible, + requirements=requirements, + runs_when_disabled=runs_when_disabled, + ) + + return wrapper + + if isinstance(command_or_coroutine, Command): + self._trigger.whileActiveContinous(command_or_coroutine, interruptible) + return + + self._trigger.whileActiveContinous( + CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), + interruptible, + ) + return + + @overload + def whileActiveOnce( + self, command_or_coroutine: Command, interruptible: bool = True + ) -> None: + """ + Binds a command to be started when the trigger becomes active, and + canceled when it becomes inactive. + + :param command: The command to bind. + :param interruptible: Whether the command should be interruptible. + """ + + @overload + def whileActiveOnce( + self, + command_or_coroutine: Union[Coroutine, Coroutineable], + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> None: + """ + Binds a command to be started when the trigger becomes active, and + canceled when it becomes inactive. + + :param coroutine: The coroutine or coroutine function to bind. + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + @overload + def whileActiveOnce( + self, + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Callable[[Coroutineable], None]: + """ + Binds a command to be started when the trigger becomes active, and + canceled when it becomes inactive (Decorator Form). + A def should be under this. + + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + def whileActiveOnce( + self, + command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]] = None, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Union[None, Callable[[Coroutineable], None]]: + if command_or_coroutine is None: + + def wrapper(coroutine: Coroutineable) -> None: + self.whileActiveOnce( + coroutine, + interruptible=interruptible, + requirements=requirements, + runs_when_disabled=runs_when_disabled, + ) + + return wrapper + + if isinstance(command_or_coroutine, Command): + self._trigger.whileActiveOnce(command_or_coroutine, interruptible) + return + + self._trigger.whileActiveOnce( + CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), + interruptible, + ) + return + + @overload + def toggleWhenActive( + self, command_or_coroutine: Command, interruptible: bool = True + ) -> None: + """ + Binds a command to start when the trigger becomes active, and be canceled + when it again becomes active. + + :param command: The command to bind. + :param interruptible: Whether the command should be interruptible. + """ + ... + + @overload + def toggleWhenActive( + self, + command_or_coroutine: Union[Coroutine, Coroutineable], + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> None: + """ + Binds a command to start when the trigger becomes active, and be canceled + when it again becomes active. + + :param coroutine: The coroutine or coroutine function to bind. + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + @overload + def toggleWhenActive( + self, + *, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Callable[[Coroutineable], None]: + """ + Binds a command to start when the trigger becomes active, and be canceled + when it again becomes active (Decorator Form). + A def should be under this. + + :param interruptible: Whether the command should be interruptible. + :param requirements: The subsystems required to run the coroutine. + :param runs_when_disabled: Whether the coroutine should run when the subsystem is disabled. + """ + + def toggleWhenActive( + self, + command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]] = None, + interruptible: bool = True, + requirements: Optional[List[Subsystem]] = None, + runs_when_disabled: bool = False, + ) -> Union[None, Callable[[Coroutineable], None]]: + if command_or_coroutine is None: + + def wrapper(coroutine: Coroutineable) -> None: + self.toggleWhenActive( + coroutine, + interruptible=interruptible, + requirements=requirements, + runs_when_disabled=runs_when_disabled, + ) + + return wrapper + + if isinstance(command_or_coroutine, Command): + self._trigger.toggleWhenActive(command_or_coroutine, interruptible) + return + + self._trigger.toggleWhenActive( + CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), + interruptible, + ) + return diff --git a/gen/Trigger.yml b/gen/Trigger.yml index c82f7c74..179f9af4 100644 --- a/gen/Trigger.yml +++ b/gen/Trigger.yml @@ -5,6 +5,7 @@ extra_includes: classes: Trigger: + rename: "_Trigger" methods: Trigger: overloads: diff --git a/pyproject.toml b/pyproject.toml index 10fd0b74..05e684ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,14 +122,14 @@ WaitUntilCommand = "frc2/command/WaitUntilCommand.h" # WrapperCommand = "frc2/command/WrapperCommand.h" # frc2/command/button -Button = "frc2/command/button/Button.h" +# Button = "frc2/command/button/Button.h" CommandGenericHID = "frc2/command/button/CommandGenericHID.h" CommandJoystick = "frc2/command/button/CommandJoystick.h" CommandPS4Controller = "frc2/command/button/CommandPS4Controller.h" CommandXboxController = "frc2/command/button/CommandXboxController.h" -JoystickButton = "frc2/command/button/JoystickButton.h" -NetworkButton = "frc2/command/button/NetworkButton.h" -POVButton = "frc2/command/button/POVButton.h" +# JoystickButton = "frc2/command/button/JoystickButton.h" +# NetworkButton = "frc2/command/button/NetworkButton.h" +# POVButton = "frc2/command/button/POVButton.h" Trigger = "frc2/command/button/Trigger.h" # custom stuff diff --git a/tests/test_button.py b/tests/test_button.py index b0efc194..930b8594 100644 --- a/tests/test_button.py +++ b/tests/test_button.py @@ -2,6 +2,7 @@ import commands2.button from util import Counter +import pytest class MyButton(commands2.button.Button): diff --git a/tests/test_button_coroutine_functions.py b/tests/test_button_coroutine_functions.py new file mode 100644 index 00000000..b1744ad9 --- /dev/null +++ b/tests/test_button_coroutine_functions.py @@ -0,0 +1,183 @@ +import commands2 +import commands2.button + +from util import Counter +import pytest + + +class MyButton(commands2.button.Button): + def __init__(self): + super().__init__(self.isPressed) + self.pressed = False + + def isPressed(self) -> bool: + return self.pressed + + def setPressed(self, value: bool): + self.pressed = value + + +def test_when_pressed_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = False + + def cmd1(): + state.executed = True + return + yield + + button.setPressed(False) + button.whenPressed(cmd1) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + + assert state.executed + + +def test_when_released_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = False + + def cmd1(): + state.executed = True + return + yield + + button.setPressed(True) + button.whenReleased(cmd1) + scheduler.run() + + assert not state.executed + + button.setPressed(False) + scheduler.run() + scheduler.run() + + assert state.executed + + +def test_while_held_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = 0 + + def cmd1(): + state.executed += 1 + return + yield + + button.setPressed(False) + button.whileHeld(cmd1) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + assert state.executed == 2 + + button.setPressed(False) + scheduler.run() + + assert state.executed == 2 + + +def test_when_held_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = 0 + + def cmd1(): + while True: + state.executed += 1 + yield + + button.setPressed(False) + button.whenHeld(cmd1()) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + assert state.executed == 2 + + button.setPressed(False) + + assert state.executed == 2 + + +def test_toggle_when_pressed_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = 0 + + def cmd1(): + while True: + state.executed += 1 + yield + + button.setPressed(False) + + button.toggleWhenPressed(cmd1) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + + assert state.executed + + +def test_function_bindings_coroutine(scheduler: commands2.CommandScheduler): + + buttonWhenPressed = MyButton() + buttonWhileHeld = MyButton() + buttonWhenReleased = MyButton() + + buttonWhenPressed.setPressed(False) + buttonWhileHeld.setPressed(True) + buttonWhenReleased.setPressed(True) + + counter = Counter() + + def increment(): + counter.increment() + return + yield + + buttonWhenPressed.whenPressed(increment) + buttonWhileHeld.whileHeld(increment) + buttonWhenReleased.whenReleased(increment) + + scheduler.run() + buttonWhenPressed.setPressed(True) + buttonWhenReleased.setPressed(False) + scheduler.run() + + assert counter.value == 4 diff --git a/tests/test_button_coroutines.py b/tests/test_button_coroutines.py new file mode 100644 index 00000000..97fad40b --- /dev/null +++ b/tests/test_button_coroutines.py @@ -0,0 +1,185 @@ +import commands2 +import commands2.button + +from util import Counter +import pytest + + +class MyButton(commands2.button.Button): + def __init__(self): + super().__init__(self.isPressed) + self.pressed = False + + def isPressed(self) -> bool: + return self.pressed + + def setPressed(self, value: bool): + self.pressed = value + + +def test_when_pressed_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = False + + def cmd1(): + state.executed = True + return + yield + + button.setPressed(False) + button.whenPressed(cmd1()) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + + assert state.executed + + +def test_when_released_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = False + + def cmd1(): + state.executed = True + return + yield + + button.setPressed(True) + button.whenReleased(cmd1()) + scheduler.run() + + assert not state.executed + + button.setPressed(False) + scheduler.run() + scheduler.run() + + assert state.executed + + +@pytest.mark.xfail(strict=True) +def test_while_held_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = 0 + + def cmd1(): + state.executed += 1 + return + yield + + button.setPressed(False) + button.whileHeld(cmd1()) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + assert state.executed == 2 + + button.setPressed(False) + scheduler.run() + + assert state.executed == 2 + + +def test_when_held_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = 0 + + def cmd1(): + while True: + state.executed += 1 + yield + + button.setPressed(False) + button.whenHeld(cmd1()) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + assert state.executed == 2 + + button.setPressed(False) + + assert state.executed == 2 + + +def test_toggle_when_pressed_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + + class state: + pass + + state.executed = 0 + + def cmd1(): + while True: + state.executed += 1 + yield + + button.setPressed(False) + + button.toggleWhenPressed(cmd1()) + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + + assert state.executed + + +@pytest.mark.xfail(strict=True) +def test_function_bindings_coroutine(scheduler: commands2.CommandScheduler): + + buttonWhenPressed = MyButton() + buttonWhileHeld = MyButton() + buttonWhenReleased = MyButton() + + buttonWhenPressed.setPressed(False) + buttonWhileHeld.setPressed(True) + buttonWhenReleased.setPressed(True) + + counter = Counter() + + def increment(): + counter.increment() + return + yield + + buttonWhenPressed.whenPressed(increment()) + buttonWhileHeld.whileHeld(increment()) + buttonWhenReleased.whenReleased(increment()) + + scheduler.run() + buttonWhenPressed.setPressed(True) + buttonWhenReleased.setPressed(False) + scheduler.run() + + assert counter.value == 4 diff --git a/tests/test_button_decorator_coroutines.py b/tests/test_button_decorator_coroutines.py new file mode 100644 index 00000000..5f0a6710 --- /dev/null +++ b/tests/test_button_decorator_coroutines.py @@ -0,0 +1,153 @@ +import commands2 +import commands2.button + +from util import Counter +import pytest + + +class MyButton(commands2.button.Button): + def __init__(self): + super().__init__(self.isPressed) + self.pressed = False + + def isPressed(self) -> bool: + return self.pressed + + def setPressed(self, value: bool): + self.pressed = value + + +def test_when_pressed_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + button.setPressed(False) + + class state: + pass + + state.executed = False + + @button.whenPressed + def cmd1(): + state.executed = True + return + yield + + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + + assert state.executed + + +def test_when_released_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + button.setPressed(True) + + class state: + pass + + state.executed = False + + @button.whenReleased() + def cmd1(): + state.executed = True + return + yield + + scheduler.run() + + assert not state.executed + + button.setPressed(False) + scheduler.run() + scheduler.run() + + assert state.executed + + +def test_while_held_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + button.setPressed(False) + + class state: + pass + + state.executed = 0 + + @button.whileHeld(interruptible=True) + def cmd1(): + state.executed += 1 + return + yield + + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + assert state.executed == 2 + + button.setPressed(False) + scheduler.run() + + assert state.executed == 2 + + +def test_when_held_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + button.setPressed(False) + + class state: + pass + + state.executed = 0 + + @button.whenHeld(runs_when_disabled=True) + def cmd1(): + while True: + state.executed += 1 + yield + + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + scheduler.run() + assert state.executed == 2 + + button.setPressed(False) + + assert state.executed == 2 + + +def test_toggle_when_pressed_coroutine(scheduler: commands2.CommandScheduler): + button = MyButton() + button.setPressed(False) + + class state: + pass + + state.executed = 0 + + @button.toggleWhenPressed + def cmd1(): + while True: + state.executed += 1 + yield + + scheduler.run() + + assert not state.executed + + button.setPressed(True) + scheduler.run() + + assert state.executed diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py new file mode 100644 index 00000000..14961421 --- /dev/null +++ b/tests/test_coroutines.py @@ -0,0 +1,127 @@ +import commands2 +import commands2.button +from commands2 import commandify + +from util import Counter +import pytest + + +def test_coroutine_command(scheduler: commands2.CommandScheduler): + class state: + pass + + state.co1_count = 0 + + def co1(): + for i in range(3): + state.co1_count += 1 + yield + + cmd1 = commands2.CoroutineCommand(co1) + + cmd1.schedule() + + for _ in range(10): + scheduler.run() + + assert state.co1_count == 3 + + +def test_coroutine_composition(scheduler: commands2.CommandScheduler): + class state: + pass + + state.co1_count = 0 + state.co2_count = 0 + + def co1(): + for i in range(3): + state.co1_count += 1 + yield + + def co2(): + state.co2_count += 1 + yield from co1() + state.co2_count += 1 + + commands2.CoroutineCommand(co2).schedule() + + for _ in range(10): + scheduler.run() + + assert state.co1_count == 3 + assert state.co2_count == 2 + + +def test_yield_from_command(scheduler: commands2.CommandScheduler): + class state: + pass + + state.co1_count = 0 + state.co2_count = 0 + + class Command1(commands2.CommandBase): + def execute(self) -> None: + state.co1_count += 1 + + def isFinished(self) -> bool: + return state.co1_count == 3 + + def co2(): + state.co2_count += 1 + yield from Command1() + state.co2_count += 1 + + commands2.CoroutineCommand(co2).schedule() + + for _ in range(10): + scheduler.run() + + assert state.co1_count == 3 + assert state.co2_count == 2 + + +# def test_commandify(scheduler: commands2.CommandScheduler): +# class state: pass +# state.co1_count = 0 + +# def co1(n): +# for i in range(n): +# print(1) +# state.co1_count += 1 +# yield + +# Cmd1 = commandify(co1) +# Cmd1(5).schedule() +# # Cmd1(5).schedule() + +# for _ in range(10): +# scheduler.run() + +# assert state.co1_count == 5 + + +def test_commandify_decorator(scheduler: commands2.CommandScheduler): + class state: + pass + + state.co1_count = 0 + + @commandify + def Cmd1(n): + for i in range(n): + print(1) + state.co1_count += 1 + yield + + cmd = Cmd1(5) + cmd.schedule() + # Cmd1(5).schedule() + for _ in range(10): + scheduler.run() + + assert state.co1_count == 5 + + +def test_coroutine_command_lambda_pass_args(): + ...