36
36
import time
37
37
from digitalio import Direction
38
38
39
+ try :
40
+ import busio
41
+ from typing import Optional , Dict , Union , List
42
+ from digitalio import DigitalInOut
43
+ except ImportError :
44
+ pass
45
+
39
46
__version__ = "0.0.0-auto.0"
40
47
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"
41
48
@@ -67,13 +74,13 @@ class ESP_ATcontrol:
67
74
68
75
def __init__ (
69
76
self ,
70
- uart ,
71
- default_baudrate ,
77
+ uart : busio . UART ,
78
+ default_baudrate : int ,
72
79
* ,
73
- run_baudrate = None ,
74
- rts_pin = None ,
75
- reset_pin = None ,
76
- debug = False
80
+ run_baudrate : Optional [ int ] = None ,
81
+ rts_pin : Optional [ DigitalInOut ] = None ,
82
+ reset_pin : Optional [ DigitalInOut ] = None ,
83
+ debug : bool = False
77
84
):
78
85
"""This function doesn't try to do any sync'ing, just sets up
79
86
# the hardware, that way nothing can unexpectedly fail!"""
@@ -100,7 +107,7 @@ def __init__(
100
107
self ._ifconfig = []
101
108
self ._initialized = False
102
109
103
- def begin (self ):
110
+ def begin (self ) -> None :
104
111
"""Initialize the module by syncing, resetting if necessary, setting up
105
112
the desired baudrate, turning on single-socket mode, and configuring
106
113
SSL support. Required before using the module but we dont do in __init__
@@ -128,7 +135,7 @@ def begin(self):
128
135
except OKError :
129
136
pass # retry
130
137
131
- def connect (self , secrets ) :
138
+ def connect (self , secrets : Dict [ str , Union [ str , int ]]) -> None :
132
139
"""Repeatedly try to connect to an access point with the details in
133
140
the passed in 'secrets' dictionary. Be sure 'ssid' and 'password' are
134
141
defined in the secrets dict! If 'timezone' is set, we'll also configure
@@ -160,15 +167,15 @@ def connect(self, secrets):
160
167
# *************************** SOCKET SETUP ****************************
161
168
162
169
@property
163
- def cipmux (self ):
170
+ def cipmux (self ) -> int :
164
171
"""The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
165
172
replies = self .at_response ("AT+CIPMUX?" , timeout = 3 ).split (b"\r \n " )
166
173
for reply in replies :
167
174
if reply .startswith (b"+CIPMUX:" ):
168
175
return int (reply [8 :])
169
176
raise RuntimeError ("Bad response to CIPMUX?" )
170
177
171
- def socket_connect (self , conntype , remote , remote_port , * , keepalive = 10 , retries = 1 ) :
178
+ def socket_connect (self , conntype : str , remote : str , remote_port : int , * , keepalive : int = 10 , retries : int = 1 ) -> bool :
172
179
"""Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote
173
180
can be an IP address or DNS (we'll do the lookup for you. Remote port
174
181
is integer port on other side. We can't set the local port"""
@@ -199,7 +206,7 @@ def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries
199
206
return True
200
207
return False
201
208
202
- def socket_send (self , buffer , timeout = 1 ) :
209
+ def socket_send (self , buffer : bytes , timeout : int = 1 ) -> bool :
203
210
"""Send data over the already-opened socket, buffer must be bytes"""
204
211
cmd = "AT+CIPSEND=%d" % len (buffer )
205
212
self .at_response (cmd , timeout = 5 , retries = 1 )
@@ -232,7 +239,7 @@ def socket_send(self, buffer, timeout=1):
232
239
# Get newlines off front and back, then split into lines
233
240
return True
234
241
235
- def socket_receive (self , timeout = 5 ) :
242
+ def socket_receive (self , timeout : int = 5 ) -> bytearray :
236
243
# pylint: disable=too-many-nested-blocks, too-many-branches
237
244
"""Check for incoming data over the open socket, returns bytes"""
238
245
incoming_bytes = None
@@ -298,7 +305,7 @@ def socket_receive(self, timeout=5):
298
305
gc .collect ()
299
306
return ret
300
307
301
- def socket_disconnect (self ):
308
+ def socket_disconnect (self ) -> None :
302
309
"""Close any open socket, if there is one"""
303
310
try :
304
311
self .at_response ("AT+CIPCLOSE" , retries = 1 )
@@ -307,7 +314,7 @@ def socket_disconnect(self):
307
314
308
315
# *************************** SNTP SETUP ****************************
309
316
310
- def sntp_config (self , enable , timezone = None , server = None ):
317
+ def sntp_config (self , enable : bool , timezone : Optional [ int ] = None , server : Optional [ str ] = None ) -> None :
311
318
"""Configure the built in ESP SNTP client with a UTC-offset number (timezone)
312
319
and server as IP or hostname."""
313
320
cmd = "AT+CIPSNTPCFG="
@@ -322,7 +329,7 @@ def sntp_config(self, enable, timezone=None, server=None):
322
329
self .at_response (cmd , timeout = 3 )
323
330
324
331
@property
325
- def sntp_time (self ):
332
+ def sntp_time (self ) -> Union [ bytes , None ] :
326
333
"""Return a string with time/date information using SNTP, may return
327
334
1970 'bad data' on the first few minutes, without warning!"""
328
335
replies = self .at_response ("AT+CIPSNTPTIME?" , timeout = 5 ).split (b"\r \n " )
@@ -334,7 +341,7 @@ def sntp_time(self):
334
341
# *************************** WIFI SETUP ****************************
335
342
336
343
@property
337
- def is_connected (self ):
344
+ def is_connected (self ) -> bool :
338
345
"""Initialize module if not done yet, and check if we're connected to
339
346
an access point, returns True or False"""
340
347
if not self ._initialized :
@@ -354,7 +361,7 @@ def is_connected(self):
354
361
return False
355
362
356
363
@property
357
- def status (self ):
364
+ def status (self ) -> Union [ int , None ] :
358
365
"""The IP connection status number (see AT+CIPSTATUS datasheet for meaning)"""
359
366
replies = self .at_response ("AT+CIPSTATUS" , timeout = 5 ).split (b"\r \n " )
360
367
for reply in replies :
@@ -363,7 +370,7 @@ def status(self):
363
370
return None
364
371
365
372
@property
366
- def mode (self ):
373
+ def mode (self ) -> Union [ int , None ] :
367
374
"""What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
368
375
if not self ._initialized :
369
376
self .begin ()
@@ -374,7 +381,7 @@ def mode(self):
374
381
raise RuntimeError ("Bad response to CWMODE?" )
375
382
376
383
@mode .setter
377
- def mode (self , mode ) :
384
+ def mode (self , mode : int ) -> None :
378
385
"""Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
379
386
if not self ._initialized :
380
387
self .begin ()
@@ -383,15 +390,15 @@ def mode(self, mode):
383
390
self .at_response ("AT+CWMODE=%d" % mode , timeout = 3 )
384
391
385
392
@property
386
- def local_ip (self ):
393
+ def local_ip (self ) -> Union [ str , None ] :
387
394
"""Our local IP address as a dotted-quad string"""
388
395
reply = self .at_response ("AT+CIFSR" ).strip (b"\r \n " )
389
396
for line in reply .split (b"\r \n " ):
390
397
if line and line .startswith (b'+CIFSR:STAIP,"' ):
391
398
return str (line [14 :- 1 ], "utf-8" )
392
399
raise RuntimeError ("Couldn't find IP address" )
393
400
394
- def ping (self , host ) :
401
+ def ping (self , host : str ) -> Union [ int , None ] :
395
402
"""Ping the IP or hostname given, returns ms time or None on failure"""
396
403
reply = self .at_response ('AT+PING="%s"' % host .strip ('"' ), timeout = 5 )
397
404
for line in reply .split (b"\r \n " ):
@@ -404,7 +411,7 @@ def ping(self, host):
404
411
return None
405
412
raise RuntimeError ("Couldn't ping" )
406
413
407
- def nslookup (self , host ) :
414
+ def nslookup (self , host : str ) -> Union [ str , None ] :
408
415
"""Return a dotted-quad IP address strings that matches the hostname"""
409
416
reply = self .at_response ('AT+CIPDOMAIN="%s"' % host .strip ('"' ), timeout = 3 )
410
417
for line in reply .split (b"\r \n " ):
@@ -415,7 +422,7 @@ def nslookup(self, host):
415
422
# *************************** AP SETUP ****************************
416
423
417
424
@property
418
- def remote_AP (self ): # pylint: disable=invalid-name
425
+ def remote_AP (self ) -> List [ Union [ int , str , None ]] : # pylint: disable=invalid-name
419
426
"""The name of the access point we're connected to, as a string"""
420
427
stat = self .status
421
428
if stat != self .STATUS_APCONNECTED :
@@ -434,7 +441,7 @@ def remote_AP(self): # pylint: disable=invalid-name
434
441
return reply
435
442
return [None ] * 4
436
443
437
- def join_AP (self , ssid , password ) : # pylint: disable=invalid-name
444
+ def join_AP (self , ssid : str , password : str ) -> None : # pylint: disable=invalid-name
438
445
"""Try to join an access point by name and password, will return
439
446
immediately if we're already connected and won't try to reconnect"""
440
447
# First make sure we're in 'station' mode so we can connect to AP's
@@ -456,7 +463,7 @@ def join_AP(self, ssid, password): # pylint: disable=invalid-name
456
463
raise RuntimeError ("Didn't get IP address" )
457
464
return
458
465
459
- def scan_APs (self , retries = 3 ) : # pylint: disable=invalid-name
466
+ def scan_APs (self , retries : int = 3 ) -> Union [ List [ List [ bytes ]], None ] : # pylint: disable=invalid-name
460
467
"""Ask the module to scan for access points and return a list of lists
461
468
with name, RSSI, MAC addresses, etc"""
462
469
for _ in range (retries ):
@@ -482,11 +489,11 @@ def scan_APs(self, retries=3): # pylint: disable=invalid-name
482
489
# ************************** AT LOW LEVEL ****************************
483
490
484
491
@property
485
- def version (self ):
492
+ def version (self ) -> Union [ str , None ] :
486
493
"""The cached version string retrieved via the AT+GMR command"""
487
494
return self ._version
488
495
489
- def get_version (self ):
496
+ def get_version (self ) -> Union [ str , None ] :
490
497
"""Request the AT firmware version string and parse out the
491
498
version number"""
492
499
reply = self .at_response ("AT+GMR" , timeout = 3 ).strip (b"\r \n " )
@@ -499,12 +506,12 @@ def get_version(self):
499
506
self ._version = str (line , "utf-8" )
500
507
return self ._version
501
508
502
- def hw_flow (self , flag ) :
509
+ def hw_flow (self , flag : bool ) -> None :
503
510
"""Turn on HW flow control (if available) on to allow data, or off to stop"""
504
511
if self ._rts_pin :
505
512
self ._rts_pin .value = not flag
506
513
507
- def at_response (self , at_cmd , timeout = 5 , retries = 3 ) :
514
+ def at_response (self , at_cmd : str , timeout : int = 5 , retries : int = 3 ) -> bytes :
508
515
"""Send an AT command, check that we got an OK response,
509
516
and then cut out the reply lines to return. We can set
510
517
a variable timeout (how long we'll wait for response) and
@@ -554,7 +561,7 @@ def at_response(self, at_cmd, timeout=5, retries=3):
554
561
return response [:- 4 ]
555
562
raise OKError ("No OK response to " + at_cmd )
556
563
557
- def sync (self ):
564
+ def sync (self ) -> bool :
558
565
"""Check if we have AT commmand sync by sending plain ATs"""
559
566
try :
560
567
self .at_response ("AT" , timeout = 1 )
@@ -563,12 +570,12 @@ def sync(self):
563
570
return False
564
571
565
572
@property
566
- def baudrate (self ):
573
+ def baudrate (self ) -> int :
567
574
"""The baudrate of our UART connection"""
568
575
return self ._uart .baudrate
569
576
570
577
@baudrate .setter
571
- def baudrate (self , baudrate ) :
578
+ def baudrate (self , baudrate : int ) -> None :
572
579
"""Change the modules baudrate via AT commands and then check
573
580
that we're still sync'd."""
574
581
at_cmd = "AT+UART_CUR=" + str (baudrate ) + ",8,1,0,"
@@ -588,14 +595,14 @@ def baudrate(self, baudrate):
588
595
if not self .sync ():
589
596
raise RuntimeError ("Failed to resync after Baudrate change" )
590
597
591
- def echo (self , echo ) :
598
+ def echo (self , echo : bool ) -> None :
592
599
"""Set AT command echo on or off"""
593
600
if echo :
594
601
self .at_response ("ATE1" , timeout = 1 )
595
602
else :
596
603
self .at_response ("ATE0" , timeout = 1 )
597
604
598
- def soft_reset (self ):
605
+ def soft_reset (self ) -> bool :
599
606
"""Perform a software reset by AT command. Returns True
600
607
if we successfully performed, false if failed to reset"""
601
608
try :
@@ -609,13 +616,13 @@ def soft_reset(self):
609
616
pass # fail, see below
610
617
return False
611
618
612
- def factory_reset (self ):
619
+ def factory_reset (self ) -> None :
613
620
"""Perform a hard reset, then send factory restore settings request"""
614
621
self .hard_reset ()
615
622
self .at_response ("AT+RESTORE" , timeout = 1 )
616
623
self ._initialized = False
617
624
618
- def hard_reset (self ):
625
+ def hard_reset (self ) -> None :
619
626
"""Perform a hardware reset by toggling the reset pin, if it was
620
627
defined in the initialization of this object"""
621
628
if self ._reset_pin :
0 commit comments