-
Notifications
You must be signed in to change notification settings - Fork 24
Add coroutine command support #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e4d360c
8488fbc
5de62e0
3ee269a
d285fb0
70f5715
9122d07
7174dfb
decd650
1c7e4d9
ed5b106
aafde42
0bb3251
8f91b2b
71322a3
bac4080
7815111
021ce43
401aa5f
04d90d7
ffbeeca
f2cae93
6068723
d8078e7
c977711
4028320
243072c
748c84c
a2da722
cb5af47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from ..trigger import Trigger | ||
|
||
|
||
class Button(Trigger): | ||
""" | ||
A class used to bind command scheduling to button presses. | ||
Can be composed with other buttons with the operators in Trigger. | ||
|
||
@see Trigger | ||
""" | ||
|
||
whenPressed = Trigger.whenActive | ||
whenReleased = Trigger.whenInactive | ||
whileHeld = Trigger.whileActiveContinous | ||
whenHeld = Trigger.whileActiveOnce | ||
toggleWhenPressed = Trigger.toggleWhenActive | ||
cancelWhenPressed = Trigger.cancelWhenActive |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from wpilib import Joystick | ||
|
||
from .button import Button | ||
|
||
|
||
class JoystickButton(Button): | ||
""" | ||
A class used to bind command scheduling to joystick button presses. | ||
Can be composed with other buttons with the operators in Trigger. | ||
|
||
@see Trigger | ||
""" | ||
|
||
def __init__(self, joystick: Joystick, button: int) -> None: | ||
""" | ||
Creates a JoystickButton that commands can be bound to. | ||
|
||
:param joystick: The joystick on which the button is located. | ||
:param button: The number of the button on the joystick. | ||
""" | ||
if not isinstance(joystick, Joystick) or not isinstance(button, int): | ||
raise TypeError( | ||
"JoystickButton.__init__(): incompatible constructor arguments. The following argument types are supported:\n" | ||
"\t1. commands2.button.JoystickButton(joystick: Joystick, button: int)\n" | ||
f"Invoked with: {joystick}, {button}" | ||
) | ||
super().__init__(lambda: joystick.getRawButton(button)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm curious what error message you get when something that isn't an integer is passed in... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It constructs fine but fails the first time py", line 21, in <lambda>
super().__init__(lambda: joystick.getRawButton(button))
TypeError: getRawButton(): incompatible function arguments. The following argument types are supported:
1. (self: wpilib.interfaces._interfaces.GenericHID, button: int) -> bool
Invoked with: <XboxController 0>, 'qwe' which isn't an ideal error message. I'll check the type in the constructor. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from ntcore import NetworkTable, NetworkTableEntry | ||
|
||
from typing import Union, overload | ||
|
||
from .button import Button | ||
|
||
|
||
class NetworkButton(Button): | ||
""" | ||
A class used to bind command scheduling to a NetworkTable boolean fields. | ||
Can be composed with other buttons with the operators in Trigger. | ||
|
||
@see Trigger | ||
""" | ||
|
||
@overload | ||
def __init__(self, entry: NetworkTableEntry) -> None: | ||
""" | ||
Creates a NetworkButton that commands can be bound to. | ||
|
||
:param entry: The entry that is the value. | ||
""" | ||
|
||
@overload | ||
def __init__(self, table: Union[NetworkTable, str], field: str) -> None: | ||
""" | ||
Creates a NetworkButton that commands can be bound to. | ||
|
||
:param table: The table where the networktable value is located. | ||
:param field: The field that is the value. | ||
""" | ||
|
||
def __init__(self, *args, **kwargs) -> None: | ||
num_args = len(args) + len(kwargs) | ||
if num_args == 1: | ||
entry: NetworkTableEntry = kwargs.get("entry") or args[0] | ||
|
||
if not isinstance(entry, NetworkTableEntry): | ||
raise self._type_error(entry) | ||
|
||
super().__init__( | ||
lambda: NetworkTables.isConnected() and entry.getBoolean(False) | ||
) | ||
elif num_args == 2: | ||
table = kwargs.get("table") or args[0] | ||
field = kwargs.get("field") or args[-1] | ||
|
||
if isinstance(table, str): | ||
table = NetworkTables.getTable(table) | ||
|
||
entry = table.getEntry(field) | ||
self.__init__(entry) | ||
else: | ||
raise self._type_error(args) | ||
|
||
def _type_error(self, *args): | ||
return TypeError( | ||
"NetworkButton.__init__(): incompatible constructor arguments. The following argument types are supported:\n" | ||
"\t1. commands2.button.NetworkButton(entry: NetworkTableEntry)\n" | ||
"\t2. commands2.button.NetworkButton(table: str, field: str)\n" | ||
"\t3. commands2.button.NetworkButton(table: NetworkTable, field: str)\n" | ||
f"Invoked with: {', '.join(map(str, args))}" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
from wpilib import Joystick | ||
|
||
from .button import Button | ||
|
||
|
||
class POVButton(Button): | ||
""" | ||
A class used to bind command scheduling to joystick POV presses. | ||
Can be composed with other buttons with the operators in Trigger. | ||
|
||
@see Trigger | ||
""" | ||
|
||
def __init__(self, joystick: Joystick, angle: int, povNumber: int = 0) -> None: | ||
""" | ||
Creates a POVButton that commands can be bound to. | ||
|
||
:param joystick: The joystick on which the button is located. | ||
:param angle: The angle of the POV corresponding to a button press. | ||
:param povNumber: The number of the POV on the joystick. | ||
""" | ||
if ( | ||
not isinstance(joystick, Joystick) | ||
or not isinstance(angle, int) | ||
or not isinstance(povNumber, int) | ||
): | ||
raise TypeError( | ||
"POVButton.__init__(): incompatible constructor arguments. The following argument types are supported:\n" | ||
"\t1. commands2.button.POVButton(joystick: Joystick, angle: int, povNumber: int)\n" | ||
f"Invoked with: {joystick}, {angle}, {povNumber}" | ||
) | ||
super().__init__(lambda: joystick.getPOV(povNumber) == angle) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
from functools import wraps | ||
from typing import Any, Callable, Generator, List, Union, Optional, overload | ||
from ._impl import CommandBase, Subsystem | ||
import inspect | ||
from typing_extensions import TypeGuard | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: Figure out the minimum version of |
||
|
||
Coroutine = Generator[None, None, None] | ||
CoroutineFunction = Callable[[], Generator[None, None, None]] | ||
Coroutineable = Union[Callable[[], None], CoroutineFunction] | ||
|
||
|
||
def is_coroutine(func: Any) -> TypeGuard[Coroutine]: | ||
return inspect.isgenerator(func) | ||
|
||
|
||
def is_coroutine_function(func: Any) -> TypeGuard[CoroutineFunction]: | ||
return inspect.isgeneratorfunction(func) | ||
|
||
|
||
def is_coroutineable(func: Any) -> TypeGuard[Coroutineable]: | ||
return is_coroutine_function(func) or callable(func) | ||
|
||
|
||
def ensure_generator_function(func: Coroutineable) -> Callable[..., Coroutine]: | ||
if is_coroutine_function(func): | ||
return func | ||
|
||
@wraps(func) | ||
def wrapper(*args, **kwargs): | ||
func(*args, **kwargs) | ||
return | ||
yield | ||
|
||
return wrapper | ||
|
||
|
||
class CoroutineCommand(CommandBase): | ||
""" | ||
A class that wraps a coroutine function into a command. | ||
""" | ||
|
||
coroutine: Optional[Coroutine] | ||
coroutine_function: Optional[Coroutineable] | ||
is_finished: bool | ||
|
||
def __init__( | ||
self, | ||
coroutine: Union[Coroutine, Coroutineable], | ||
requirements: Optional[List[Subsystem]] = None, | ||
runs_when_disabled: bool = False, | ||
) -> None: | ||
""" | ||
Creates a CoroutineCommand than can be used as a command. | ||
|
||
:param coroutine: The coroutine or coroutine function to bind. | ||
:param requirements: The subsystems that this command requires. | ||
:param runs_when_disabled: Whether or not this command runs when the robot is disabled. | ||
""" | ||
super().__init__() | ||
self.coroutine = None | ||
self.coroutine_function = None | ||
self.runsWhenDisabled = lambda: runs_when_disabled | ||
|
||
if is_coroutine(coroutine): | ||
self.coroutine = coroutine | ||
elif is_coroutineable(coroutine): | ||
self.coroutine_function = coroutine | ||
else: | ||
raise TypeError("The coroutine must be a coroutine or a coroutine function") | ||
|
||
if requirements is not None: | ||
self.addRequirements(requirements) | ||
|
||
self.is_finished = False | ||
|
||
def initialize(self) -> None: | ||
if self.coroutine_function: | ||
self.coroutine = ensure_generator_function(self.coroutine_function)() | ||
elif self.coroutine and self.is_finished: | ||
raise RuntimeError("Generator objects cannot be reused.") | ||
|
||
self.is_finished = False | ||
|
||
def execute(self): | ||
print("r") | ||
try: | ||
if not self.is_finished: | ||
if not self.coroutine: | ||
raise TypeError("This command was not properly initialized") | ||
next(self.coroutine) | ||
except StopIteration: | ||
self.is_finished = True | ||
|
||
def isFinished(self): | ||
return self.is_finished | ||
|
||
|
||
@overload | ||
def commandify( | ||
*, requirements: Optional[List[Subsystem]] = None, runs_when_disabled: bool = False | ||
) -> Callable[[Coroutineable], Callable[..., CoroutineCommand]]: | ||
""" | ||
A decorator that turns a coroutine function into a command. | ||
A def should be under this. | ||
|
||
:param requirements: The subsystems that this command requires. | ||
:param runs_when_disabled: Whether or not this command runs when the robot is disabled. | ||
""" | ||
|
||
|
||
@overload | ||
def commandify(coroutine: Coroutineable) -> Callable[..., CoroutineCommand]: | ||
""" | ||
A decorator that turns a coroutine function into a command. | ||
A def should be under this. | ||
""" | ||
|
||
|
||
def commandify( | ||
coroutine: Optional[Coroutineable] = None, | ||
*, | ||
requirements: Optional[List[Subsystem]] = None, | ||
runs_when_disabled: bool = False, | ||
) -> Union[ | ||
Callable[[Coroutineable], Callable[..., CoroutineCommand]], | ||
Callable[..., CoroutineCommand], | ||
]: | ||
def wrapper(func: Coroutineable) -> Callable[..., CoroutineCommand]: | ||
@wraps(func) | ||
def arg_accepter(*args, **kwargs) -> CoroutineCommand: | ||
return CoroutineCommand( | ||
lambda: ensure_generator_function(func)(*args, **kwargs), | ||
requirements, | ||
) | ||
|
||
return arg_accepter | ||
|
||
if coroutine is None: | ||
return wrapper | ||
|
||
return wrapper(coroutine) |
Uh oh!
There was an error while loading. Please reload this page.