diff --git a/adafruit_fona/adafruit_fona.py b/adafruit_fona/adafruit_fona.py index 4a71c84..1e88937 100644 --- a/adafruit_fona/adafruit_fona.py +++ b/adafruit_fona/adafruit_fona.py @@ -3,6 +3,8 @@ # # SPDX-License-Identifier: MIT +# pylint: disable=too-many-lines + """ `adafruit_fona` ================================================================================ @@ -24,6 +26,14 @@ from micropython import const from simpleio import map_range +try: + from typing import Optional, Tuple, Union, Literal + from circuitpython_typing import ReadableBuffer + from busio import UART + from digitalio import DigitalInOut +except ImportError: + pass + __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_FONA.git" @@ -65,7 +75,13 @@ class FONA: UDP_MODE = const(1) # UDP socket # pylint: disable=too-many-arguments - def __init__(self, uart, rst, ri=None, debug=False): + def __init__( + self, + uart: UART, + rst: DigitalInOut, + ri: Optional[DigitalInOut] = None, + debug: bool = False, + ) -> None: self._buf = b"" # shared buffer self._fona_type = 0 self._debug = debug @@ -79,7 +95,7 @@ def __init__(self, uart, rst, ri=None, debug=False): raise RuntimeError("Unable to find FONA. Please check connections.") # pylint: disable=too-many-branches, too-many-statements - def _init_fona(self): + def _init_fona(self) -> bool: """Initializes FONA module.""" self.reset() @@ -136,7 +152,7 @@ def _init_fona(self): self._fona_type = FONA_800_H return True - def factory_reset(self): + def factory_reset(self) -> bool: """Resets modem to factory configuration.""" self._uart_write(b"ATZ\r\n") @@ -144,7 +160,7 @@ def factory_reset(self): return False return True - def reset(self): + def reset(self) -> None: """Performs a hardware reset on the modem.""" if self._debug: print("* Reset FONA") @@ -157,14 +173,14 @@ def reset(self): @property # pylint: disable=too-many-return-statements - def version(self): + def version(self) -> int: """The version of the FONA module. Can be FONA_800_L, FONA_800_H, FONA_808_V1, FONA_808_V2, FONA_3G_A, FONA3G_E. """ return self._fona_type @property - def iemi(self): + def iemi(self) -> str: """FONA Module's IEMI (International Mobile Equipment Identity) number.""" if self._debug: print("FONA IEMI") @@ -176,7 +192,7 @@ def iemi(self): return iemi.decode("utf-8") @property - def local_ip(self): + def local_ip(self) -> Optional[str]: """Module's local IP address, None if not set.""" self._uart_write(b"AT+CIFSR\r\n") self._read_line() @@ -187,7 +203,7 @@ def local_ip(self): return ip_addr @property - def iccid(self): + def iccid(self) -> str: """SIM Card's unique ICCID (Integrated Circuit Card Identifier).""" if self._debug: print("ICCID") @@ -197,7 +213,7 @@ def iccid(self): return iccid @property - def gprs(self): + def gprs(self) -> bool: """GPRS (General Packet Radio Services) power status.""" if not self._send_parse_reply(b"AT+CGATT?", b"+CGATT: ", ":"): return False @@ -206,10 +222,15 @@ def gprs(self): return True # pylint: disable=too-many-return-statements - def set_gprs(self, apn=None, enable=True): + def set_gprs( + self, + apn: Optional[Tuple[str, Optional[str], Optional[str]]] = None, + enable: bool = True, + ) -> bool: """Configures and brings up GPRS. - :param bool enable: Enables or disables GPRS. + :param tuple apn: The APN information + :param bool enable: Enables or disables GPRS. """ if enable: apn_name, apn_user, apn_pass = apn @@ -293,7 +314,7 @@ def set_gprs(self, apn=None, enable=True): return True @property - def network_status(self): + def network_status(self) -> int: """The status of the cellular network.""" self._read_line() if self._debug: @@ -306,7 +327,7 @@ def network_status(self): return status @property - def rssi(self): + def rssi(self) -> float: """The received signal strength indicator for the cellular network we are connected to. """ @@ -331,7 +352,7 @@ def rssi(self): return rssi @property - def gps(self): + def gps(self) -> int: """Module's GPS status.""" if self._debug: print("GPS Fix") @@ -354,7 +375,7 @@ def gps(self): return status @gps.setter - def gps(self, gps_on=False): + def gps(self, gps_on: bool = False) -> bool: if self._fona_type not in (FONA_3G_A, FONA_3G_E, FONA_808_V1, FONA_808_V2): raise TypeError("GPS unsupported for this FONA module.") @@ -385,11 +406,15 @@ def gps(self, gps_on=False): return True - def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name + def pretty_ip( # pylint: disable=no-self-use, invalid-name + self, ip: ReadableBuffer + ) -> str: """Converts a bytearray IP address to a dotted-quad string for printing""" return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3]) - def unpretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name + def unpretty_ip( # pylint: disable=no-self-use, invalid-name + self, ip: str + ) -> bytes: """Converts a dotted-quad string to a bytearray IP address""" octets = [int(x) for x in ip.split(".")] return bytes(octets) @@ -397,14 +422,14 @@ def unpretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name ### SMS ### @property - def enable_sms_notification(self): + def enable_sms_notification(self) -> bool: """Checks if SMS notifications are enabled.""" if not self._send_parse_reply(b"AT+CNMI?\r\n", b"+CNMI:", idx=1): return False return self._buf @enable_sms_notification.setter - def enable_sms_notification(self, enable=True): + def enable_sms_notification(self, enable: bool = True) -> bool: if enable: if not self._send_check_reply(b"AT+CNMI=2,1\r\n", reply=REPLY_OK): return False @@ -413,9 +438,10 @@ def enable_sms_notification(self, enable=True): return False return True - def receive_sms(self): + def receive_sms(self) -> Tuple[str, str]: """Checks for a message notification from the FONA module, replies back with the a tuple containing (sender, message). + NOTE: This method needs to be polled consistently due to the lack of hw-based interrupts in CircuitPython. @@ -437,11 +463,11 @@ def receive_sms(self): return sender, message.strip() - def send_sms(self, phone_number, message): + def send_sms(self, phone_number: int, message: str) -> bool: """Sends a message SMS to a phone number. + :param int phone_number: Destination phone number. :param str message: Message to send to the phone number. - """ if not hasattr(phone_number, "to_bytes"): raise TypeError("Phone number must be integer") @@ -474,10 +500,10 @@ def send_sms(self, phone_number, message): return False return True - def num_sms(self, sim_storage=True): + def num_sms(self, sim_storage: bool = True) -> int: """Returns the number of SMS messages stored in memory. - :param bool sim_storage: SMS storage on the SIM, otherwise internal storage on FONA chip. + :param bool sim_storage: SMS storage on the SIM, otherwise internal storage on FONA chip. """ if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): raise RuntimeError("Operating mode not supported by FONA module.") @@ -500,10 +526,10 @@ def num_sms(self, sim_storage=True): return self._buf return 0 - def delete_sms(self, sms_slot): + def delete_sms(self, sms_slot: int) -> bool: """Deletes a SMS message from a storage (internal or sim) slot - :param int sms_slot: SMS SIM or FONA memory slot number. + :param int sms_slot: SMS SIM or FONA memory slot number. """ if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): return False @@ -515,7 +541,7 @@ def delete_sms(self, sms_slot): return True - def delete_all_sms(self): + def delete_all_sms(self) -> bool: """Deletes all SMS messages on the FONA SIM.""" self._read_line() if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): @@ -533,11 +559,11 @@ def delete_all_sms(self): return False return True - def read_sms(self, sms_slot): + def read_sms(self, sms_slot: int) -> Tuple[str, str]: """Reads and parses SMS messages from FONA device. Returns the SMS sender's phone number and the message contents as a tuple. - :param int sms_slot: SMS SIM or FONA memory slot number. + :param int sms_slot: SMS SIM or FONA memory slot number. """ if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): return False @@ -569,10 +595,10 @@ def read_sms(self, sms_slot): ### Socket API (TCP, UDP) ### - def get_host_by_name(self, hostname): + def get_host_by_name(self, hostname: str) -> Union[str, Literal[False]]: """Converts a hostname to a packed 4-byte IP address. - :param str hostname: Destination server. + :param str hostname: Destination server. """ self._read_line() if self._debug: @@ -590,7 +616,7 @@ def get_host_by_name(self, hostname): self._read_line() return self._buf - def get_socket(self): + def get_socket(self) -> int: """Obtains a socket, if available.""" if self._debug: print("*** Get socket") @@ -613,10 +639,10 @@ def get_socket(self): print("Allocated socket #%d" % allocated_socket) return allocated_socket - def remote_ip(self, sock_num): + def remote_ip(self, sock_num: int) -> str: """Returns the IP address of the remote server. - :param int sock_num: Desired socket. + :param int sock_num: Desired socket. """ assert ( sock_num < FONA_MAX_SOCKETS @@ -628,10 +654,10 @@ def remote_ip(self, sock_num): self._parse_reply(b"+CIPSTATUS:", idx=3) return self._buf - def socket_status(self, sock_num): + def socket_status(self, sock_num: int) -> bool: """Returns the socket connection status, False if not connected. - :param int sock_num: Desired socket number. + :param int sock_num: Desired socket number. """ assert ( sock_num < FONA_MAX_SOCKETS @@ -658,10 +684,10 @@ def socket_status(self, sock_num): return True - def socket_available(self, sock_num): + def socket_available(self, sock_num: int) -> int: """Returns the amount of bytes available to be read from the socket. - :param int sock_num: Desired socket to return bytes available from. + :param int sock_num: Desired socket to return bytes available from. """ assert ( sock_num < FONA_MAX_SOCKETS @@ -681,14 +707,16 @@ def socket_available(self, sock_num): return data - def socket_connect(self, sock_num, dest, port, conn_mode=TCP_MODE): + def socket_connect( + self, sock_num: int, dest: str, port: int, conn_mode: int = TCP_MODE + ) -> bool: """Connects to a destination IP address or hostname. By default, we use conn_mode TCP_MODE but we may also use UDP_MODE. + :param int sock_num: Desired socket number :param str dest: Destination dest address. :param int port: Destination dest port. :param int conn_mode: Connection mode (TCP/UDP) - """ if self._debug: print( @@ -722,10 +750,10 @@ def socket_connect(self, sock_num, dest, port, conn_mode=TCP_MODE): return False return True - def socket_close(self, sock_num): + def socket_close(self, sock_num: int) -> bool: """Close TCP or UDP connection - :param int sock_num: Desired socket number. + :param int sock_num: Desired socket number. """ if self._debug: print("*** Closing socket #%d" % sock_num) @@ -745,12 +773,12 @@ def socket_close(self, sock_num): return False return True - def socket_read(self, sock_num, length): + def socket_read(self, sock_num: int, length: int) -> bytearray: """Read data from the network into a buffer. - Returns buffer and amount of bytes read. + Returns bytes read. + :param int sock_num: Desired socket to read from. :param int length: Desired length to read. - """ self._read_line() assert ( @@ -769,12 +797,12 @@ def socket_read(self, sock_num, length): return self._uart.read(length) - def socket_write(self, sock_num, buffer, timeout=3000): + def socket_write(self, sock_num: int, buffer: bytes, timeout: int = 3000) -> bool: """Writes bytes to the socket. + :param int sock_num: Desired socket number to write to. :param bytes buffer: Bytes to write to socket. :param int timeout: Socket write timeout, in milliseconds. - """ self._read_line() assert ( @@ -801,22 +829,24 @@ def socket_write(self, sock_num, buffer, timeout=3000): ### UART Reply/Response Helpers ### - def _uart_write(self, buffer): + def _uart_write(self, buffer: bytes) -> None: """UART ``write`` with optional debug that prints the buffer before sending. - :param bytes buffer: Buffer of bytes to send to the bus. + :param bytes buffer: Buffer of bytes to send to the bus. """ if self._debug: print("\tUARTWRITE ::", buffer.decode()) self._uart.write(buffer) - def _send_parse_reply(self, send_data, reply_data, divider=",", idx=0): + def _send_parse_reply( + self, send_data: bytes, reply_data: bytes, divider: str = ",", idx: int = 0 + ) -> bool: """Sends data to FONA module, parses reply data returned. + :param bytes send_data: Data to send to the module. :param bytes send_data: Data received by the FONA module. :param str divider: Separator - """ self._read_line() self._get_reply(send_data) @@ -826,12 +856,18 @@ def _send_parse_reply(self, send_data, reply_data, divider=",", idx=0): return True def _get_reply( - self, data=None, prefix=None, suffix=None, timeout=FONA_DEFAULT_TIMEOUT_MS - ): + self, + data: Optional[bytes] = None, + prefix: Optional[bytes] = None, + suffix: Optional[bytes] = None, + timeout: int = FONA_DEFAULT_TIMEOUT_MS, + ) -> Tuple[int, bytes]: """Send data to FONA, read response into buffer. + :param bytes data: Data to send to FONA module. + :param bytes prefix: Data to write if ``data`` is not provided + :param bytes suffix: Data to write following ``prefix`` if ``data is not provided :param int timeout: Time to wait for UART response. - """ self._uart.reset_input_buffer() @@ -842,11 +878,11 @@ def _get_reply( return self._read_line(timeout) - def _parse_reply(self, reply, divider=",", idx=0): + def _parse_reply(self, reply: bytes, divider: str = ",", idx: int = 0) -> bool: """Attempts to find reply in UART buffer, reads up to divider. + :param bytes reply: Expected response from FONA module. :param str divider: Divider character. - """ parsed_reply = self._buf.find(reply) if parsed_reply == -1: @@ -866,12 +902,14 @@ def _parse_reply(self, reply, divider=",", idx=0): return True - def _read_line(self, timeout=FONA_DEFAULT_TIMEOUT_MS, multiline=False): + def _read_line( + self, timeout: int = FONA_DEFAULT_TIMEOUT_MS, multiline: bool = False + ) -> Tuple[int, bytes]: """Reads one or multiple lines into the buffer. Optionally prints the buffer after reading. + :param int timeout: Time to wait for UART serial to reply, in seconds. :param bool multiline: Read multiple lines. - """ self._buf = b"" reply_idx = 0 @@ -904,16 +942,19 @@ def _read_line(self, timeout=FONA_DEFAULT_TIMEOUT_MS, multiline=False): def _send_check_reply( self, - send=None, - prefix=None, - suffix=None, - reply=None, + send: Optional[bytes] = None, + prefix: Optional[bytes] = None, + suffix: Optional[bytes] = None, + reply: Optional[bytes] = None, timeout=FONA_DEFAULT_TIMEOUT_MS, - ): + ) -> bool: """Sends data to FONA, validates response. + :param bytes send: Command. + :param bytes prefix: Data to send if ``send`` not provided + :param bytes suffix: Data to send after ``prefix`` if ``send`` not provided :param bytes reply: Expected response from module. - + :param int timeout: Time to wait for UART serial to reply, in seconds. """ self._read_line() if send is None: @@ -929,14 +970,18 @@ def _send_check_reply( return True def _send_check_reply_quoted( - self, prefix, suffix, reply, timeout=FONA_DEFAULT_TIMEOUT_MS - ): + self, + prefix: bytes, + suffix: bytes, + reply: bytes, + timeout: int = FONA_DEFAULT_TIMEOUT_MS, + ) -> bool: """Send prefix, ", suffix, ", and a newline. Verify response against reply. + :param bytes prefix: Command prefix. :param bytes prefix: Command ", suffix, ". :param bytes reply: Expected response from module. :param int timeout: Time to expect reply back from FONA, in milliseconds. - """ self._buf = b"" @@ -946,13 +991,15 @@ def _send_check_reply_quoted( return False return True - def _get_reply_quoted(self, prefix, suffix, timeout): + def _get_reply_quoted( + self, prefix: bytes, suffix: bytes, timeout: int + ) -> Tuple[int, bytes]: """Send prefix, ", suffix, ", and newline. Returns: Response (and also fills buffer with response). + :param bytes prefix: Command prefix. :param bytes prefix: Command ", suffix, ". :param int timeout: Time to expect reply back from FONA, in milliseconds. - """ self._uart.reset_input_buffer() @@ -960,10 +1007,11 @@ def _get_reply_quoted(self, prefix, suffix, timeout): return self._read_line(timeout) - def _expect_reply(self, reply, timeout=10000): + def _expect_reply(self, reply: bytes, timeout: int = 10000) -> bool: """Reads line from FONA module and compares to reply from FONA module. - :param bytes reply: Expected reply from module. + :param bytes reply: Expected reply from module. + :param int timeout: Time to wait for UART serial to reply, in seconds. """ self._read_line(timeout) if reply not in self._buf: diff --git a/adafruit_fona/adafruit_fona_network.py b/adafruit_fona/adafruit_fona_network.py index fd79183..30cf38a 100755 --- a/adafruit_fona/adafruit_fona_network.py +++ b/adafruit_fona/adafruit_fona_network.py @@ -12,6 +12,16 @@ """ +try: + from typing import Optional, Tuple, Type + from types import TracebackType + from adafruit_fona.adafruit_fona import FONA +except ImportError: + pass + +__version__ = "0.0.0-auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_FONA.git" + # Network types NET_GSM = 0x01 NET_CDMA = 0x02 @@ -20,12 +30,14 @@ class CELLULAR: """Interface for connecting to and interacting with GSM and CDMA cellular networks.""" - def __init__(self, fona, apn): + def __init__( + self, fona: FONA, apn: Tuple[str, Optional[str], Optional[str]] + ) -> None: """Initializes interface with cellular network. - :param adafruit_fona fona: The Adafruit FONA module we are using. + + :param FONA fona: The Adafruit FONA module we are using. :param tuple apn: Tuple containing APN name, (optional) APN username, and APN password. - """ self._iface = fona self._apn = apn @@ -36,24 +48,29 @@ def __init__(self, fona, apn): self._network_type = NET_GSM self._iface.gps = True - def __enter__(self): + def __enter__(self) -> "CELLULAR": return self - def __exit__(self, exception_type, exception_value, traceback): + def __exit__( + self, + exception_type: Optional[Type[type]], + exception_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: self.disconnect() @property - def imei(self): + def imei(self) -> str: """Returns the modem's IEMI number, as a string.""" return self._iface.iemi @property - def iccid(self): + def iccid(self) -> str: """Returns the SIM card's ICCID, as a string.""" return self._iface.iccid @property - def is_attached(self): + def is_attached(self) -> bool: """Returns if the modem is attached to the network.""" if self._network_type == NET_GSM: if self._iface.gps == 3 and self._iface.network_status == 1: @@ -70,7 +87,7 @@ def is_connected(self): return False return True - def connect(self): + def connect(self) -> None: """Connect to cellular network.""" if self._iface.set_gprs(self._apn, True): self._network_connected = True @@ -78,7 +95,7 @@ def connect(self): # reset context for next connection attempt self._iface.set_gprs(self._apn, False) - def disconnect(self): + def disconnect(self) -> None: """Disconnect from cellular network.""" self._iface.set_gprs(self._apn, False) self._network_connected = False diff --git a/adafruit_fona/adafruit_fona_socket.py b/adafruit_fona/adafruit_fona_socket.py index 16034e6..041d626 100644 --- a/adafruit_fona/adafruit_fona_socket.py +++ b/adafruit_fona/adafruit_fona_socket.py @@ -16,16 +16,22 @@ import time from micropython import const +try: + from typing import Optional, Tuple, Sequence + from adafruit_fona.adafruit_fona import FONA +except ImportError: + pass + _the_interface = None # pylint: disable=invalid-name -def set_interface(iface): +def set_interface(iface: FONA) -> None: """Helper to set the global internet interface.""" global _the_interface # pylint: disable=global-statement, invalid-name _the_interface = iface -def htonl(x): +def htonl(x: int) -> int: """Convert 32-bit positive integers from host to network byte order.""" return ( ((x) << 24 & 0xFF000000) @@ -35,7 +41,7 @@ def htonl(x): ) -def htons(x): +def htons(x: int) -> int: """Convert 16-bit positive integers from host to network byte order.""" return (((x) << 8) & 0xFF00) | (((x) >> 8) & 0xFF) @@ -50,7 +56,7 @@ def htons(x): SOCKETS = [] # pylint: disable=too-many-arguments, unused-argument -def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0): +def getaddrinfo(host, port: int, family=0, socktype=0, proto=0, flags=0): """Translate the host/port argument into a sequence of 5-tuples that contain all the necessary arguments for creating a socket connected to that service. """ @@ -59,9 +65,10 @@ def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0): return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))] -def gethostbyname(hostname): +def gethostbyname(hostname: str) -> str: """Translate a host name to IPv4 address format. The IPv4 address is returned as a string. + :param str hostname: Desired hostname. """ addr = _the_interface.get_host_by_name(hostname) @@ -72,14 +79,19 @@ def gethostbyname(hostname): class socket: """A simplified implementation of the Python 'socket' class for connecting to a FONA cellular module. + :param int family: Socket address (and protocol) family. :param int type: Socket type. - """ def __init__( - self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None - ): + self, + family: int = AF_INET, + type: int = SOCK_STREAM, + proto: int = 0, + fileno: Optional[int] = None, + socknum: Optional[int] = None, + ) -> None: if family != AF_INET: raise RuntimeError("Only AF_INET family supported by cellular sockets.") self._sock_type = type @@ -94,35 +106,37 @@ def __init__( self.settimeout(self._timeout) @property - def socknum(self): + def socknum(self) -> int: """Returns the socket object's socket number.""" return self._socknum @property - def connected(self): + def connected(self) -> bool: """Returns whether or not we are connected to the socket.""" return _the_interface.socket_status(self.socknum) - def getpeername(self): + def getpeername(self) -> str: """Return the remote address to which the socket is connected.""" return _the_interface.remote_ip(self.socknum) - def inet_aton(self, ip_string): + def inet_aton(self, ip_string: str) -> bytearray: """Convert an IPv4 address from dotted-quad string format. - :param str ip_string: IP Address, as a dotted-quad string. + :param str ip_string: IP Address, as a dotted-quad string. """ self._buffer = b"" self._buffer = [int(item) for item in ip_string.split(".")] self._buffer = bytearray(self._buffer) return self._buffer - def connect(self, address, conn_mode=None): + def connect( + self, address: Tuple[str, int], conn_mode: Optional[int] = None + ) -> None: """Connect to a remote socket at address. (The format of address depends on the address family — see above.) + :param tuple address: Remote socket as a (host, port) tuple. :param int conn_mode: Connection mode (TCP/UDP) - """ assert ( conn_mode != 0x03 @@ -135,17 +149,18 @@ def connect(self, address, conn_mode=None): raise RuntimeError("Failed to connect to host", host) self._buffer = b"" - def send(self, data): + def send(self, data: bytes) -> None: """Send data to the socket. The socket must be connected to a remote socket prior to calling this method. - :param bytes data: Desired data to send to the socket. + :param bytes data: Desired data to send to the socket. """ _the_interface.socket_write(self._socknum, data, self._timeout) gc.collect() - def recv(self, bufsize=0): + def recv(self, bufsize: int = 0) -> Sequence[int]: """Reads some bytes from the connected remote address. + :param int bufsize: maximum number of bytes to receive """ # print("Socket read", bufsize) @@ -189,7 +204,7 @@ def recv(self, bufsize=0): gc.collect() return ret - def readline(self): + def readline(self) -> Sequence[int]: """Attempt to return as many bytes as we can up to but not including '\r\n'""" # print("Socket readline") stamp = time.monotonic() @@ -205,26 +220,25 @@ def readline(self): gc.collect() return firstline - def available(self): + def available(self) -> int: """Returns how many bytes are available to be read from the socket.""" return _the_interface.socket_available(self._socknum) - def settimeout(self, value): + def settimeout(self, value: int) -> None: """Sets socket read timeout. - :param int value: Socket read timeout, in seconds. + :param int value: Socket read timeout, in seconds. """ if value < 0: raise Exception("Timeout period should be non-negative.") self._timeout = value - def gettimeout(self): + def gettimeout(self) -> int: """Return the timeout in seconds (float) associated with socket operations, or None if no timeout is set. - """ return self._timeout - def close(self): + def close(self) -> bool: """Closes the socket.""" return _the_interface.socket_close(self._socknum) diff --git a/adafruit_fona/fona_3g.py b/adafruit_fona/fona_3g.py index a16c9d3..c9acce5 100755 --- a/adafruit_fona/fona_3g.py +++ b/adafruit_fona/fona_3g.py @@ -24,6 +24,13 @@ from micropython import const from .adafruit_fona import FONA, REPLY_OK +try: + from typing import Optional, Tuple, Union, Literal + from busio import UART + from digitalio import DigitalInOut +except ImportError: + pass + __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_FONA.git" @@ -32,18 +39,24 @@ class FONA3G(FONA): """FONA 3G module interface. - :param ~busio.uart UART: FONA UART connection. - :param ~digialio RST: FONA RST pin. - :param ~digialio RI: Optional FONA Ring Interrupt (RI) pin. - :param bool debug: Enable debugging output. + :param ~busio.uart uart: FONA UART connection. + :param ~DigitalInOut rst: FONA RST pin. + :param ~DigitalInOut ri: Optional FONA Ring Interrupt (RI) pin. + :param bool debug: Enable debugging output. """ - def __init__(self, uart, rst, ri=None, debug=False): + def __init__( + self, + uart: UART, + rst: DigitalInOut, + ri: Optional[DigitalInOut] = None, + debug: bool = False, + ) -> None: uart.baudrate = 4800 super().__init__(uart, rst, ri, debug) - def set_baudrate(self, baudrate): + def set_baudrate(self, baudrate: int) -> bool: """Sets the FONA's UART baudrate.""" if not self._send_check_reply( b"AT+IPREX=" + str(baudrate).encode(), reply=REPLY_OK @@ -52,14 +65,14 @@ def set_baudrate(self, baudrate): return True @property - def gps(self): + def gps(self) -> bool: """Module's GPS status.""" if not self._send_check_reply(b"AT+CGPS?", reply=b"+CGPS: 1,1"): return False return True @gps.setter - def gps(self, gps_on=False): + def gps(self, gps_on: bool = False) -> bool: # check if GPS is already enabled if not self._send_parse_reply(b"AT+CGPS?", b"+CGPS: "): return False @@ -77,7 +90,7 @@ def gps(self, gps_on=False): return True @property - def ue_system_info(self): + def ue_system_info(self) -> bool: """UE System status.""" self._send_parse_reply(b"AT+CPSI?\r\n", b"+CPSI: ") if not self._buf == "GSM" or self._buf == "WCDMA": # 5.15 @@ -85,17 +98,22 @@ def ue_system_info(self): return True @property - def local_ip(self): + def local_ip(self) -> Optional[str]: """Module's local IP address, None if not set.""" if not self._send_parse_reply(b"AT+IPADDR", b"+IPADDR:"): return None return self._buf # pylint: disable=too-many-return-statements - def set_gprs(self, apn=None, enable=True): + def set_gprs( + self, + apn: Optional[Tuple[str, Optional[str], Optional[str]]] = None, + enable: bool = True, + ) -> bool: """Configures and brings up GPRS. - :param bool enable: Enables or disables GPRS. + :param tuple apn: APN configuration settings + :param bool enable: Enables or disables GPRS. """ if enable: if not self._send_check_reply(b"AT+CGATT=1", reply=REPLY_OK, timeout=10000): @@ -142,7 +160,7 @@ def set_gprs(self, apn=None, enable=True): ### Socket API (TCP, UDP) ### @property - def tx_timeout(self): + def tx_timeout(self) -> bool: """CIPSEND timeout, in milliseconds.""" self._read_line() if not self._send_parse_reply(b"AT+CIPTIMEOUT?", b"+CIPTIMEOUT:", idx=2): @@ -150,7 +168,7 @@ def tx_timeout(self): return True @tx_timeout.setter - def tx_timeout(self, timeout): + def tx_timeout(self, timeout: int) -> bool: self._read_line() if not self._send_check_reply( b"AT+CIPTIMEOUT=" + str(timeout).encode(), reply=REPLY_OK @@ -158,8 +176,9 @@ def tx_timeout(self, timeout): return False return True - def get_host_by_name(self, hostname): + def get_host_by_name(self, hostname: str) -> Union[str, Literal[False]]: """Converts a hostname to a 4-byte IP address. + :param str hostname: Domain name. """ self._read_line() @@ -175,7 +194,7 @@ def get_host_by_name(self, hostname): return False return self._buf - def get_socket(self): + def get_socket(self) -> int: """Returns an unused socket.""" if self._debug: print("*** Get socket") @@ -198,14 +217,16 @@ def get_socket(self): print("Allocated socket #%d" % socket) return socket - def socket_connect(self, sock_num, dest, port, conn_mode=0): + def socket_connect( + self, sock_num: int, dest: str, port: int, conn_mode: int = 0 + ) -> bool: """Connects to a destination IP address or hostname. By default, we use conn_mode TCP_MODE but we may also use UDP_MODE. + :param int sock_num: Desired socket number :param str dest: Destination dest address. :param int port: Destination dest port. :param int conn_mode: Connection mode (TCP/UDP) - """ if self._debug: print( @@ -236,8 +257,11 @@ def socket_connect(self, sock_num, dest, port, conn_mode=0): return False return True - def remote_ip(self, sock_num): - """Returns the IP address of the remote connection.""" + def remote_ip(self, sock_num: int) -> str: + """Returns the IP address of the remote connection. + + :param int sock_num: Desired socket number + """ self._read_line() assert ( sock_num < FONA_MAX_SOCKETS @@ -254,12 +278,12 @@ def remote_ip(self, sock_num): self._read_line() # eat the rest of '+CIPOPEN' responses return ip_addr - def socket_write(self, sock_num, buffer, timeout=120000): + def socket_write(self, sock_num: int, buffer: bytes, timeout: int = 120000) -> bool: """Writes len(buffer) bytes to the socket. + :param int sock_num: Desired socket number to write to. :param bytes buffer: Bytes to write to socket. :param int timeout: Socket write timeout, in milliseconds. Defaults to 120000ms. - """ self._read_line() assert ( @@ -295,10 +319,10 @@ def socket_write(self, sock_num, buffer, timeout=120000): return False return True - def socket_status(self, sock_num): + def socket_status(self, sock_num: int) -> bool: """Returns socket status, True if connected. False otherwise. - :param int sock_num: Desired socket number. + :param int sock_num: Desired socket number. """ if not self._send_parse_reply(b"AT+CIPCLOSE?", b"+CIPCLOSE:", idx=sock_num): return False