From 3b54f14e6c644fb1bbe6b9be24d47fe26adf8ceb Mon Sep 17 00:00:00 2001 From: jcerise Date: Mon, 2 May 2022 12:11:00 -0600 Subject: [PATCH 1/6] Add type annotations --- adafruit_rfm9x.py | 189 ++++++++++++++++++++++++++-------------------- 1 file changed, 106 insertions(+), 83 deletions(-) diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index 772ab18..5b90984 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -15,7 +15,9 @@ import random import time import adafruit_bus_device.spi_device as spidev +from digitalio import DigitalInOut from micropython import const +from busio import SPI HAS_SUPERVISOR = False @@ -26,6 +28,12 @@ HAS_SUPERVISOR = True except ImportError: pass + +try: + from typing import Optional, Any +except ImportError: + pass + __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM9x.git" @@ -120,18 +128,18 @@ # pylint: disable=too-many-statements -def ticks_diff(ticks1, ticks2): +def ticks_diff(ticks1: int, ticks2: int) -> int: """Compute the signed difference between two ticks values assuming that they are within 2**28 ticks """ - diff = (ticks1 - ticks2) & _TICKS_MAX + diff: int = (ticks1 - ticks2) & _TICKS_MAX diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD return diff class RFM9x: """Interface to a RFM95/6/7/8 LoRa radio module. Allows sending and - receivng bytes of data in long range LoRa mode at a support board frequency + receiving bytes of data in long range LoRa mode at a support board frequency (433/915mhz). You must specify the following parameters: @@ -186,7 +194,7 @@ class _RegisterBits: # check from pylint. # pylint: disable=protected-access - def __init__(self, address, *, offset=0, bits=1): + def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None: assert 0 <= offset <= 7 assert 1 <= bits <= 8 assert (offset + bits) <= 8 @@ -198,12 +206,12 @@ def __init__(self, address, *, offset=0, bits=1): self._mask <<= offset self._offset = offset - def __get__(self, obj, objtype): - reg_value = obj._read_u8(self._address) + def __get__(self, obj, objtype) -> int: + reg_value: int = obj._read_u8(self._address) return (reg_value & self._mask) >> self._offset - def __set__(self, obj, val): - reg_value = obj._read_u8(self._address) + def __set__(self, obj, val) -> None: + reg_value: int = obj._read_u8(self._address) reg_value &= ~self._mask reg_value |= (val & 0xFF) << self._offset obj._write_u8(self._address, reg_value) @@ -243,21 +251,23 @@ def __set__(self, obj, val): def __init__( self, - spi, - cs, - reset, - frequency, + spi: SPI, + cs: Optional[DigitalInOut], + reset: DigitalInOut, + frequency: int, *, preamble_length=8, - high_power=True, - baudrate=5000000, - agc=False, - crc=True - ): - self.high_power = high_power + high_power: bool = True, + baudrate: int = 5000000, + agc: bool = False, + crc: bool = True + ) -> None: + self.high_power: bool = high_power # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. # Set Default Baudrate to 5MHz to avoid problems - self._device = spidev.SPIDevice(spi, cs, baudrate=baudrate, polarity=0, phase=0) + self._device: spidev.SPIDevice = spidev.SPIDevice( + spi, cs, baudrate=baudrate, polarity=0, phase=0 + ) # Setup reset as a digital output - initially High # This line is pulled low as an output quickly to trigger a reset. self._reset = reset @@ -266,7 +276,7 @@ def __init__( self.reset() # No device type check! Catch an error from the very first request and # throw a nicer message to indicate possible wiring problems. - version = self._read_u8(_RH_RF95_REG_42_VERSION) + version: int = self._read_u8(_RH_RF95_REG_42_VERSION) if version != 18: raise RuntimeError( "Failed to find rfm9x with expected version -- check wiring" @@ -288,82 +298,84 @@ def __init__( # Set mode idle self.idle() # Set frequency - self.frequency_mhz = frequency + self.frequency_mhz: float = frequency # Set preamble length (default 8 bytes to match radiohead). - self.preamble_length = preamble_length + self.preamble_length: int = preamble_length # Defaults set modem config to RadioHead compatible Bw125Cr45Sf128 mode. - self.signal_bandwidth = 125000 - self.coding_rate = 5 - self.spreading_factor = 7 + self.signal_bandwidth: int = 125000 + self.coding_rate: int = 5 + self.spreading_factor: int = 7 # Default to enable CRC checking on incoming packets. - self.enable_crc = crc + self.enable_crc: bool = crc """CRC Enable state""" # set AGC - Default = False self.auto_agc = agc """Automatic Gain Control state""" # Set transmit power to 13 dBm, a safe value any module supports. - self.tx_power = 13 + self.tx_power: int = 13 # initialize last RSSI reading - self.last_rssi = 0.0 + self.last_rssi: float = 0.0 """The RSSI of the last received packet. Stored when the packet was received. The instantaneous RSSI value may not be accurate once the operating mode has been changed. """ - self.last_snr = 0.0 + self.last_snr: float = 0.0 """The SNR of the last received packet. Stored when the packet was received. The instantaneous SNR value may not be accurate once the operating mode has been changed. """ # initialize timeouts and delays delays - self.ack_wait = 0.5 + self.ack_wait: float = 0.5 """The delay time before attempting a retry after not receiving an ACK""" - self.receive_timeout = 0.5 + self.receive_timeout: float = 0.5 """The amount of time to poll for a received packet. If no packet is received, the returned packet will be None """ - self.xmit_timeout = 2.0 + self.xmit_timeout: float = 2.0 """The amount of time to wait for the HW to transmit the packet. This is mainly used to prevent a hang due to a HW issue """ - self.ack_retries = 5 + self.ack_retries: int = 5 """The number of ACK retries before reporting a failure.""" - self.ack_delay = None + self.ack_delay: Optional[float] = None """The delay time before attemting to send an ACK. If ACKs are being missed try setting this to .1 or .2. """ # initialize sequence number counter for reliabe datagram mode - self.sequence_number = 0 + self.sequence_number: int = 0 # create seen Ids list - self.seen_ids = bytearray(256) + self.seen_ids: bytearray = bytearray(256) # initialize packet header # node address - default is broadcast - self.node = _RH_BROADCAST_ADDRESS + self.node: int = _RH_BROADCAST_ADDRESS """The default address of this Node. (0-255). If not 255 (0xff) then only packets address to this node will be accepted. First byte of the RadioHead header. """ # destination address - default is broadcast - self.destination = _RH_BROADCAST_ADDRESS + self.destination: int = _RH_BROADCAST_ADDRESS """The default destination address for packet transmissions. (0-255). If 255 (0xff) then any receiving node should accept the packet. Second byte of the RadioHead header. """ # ID - contains seq count for reliable datagram mode - self.identifier = 0 + self.identifier: int = 0 """Automatically set to the sequence number when send_with_ack() used. Third byte of the RadioHead header. """ # flags - identifies ack/reetry packet for reliable datagram mode - self.flags = 0 + self.flags: int = 0 """Upper 4 bits reserved for use by Reliable Datagram Mode. Lower 4 bits may be used to pass information. Fourth byte of the RadioHead header. """ - self.crc_error_count = 0 + self.crc_error_count: int = 0 # pylint: disable=no-member # Reconsider pylint: disable when this can be tested - def _read_into(self, address, buf, length=None): + def _read_into( + self, address: int, buf: bytearray, length: Optional[int] = None + ) -> None: # Read a number of bytes from the specified address into the provided # buffer. If length is not specified (the default) the entire buffer # will be filled. @@ -375,12 +387,14 @@ def _read_into(self, address, buf, length=None): device.write(self._BUFFER, end=1) device.readinto(buf, end=length) - def _read_u8(self, address): + def _read_u8(self, address: int) -> int: # Read a single byte from the provided address and return it. self._read_into(address, self._BUFFER, length=1) return self._BUFFER[0] - def _write_from(self, address, buf, length=None): + def _write_from( + self, address: int, buf: bytearray, length: Optional[int] = None + ) -> None: # Write a number of bytes to the provided address and taken from the # provided buffer. If no length is specified (the default) the entire # buffer is written. @@ -392,16 +406,17 @@ def _write_from(self, address, buf, length=None): device.write(self._BUFFER, end=1) device.write(buf, end=length) - def _write_u8(self, address, val): + def _write_u8(self, address: int, val: int) -> None: # Write a byte register to the chip. Specify the 7-bit address and the # 8-bit value to write to that address. with self._device as device: - self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to - # indicate a write. + self._BUFFER[0] = ( + address | 0x80 + ) & 0xFF # Set top bit to 1 to indicate a write. self._BUFFER[1] = val & 0xFF device.write(self._BUFFER, end=2) - def reset(self): + def reset(self) -> None: """Perform a reset of the chip.""" # See section 7.2.2 of the datasheet for reset description. self._reset.value = False # Set Reset Low @@ -409,22 +424,22 @@ def reset(self): self._reset.value = True # set Reset High time.sleep(0.005) # 5 ms - def idle(self): + def idle(self) -> None: """Enter idle standby mode.""" self.operation_mode = STANDBY_MODE - def sleep(self): + def sleep(self) -> None: """Enter sleep mode.""" self.operation_mode = SLEEP_MODE - def listen(self): + def listen(self) -> None: """Listen for packets to be received by the chip. Use :py:func:`receive` to listen, wait and retrieve packets as they're available. """ self.operation_mode = RX_MODE self.dio0_mapping = 0b00 # Interrupt on rx done. - def transmit(self): + def transmit(self) -> None: """Transmit a packet which is queued in the FIFO. This is a low level function for entering transmit mode and more. For generating and transmitting a packet of data use :py:func:`send` instead. @@ -433,7 +448,7 @@ def transmit(self): self.dio0_mapping = 0b01 # Interrupt on tx done. @property - def preamble_length(self): + def preamble_length(self) -> int: """The length of the preamble for sent and received packets, an unsigned 16-bit value. Received packets must match this length or they are ignored! Set to 8 to match the RadioHead RFM95 library. @@ -443,13 +458,13 @@ def preamble_length(self): return ((msb << 8) | lsb) & 0xFFFF @preamble_length.setter - def preamble_length(self, val): + def preamble_length(self, val: int) -> None: assert 0 <= val <= 65535 self._write_u8(_RH_RF95_REG_20_PREAMBLE_MSB, (val >> 8) & 0xFF) self._write_u8(_RH_RF95_REG_21_PREAMBLE_LSB, val & 0xFF) @property - def frequency_mhz(self): + def frequency_mhz(self) -> float: """The frequency of the radio in Megahertz. Only the allowed values for your radio must be specified (i.e. 433 vs. 915 mhz)! """ @@ -461,7 +476,7 @@ def frequency_mhz(self): return frequency @frequency_mhz.setter - def frequency_mhz(self, val): + def frequency_mhz(self, val: int) -> None: if val < 240 or val > 960: raise RuntimeError("frequency_mhz must be between 240 and 960") # Calculate FRF register 24-bit value. @@ -475,7 +490,7 @@ def frequency_mhz(self, val): self._write_u8(_RH_RF95_REG_08_FRF_LSB, lsb) @property - def tx_power(self): + def tx_power(self) -> int: """The transmit power in dBm. Can be set to a value from 5 to 23 for high power devices (RFM95/96/97/98, high_power=True) or -1 to 14 for low power devices. Only integer power levels are actually set (i.e. 12.5 @@ -490,7 +505,7 @@ def tx_power(self): return self.output_power - 1 @tx_power.setter - def tx_power(self, val): + def tx_power(self, val: int) -> None: val = int(val) if self.high_power: if val < 5 or val > 23: @@ -511,11 +526,11 @@ def tx_power(self, val): self.output_power = (val + 1) & 0x0F @property - def rssi(self): + def rssi(self) -> int: """The received strength indicator (in dBm) of the last received message.""" # Read RSSI register and convert to value using formula in datasheet. # Remember in LoRa mode the payload register changes function to RSSI! - raw_rssi = self._read_u8(_RH_RF95_REG_1A_PKT_RSSI_VALUE) + raw_rssi: int = self._read_u8(_RH_RF95_REG_1A_PKT_RSSI_VALUE) if self.low_frequency_mode: raw_rssi -= 157 else: @@ -523,22 +538,22 @@ def rssi(self): return raw_rssi @property - def snr(self): + def snr(self) -> float: """The SNR (in dB) of the last received message.""" # Read SNR 0x19 register and convert to value using formula in datasheet. # SNR(dB) = PacketSnr [twos complement] / 4 - snr_byte = self._read_u8(_RH_RF95_REG_19_PKT_SNR_VALUE) + snr_byte: int = self._read_u8(_RH_RF95_REG_19_PKT_SNR_VALUE) if snr_byte > 127: snr_byte = (256 - snr_byte) * -1 return snr_byte / 4 @property - def signal_bandwidth(self): + def signal_bandwidth(self) -> int: """The signal bandwidth used by the radio (try setting to a higher value to increase throughput or to a lower value to increase the likelihood of successfully received payloads). Valid values are listed in RFM9x.bw_bins.""" - bw_id = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0xF0) >> 4 + bw_id: int = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0xF0) >> 4 if bw_id >= len(self.bw_bins): current_bandwidth = 500000 else: @@ -546,8 +561,10 @@ def signal_bandwidth(self): return current_bandwidth @signal_bandwidth.setter - def signal_bandwidth(self, val): + def signal_bandwidth(self, val: int) -> None: # Set signal bandwidth (set to 125000 to match RadioHead Bw125). + bw_id: int + cutoff: int for bw_id, cutoff in enumerate(self.bw_bins): if val <= cutoff: break @@ -581,17 +598,17 @@ def signal_bandwidth(self, val): self._write_u8(0x30, 0) @property - def coding_rate(self): + def coding_rate(self) -> int: """The coding rate used by the radio to control forward error correction (try setting to a higher value to increase tolerance of short bursts of interference or to a lower value to increase bit rate). Valid values are limited to 5, 6, 7, or 8.""" - cr_id = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0x0E) >> 1 + cr_id: int = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0x0E) >> 1 denominator = cr_id + 4 return denominator @coding_rate.setter - def coding_rate(self, val): + def coding_rate(self, val: int) -> None: # Set coding rate (set to 5 to match RadioHead Cr45). denominator = min(max(val, 5), 8) cr_id = denominator - 4 @@ -601,7 +618,7 @@ def coding_rate(self, val): ) @property - def spreading_factor(self): + def spreading_factor(self) -> int: """The spreading factor used by the radio (try setting to a higher value to increase the receiver's ability to distinguish signal from noise or to a lower value to increase the data transmission rate). @@ -610,7 +627,7 @@ def spreading_factor(self): return sf_id @spreading_factor.setter - def spreading_factor(self, val): + def spreading_factor(self, val: int) -> None: # Set spreading factor (set to 7 to match RadioHead Sf128). val = min(max(val, 6), 12) @@ -629,14 +646,14 @@ def spreading_factor(self, val): ) @property - def enable_crc(self): + def enable_crc(self) -> bool: """Set to True to enable hardware CRC checking of incoming packets. Incoming packets that fail the CRC check are not processed. Set to False to disable CRC checking and process all incoming packets.""" return (self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0x04) == 0x04 @enable_crc.setter - def enable_crc(self, val): + def enable_crc(self, val: bool) -> None: # Optionally enable CRC checking on incoming packets. if val: self._write_u8( @@ -649,29 +666,29 @@ def enable_crc(self, val): self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0xFB, ) - def tx_done(self): + def tx_done(self) -> int: """Transmit status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3 - def rx_done(self): + def rx_done(self) -> int: """Receive status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6 - def crc_error(self): + def crc_error(self) -> int: """crc status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x20) >> 5 # pylint: disable=too-many-branches def send( self, - data, + data: Any, *, - keep_listening=False, + keep_listening: bool = False, destination=None, node=None, identifier=None, flags=None - ): + ) -> bool: """Send a string of data using the transmitter. You can only send 252 bytes at a time (limited by chip's FIFO size and appended headers). @@ -743,7 +760,7 @@ def send( self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) return not timed_out - def send_with_ack(self, data): + def send_with_ack(self, data: Any) -> bool: """Reliable Datagram mode: Send a packet with data and wait for an ACK response. The packet header is automatically generated. @@ -753,7 +770,7 @@ def send_with_ack(self, data): retries_remaining = self.ack_retries else: retries_remaining = 1 - got_ack = False + got_ack: bool = False self.sequence_number = (self.sequence_number + 1) & 0xFF while not got_ack and retries_remaining: self.identifier = self.sequence_number @@ -781,7 +798,12 @@ def send_with_ack(self, data): return got_ack def receive( - self, *, keep_listening=True, with_header=False, with_ack=False, timeout=None + self, + *, + keep_listening: bool = True, + with_header: bool = False, + with_ack: bool = False, + timeout: Optional[float] = None ): """Wait to receive a packet from the receiver. If a packet is found the payload bytes are returned, otherwise None is returned (which indicates the timeout elapsed with no @@ -789,7 +811,7 @@ def receive( If keep_listening is True (the default) the chip will immediately enter listening mode after reception of a packet, otherwise it will fall back to idle mode and ignore any future reception. - All packets must have a 4-byte header for compatibilty with the + All packets must have a 4-byte header for compatibility with the RadioHead library. The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip the header before returning the packet to the caller. @@ -819,7 +841,7 @@ def receive( if time.monotonic() - start >= timeout: timed_out = True # Payload ready is set, a packet is in the FIFO. - packet = None + packet: Optional[bytearray] = None # save last RSSI reading self.last_rssi = self.rssi @@ -848,6 +870,7 @@ def receive( if fifo_length < 5: packet = None else: + assert packet is not None if ( self.node != _RH_BROADCAST_ADDRESS and packet[0] != _RH_BROADCAST_ADDRESS From 72fc1457c1813a339d6bd732339ffa151c1110f1 Mon Sep 17 00:00:00 2001 From: Jeremy Cerise Date: Mon, 2 May 2022 15:26:18 -0600 Subject: [PATCH 2/6] Remove unnecssary assert --- adafruit_rfm9x.py | 1 - 1 file changed, 1 deletion(-) diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index 5b90984..a956841 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -870,7 +870,6 @@ def receive( if fifo_length < 5: packet = None else: - assert packet is not None if ( self.node != _RH_BROADCAST_ADDRESS and packet[0] != _RH_BROADCAST_ADDRESS From 2a3455ad28bdc3b039b47144b1a887a701a4dc87 Mon Sep 17 00:00:00 2001 From: Jeremy Cerise Date: Mon, 2 May 2022 15:51:36 -0600 Subject: [PATCH 3/6] Based on conversation in discord, remove all non-method/function param and return types --- adafruit_rfm9x.py | 74 +++++++++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index a956841..947284c 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -30,7 +30,7 @@ pass try: - from typing import Optional, Any + from typing import Optional except ImportError: pass @@ -132,7 +132,7 @@ def ticks_diff(ticks1: int, ticks2: int) -> int: """Compute the signed difference between two ticks values assuming that they are within 2**28 ticks """ - diff: int = (ticks1 - ticks2) & _TICKS_MAX + diff = (ticks1 - ticks2) & _TICKS_MAX diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD return diff @@ -207,11 +207,11 @@ def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None: self._offset = offset def __get__(self, obj, objtype) -> int: - reg_value: int = obj._read_u8(self._address) + reg_value = obj._read_u8(self._address) return (reg_value & self._mask) >> self._offset def __set__(self, obj, val) -> None: - reg_value: int = obj._read_u8(self._address) + reg_value = obj._read_u8(self._address) reg_value &= ~self._mask reg_value |= (val & 0xFF) << self._offset obj._write_u8(self._address, reg_value) @@ -262,7 +262,7 @@ def __init__( agc: bool = False, crc: bool = True ) -> None: - self.high_power: bool = high_power + self.high_power = high_power # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. # Set Default Baudrate to 5MHz to avoid problems self._device: spidev.SPIDevice = spidev.SPIDevice( @@ -276,7 +276,7 @@ def __init__( self.reset() # No device type check! Catch an error from the very first request and # throw a nicer message to indicate possible wiring problems. - version: int = self._read_u8(_RH_RF95_REG_42_VERSION) + version = self._read_u8(_RH_RF95_REG_42_VERSION) if version != 18: raise RuntimeError( "Failed to find rfm9x with expected version -- check wiring" @@ -298,78 +298,78 @@ def __init__( # Set mode idle self.idle() # Set frequency - self.frequency_mhz: float = frequency + self.frequency_mhz = frequency # Set preamble length (default 8 bytes to match radiohead). - self.preamble_length: int = preamble_length + self.preamble_length = preamble_length # Defaults set modem config to RadioHead compatible Bw125Cr45Sf128 mode. - self.signal_bandwidth: int = 125000 - self.coding_rate: int = 5 - self.spreading_factor: int = 7 + self.signal_bandwidth = 125000 + self.coding_rate = 5 + self.spreading_factor = 7 # Default to enable CRC checking on incoming packets. - self.enable_crc: bool = crc + self.enable_crc = crc """CRC Enable state""" # set AGC - Default = False self.auto_agc = agc """Automatic Gain Control state""" # Set transmit power to 13 dBm, a safe value any module supports. - self.tx_power: int = 13 + self.tx_power = 13 # initialize last RSSI reading - self.last_rssi: float = 0.0 + self.last_rssi = 0.0 """The RSSI of the last received packet. Stored when the packet was received. The instantaneous RSSI value may not be accurate once the operating mode has been changed. """ - self.last_snr: float = 0.0 + self.last_snr = 0.0 """The SNR of the last received packet. Stored when the packet was received. The instantaneous SNR value may not be accurate once the operating mode has been changed. """ # initialize timeouts and delays delays - self.ack_wait: float = 0.5 + self.ack_wait = 0.5 """The delay time before attempting a retry after not receiving an ACK""" - self.receive_timeout: float = 0.5 + self.receive_timeout = 0.5 """The amount of time to poll for a received packet. If no packet is received, the returned packet will be None """ - self.xmit_timeout: float = 2.0 + self.xmit_timeout = 2.0 """The amount of time to wait for the HW to transmit the packet. This is mainly used to prevent a hang due to a HW issue """ - self.ack_retries: int = 5 + self.ack_retries = 5 """The number of ACK retries before reporting a failure.""" - self.ack_delay: Optional[float] = None + self.ack_delay = None """The delay time before attemting to send an ACK. If ACKs are being missed try setting this to .1 or .2. """ # initialize sequence number counter for reliabe datagram mode - self.sequence_number: int = 0 + self.sequence_number = 0 # create seen Ids list - self.seen_ids: bytearray = bytearray(256) + self.seen_ids = bytearray(256) # initialize packet header # node address - default is broadcast - self.node: int = _RH_BROADCAST_ADDRESS + self.node = _RH_BROADCAST_ADDRESS """The default address of this Node. (0-255). If not 255 (0xff) then only packets address to this node will be accepted. First byte of the RadioHead header. """ # destination address - default is broadcast - self.destination: int = _RH_BROADCAST_ADDRESS + self.destination = _RH_BROADCAST_ADDRESS """The default destination address for packet transmissions. (0-255). If 255 (0xff) then any receiving node should accept the packet. Second byte of the RadioHead header. """ # ID - contains seq count for reliable datagram mode - self.identifier: int = 0 + self.identifier = 0 """Automatically set to the sequence number when send_with_ack() used. Third byte of the RadioHead header. """ # flags - identifies ack/reetry packet for reliable datagram mode - self.flags: int = 0 + self.flags = 0 """Upper 4 bits reserved for use by Reliable Datagram Mode. Lower 4 bits may be used to pass information. Fourth byte of the RadioHead header. """ - self.crc_error_count: int = 0 + self.crc_error_count = 0 # pylint: disable=no-member # Reconsider pylint: disable when this can be tested @@ -530,7 +530,7 @@ def rssi(self) -> int: """The received strength indicator (in dBm) of the last received message.""" # Read RSSI register and convert to value using formula in datasheet. # Remember in LoRa mode the payload register changes function to RSSI! - raw_rssi: int = self._read_u8(_RH_RF95_REG_1A_PKT_RSSI_VALUE) + raw_rssi = self._read_u8(_RH_RF95_REG_1A_PKT_RSSI_VALUE) if self.low_frequency_mode: raw_rssi -= 157 else: @@ -542,7 +542,7 @@ def snr(self) -> float: """The SNR (in dB) of the last received message.""" # Read SNR 0x19 register and convert to value using formula in datasheet. # SNR(dB) = PacketSnr [twos complement] / 4 - snr_byte: int = self._read_u8(_RH_RF95_REG_19_PKT_SNR_VALUE) + snr_byte = self._read_u8(_RH_RF95_REG_19_PKT_SNR_VALUE) if snr_byte > 127: snr_byte = (256 - snr_byte) * -1 return snr_byte / 4 @@ -553,7 +553,7 @@ def signal_bandwidth(self) -> int: value to increase throughput or to a lower value to increase the likelihood of successfully received payloads). Valid values are listed in RFM9x.bw_bins.""" - bw_id: int = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0xF0) >> 4 + bw_id = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0xF0) >> 4 if bw_id >= len(self.bw_bins): current_bandwidth = 500000 else: @@ -563,8 +563,6 @@ def signal_bandwidth(self) -> int: @signal_bandwidth.setter def signal_bandwidth(self, val: int) -> None: # Set signal bandwidth (set to 125000 to match RadioHead Bw125). - bw_id: int - cutoff: int for bw_id, cutoff in enumerate(self.bw_bins): if val <= cutoff: break @@ -603,7 +601,7 @@ def coding_rate(self) -> int: correction (try setting to a higher value to increase tolerance of short bursts of interference or to a lower value to increase bit rate). Valid values are limited to 5, 6, 7, or 8.""" - cr_id: int = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0x0E) >> 1 + cr_id = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0x0E) >> 1 denominator = cr_id + 4 return denominator @@ -681,7 +679,7 @@ def crc_error(self) -> int: # pylint: disable=too-many-branches def send( self, - data: Any, + data, *, keep_listening: bool = False, destination=None, @@ -760,7 +758,7 @@ def send( self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) return not timed_out - def send_with_ack(self, data: Any) -> bool: + def send_with_ack(self, data) -> bool: """Reliable Datagram mode: Send a packet with data and wait for an ACK response. The packet header is automatically generated. @@ -770,7 +768,7 @@ def send_with_ack(self, data: Any) -> bool: retries_remaining = self.ack_retries else: retries_remaining = 1 - got_ack: bool = False + got_ack = False self.sequence_number = (self.sequence_number + 1) & 0xFF while not got_ack and retries_remaining: self.identifier = self.sequence_number @@ -804,7 +802,7 @@ def receive( with_header: bool = False, with_ack: bool = False, timeout: Optional[float] = None - ): + ) -> Optional[bytearray]: """Wait to receive a packet from the receiver. If a packet is found the payload bytes are returned, otherwise None is returned (which indicates the timeout elapsed with no reception). @@ -841,7 +839,7 @@ def receive( if time.monotonic() - start >= timeout: timed_out = True # Payload ready is set, a packet is in the FIFO. - packet: Optional[bytearray] = None + packet = None # save last RSSI reading self.last_rssi = self.rssi From a29a3850614c9a04074431e1646747a5bd4b6cfb Mon Sep 17 00:00:00 2001 From: Jeremy Cerise Date: Mon, 2 May 2022 22:07:54 -0600 Subject: [PATCH 4/6] Fix some typing based on feedback, add more specific types elsewhere --- adafruit_rfm9x.py | 44 +++++++++++++++++++++++--------------------- requirements.txt | 2 +- setup.py | 2 +- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index 947284c..478fe43 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -15,9 +15,7 @@ import random import time import adafruit_bus_device.spi_device as spidev -from digitalio import DigitalInOut from micropython import const -from busio import SPI HAS_SUPERVISOR = False @@ -30,7 +28,11 @@ pass try: - from typing import Optional + from typing import Optional, Type, Literal + from digitalio import DigitalInOut + from busio import SPI + from circuitpython_typing import WriteableBuffer, ReadableBuffer + except ImportError: pass @@ -206,11 +208,11 @@ def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None: self._mask <<= offset self._offset = offset - def __get__(self, obj, objtype) -> int: + def __get__(self, obj: "RFM9x", objtype: Type["RFM9x"]) -> int: reg_value = obj._read_u8(self._address) return (reg_value & self._mask) >> self._offset - def __set__(self, obj, val) -> None: + def __set__(self, obj: "RFM9x", val: int) -> None: reg_value = obj._read_u8(self._address) reg_value &= ~self._mask reg_value |= (val & 0xFF) << self._offset @@ -252,11 +254,11 @@ def __set__(self, obj, val) -> None: def __init__( self, spi: SPI, - cs: Optional[DigitalInOut], + cs: DigitalInOut, reset: DigitalInOut, frequency: int, *, - preamble_length=8, + preamble_length: int = 8, high_power: bool = True, baudrate: int = 5000000, agc: bool = False, @@ -374,7 +376,7 @@ def __init__( # pylint: disable=no-member # Reconsider pylint: disable when this can be tested def _read_into( - self, address: int, buf: bytearray, length: Optional[int] = None + self, address: int, buf: WriteableBuffer, length: Optional[int] = None ) -> None: # Read a number of bytes from the specified address into the provided # buffer. If length is not specified (the default) the entire buffer @@ -393,7 +395,7 @@ def _read_u8(self, address: int) -> int: return self._BUFFER[0] def _write_from( - self, address: int, buf: bytearray, length: Optional[int] = None + self, address: int, buf: ReadableBuffer, length: Optional[int] = None ) -> None: # Write a number of bytes to the provided address and taken from the # provided buffer. If no length is specified (the default) the entire @@ -464,7 +466,7 @@ def preamble_length(self, val: int) -> None: self._write_u8(_RH_RF95_REG_21_PREAMBLE_LSB, val & 0xFF) @property - def frequency_mhz(self) -> float: + def frequency_mhz(self) -> Literal[433.0, 915.0]: """The frequency of the radio in Megahertz. Only the allowed values for your radio must be specified (i.e. 433 vs. 915 mhz)! """ @@ -476,7 +478,7 @@ def frequency_mhz(self) -> float: return frequency @frequency_mhz.setter - def frequency_mhz(self, val: int) -> None: + def frequency_mhz(self, val: Literal[433.0, 915.0]) -> None: if val < 240 or val > 960: raise RuntimeError("frequency_mhz must be between 240 and 960") # Calculate FRF register 24-bit value. @@ -596,7 +598,7 @@ def signal_bandwidth(self, val: int) -> None: self._write_u8(0x30, 0) @property - def coding_rate(self) -> int: + def coding_rate(self) -> Literal[5, 6, 7, 8]: """The coding rate used by the radio to control forward error correction (try setting to a higher value to increase tolerance of short bursts of interference or to a lower value to increase bit @@ -606,7 +608,7 @@ def coding_rate(self) -> int: return denominator @coding_rate.setter - def coding_rate(self, val: int) -> None: + def coding_rate(self, val: Literal[5, 6, 7, 8]) -> None: # Set coding rate (set to 5 to match RadioHead Cr45). denominator = min(max(val, 5), 8) cr_id = denominator - 4 @@ -616,7 +618,7 @@ def coding_rate(self, val: int) -> None: ) @property - def spreading_factor(self) -> int: + def spreading_factor(self) -> Literal[6, 7, 8, 9, 10, 11, 12]: """The spreading factor used by the radio (try setting to a higher value to increase the receiver's ability to distinguish signal from noise or to a lower value to increase the data transmission rate). @@ -625,7 +627,7 @@ def spreading_factor(self) -> int: return sf_id @spreading_factor.setter - def spreading_factor(self, val: int) -> None: + def spreading_factor(self, val: Literal[6, 7, 8, 9, 10, 11, 12]) -> None: # Set spreading factor (set to 7 to match RadioHead Sf128). val = min(max(val, 6), 12) @@ -664,24 +666,24 @@ def enable_crc(self, val: bool) -> None: self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0xFB, ) - def tx_done(self) -> int: + def tx_done(self) -> bool: """Transmit status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3 - def rx_done(self) -> int: + def rx_done(self) -> bool: """Receive status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6 - def crc_error(self) -> int: + def crc_error(self) -> bool: """crc status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x20) >> 5 # pylint: disable=too-many-branches def send( self, - data, + data: ReadableBuffer, *, - keep_listening: bool = False, + keep_listening: Optional[int] = False, destination=None, node=None, identifier=None, @@ -798,7 +800,7 @@ def send_with_ack(self, data) -> bool: def receive( self, *, - keep_listening: bool = True, + keep_listening: Optional[int] = True, with_header: bool = False, with_ack: bool = False, timeout: Optional[float] = None diff --git a/requirements.txt b/requirements.txt index f675e3b..ae2e224 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ # # SPDX-License-Identifier: Unlicense -Adafruit-Blinka +Adafruit-Blinka>=7.2.3 adafruit-circuitpython-busdevice diff --git a/setup.py b/setup.py index 03ed1c0..4f525cd 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ # Author details author="Adafruit Industries", author_email="circuitpython@adafruit.com", - install_requires=["Adafruit-Blinka", "adafruit-circuitpython-busdevice"], + install_requires=["Adafruit-Blinka>=7.2.3", "adafruit-circuitpython-busdevice"], # Choose your license license="MIT", # See https://pypi.python.org/pypi?%3Aaction=list_classifiers From 0c14497b0e10c09123d9fde064584da6d6c3a336 Mon Sep 17 00:00:00 2001 From: Jeremy Cerise Date: Wed, 4 May 2022 10:25:23 -0600 Subject: [PATCH 5/6] Fix some minor mixups with typing based on PR feedback --- adafruit_rfm9x.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index 478fe43..d001087 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -683,11 +683,11 @@ def send( self, data: ReadableBuffer, *, - keep_listening: Optional[int] = False, - destination=None, - node=None, - identifier=None, - flags=None + keep_listening: bool = False, + destination: Optional[int] = None, + node: Optional[int] = None, + identifier: Optional[int] = None, + flags: Optional[int] = None ) -> bool: """Send a string of data using the transmitter. You can only send 252 bytes at a time @@ -760,7 +760,7 @@ def send( self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) return not timed_out - def send_with_ack(self, data) -> bool: + def send_with_ack(self, data: ReadableBuffer) -> bool: """Reliable Datagram mode: Send a packet with data and wait for an ACK response. The packet header is automatically generated. @@ -800,7 +800,7 @@ def send_with_ack(self, data) -> bool: def receive( self, *, - keep_listening: Optional[int] = True, + keep_listening: bool = True, with_header: bool = False, with_ack: bool = False, timeout: Optional[float] = None From b2cc3a4d5f83d91869fa1f5e2c5774626e25f6f8 Mon Sep 17 00:00:00 2001 From: Jeremy Cerise Date: Wed, 4 May 2022 11:22:18 -0600 Subject: [PATCH 6/6] Remove attribute annotation --- adafruit_rfm9x.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index d001087..cc9561a 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -267,9 +267,7 @@ def __init__( self.high_power = high_power # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. # Set Default Baudrate to 5MHz to avoid problems - self._device: spidev.SPIDevice = spidev.SPIDevice( - spi, cs, baudrate=baudrate, polarity=0, phase=0 - ) + self._device = spidev.SPIDevice(spi, cs, baudrate=baudrate, polarity=0, phase=0) # Setup reset as a digital output - initially High # This line is pulled low as an output quickly to trigger a reset. self._reset = reset