34
34
35
35
import gc
36
36
import time
37
- from digitalio import Direction
37
+ from digitalio import Direction , DigitalInOut
38
+
39
+ try :
40
+ import busio
41
+ from typing import Optional , Dict , Union , List
42
+ except ImportError :
43
+ pass
38
44
39
45
__version__ = "0.0.0-auto.0"
40
46
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"
@@ -67,13 +73,13 @@ class ESP_ATcontrol:
67
73
68
74
def __init__ (
69
75
self ,
70
- uart ,
71
- default_baudrate ,
76
+ uart : busio . UART ,
77
+ default_baudrate : int ,
72
78
* ,
73
- run_baudrate = None ,
74
- rts_pin = None ,
75
- reset_pin = None ,
76
- debug = False
79
+ run_baudrate : Optional [ int ] = None ,
80
+ rts_pin : Optional [ DigitalInOut ] = None ,
81
+ reset_pin : Optional [ DigitalInOut ] = None ,
82
+ debug : bool = False
77
83
):
78
84
"""This function doesn't try to do any sync'ing, just sets up
79
85
# the hardware, that way nothing can unexpectedly fail!"""
@@ -100,7 +106,7 @@ def __init__(
100
106
self ._ifconfig = []
101
107
self ._initialized = False
102
108
103
- def begin (self ):
109
+ def begin (self ) -> None :
104
110
"""Initialize the module by syncing, resetting if necessary, setting up
105
111
the desired baudrate, turning on single-socket mode, and configuring
106
112
SSL support. Required before using the module but we dont do in __init__
@@ -128,7 +134,7 @@ def begin(self):
128
134
except OKError :
129
135
pass # retry
130
136
131
- def connect (self , secrets ) :
137
+ def connect (self , secrets : Dict [ str , Union [ str , int ]]) -> None :
132
138
"""Repeatedly try to connect to an access point with the details in
133
139
the passed in 'secrets' dictionary. Be sure 'ssid' and 'password' are
134
140
defined in the secrets dict! If 'timezone' is set, we'll also configure
@@ -160,15 +166,23 @@ def connect(self, secrets):
160
166
# *************************** SOCKET SETUP ****************************
161
167
162
168
@property
163
- def cipmux (self ):
169
+ def cipmux (self ) -> int :
164
170
"""The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
165
171
replies = self .at_response ("AT+CIPMUX?" , timeout = 3 ).split (b"\r \n " )
166
172
for reply in replies :
167
173
if reply .startswith (b"+CIPMUX:" ):
168
174
return int (reply [8 :])
169
175
raise RuntimeError ("Bad response to CIPMUX?" )
170
176
171
- def socket_connect (self , conntype , remote , remote_port , * , keepalive = 10 , retries = 1 ):
177
+ def socket_connect (
178
+ self ,
179
+ conntype : str ,
180
+ remote : str ,
181
+ remote_port : int ,
182
+ * ,
183
+ keepalive : int = 10 ,
184
+ retries : int = 1
185
+ ) -> bool :
172
186
"""Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote
173
187
can be an IP address or DNS (we'll do the lookup for you. Remote port
174
188
is integer port on other side. We can't set the local port"""
@@ -199,7 +213,7 @@ def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries
199
213
return True
200
214
return False
201
215
202
- def socket_send (self , buffer , timeout = 1 ) :
216
+ def socket_send (self , buffer : bytes , timeout : int = 1 ) -> bool :
203
217
"""Send data over the already-opened socket, buffer must be bytes"""
204
218
cmd = "AT+CIPSEND=%d" % len (buffer )
205
219
self .at_response (cmd , timeout = 5 , retries = 1 )
@@ -232,7 +246,7 @@ def socket_send(self, buffer, timeout=1):
232
246
# Get newlines off front and back, then split into lines
233
247
return True
234
248
235
- def socket_receive (self , timeout = 5 ) :
249
+ def socket_receive (self , timeout : int = 5 ) -> bytearray :
236
250
# pylint: disable=too-many-nested-blocks, too-many-branches
237
251
"""Check for incoming data over the open socket, returns bytes"""
238
252
incoming_bytes = None
@@ -298,7 +312,7 @@ def socket_receive(self, timeout=5):
298
312
gc .collect ()
299
313
return ret
300
314
301
- def socket_disconnect (self ):
315
+ def socket_disconnect (self ) -> None :
302
316
"""Close any open socket, if there is one"""
303
317
try :
304
318
self .at_response ("AT+CIPCLOSE" , retries = 1 )
@@ -307,7 +321,9 @@ def socket_disconnect(self):
307
321
308
322
# *************************** SNTP SETUP ****************************
309
323
310
- def sntp_config (self , enable , timezone = None , server = None ):
324
+ def sntp_config (
325
+ self , enable : bool , timezone : Optional [int ] = None , server : Optional [str ] = None
326
+ ) -> None :
311
327
"""Configure the built in ESP SNTP client with a UTC-offset number (timezone)
312
328
and server as IP or hostname."""
313
329
cmd = "AT+CIPSNTPCFG="
@@ -322,7 +338,7 @@ def sntp_config(self, enable, timezone=None, server=None):
322
338
self .at_response (cmd , timeout = 3 )
323
339
324
340
@property
325
- def sntp_time (self ):
341
+ def sntp_time (self ) -> Union [ bytes , None ] :
326
342
"""Return a string with time/date information using SNTP, may return
327
343
1970 'bad data' on the first few minutes, without warning!"""
328
344
replies = self .at_response ("AT+CIPSNTPTIME?" , timeout = 5 ).split (b"\r \n " )
@@ -334,7 +350,7 @@ def sntp_time(self):
334
350
# *************************** WIFI SETUP ****************************
335
351
336
352
@property
337
- def is_connected (self ):
353
+ def is_connected (self ) -> bool :
338
354
"""Initialize module if not done yet, and check if we're connected to
339
355
an access point, returns True or False"""
340
356
if not self ._initialized :
@@ -354,7 +370,7 @@ def is_connected(self):
354
370
return False
355
371
356
372
@property
357
- def status (self ):
373
+ def status (self ) -> Union [ int , None ] :
358
374
"""The IP connection status number (see AT+CIPSTATUS datasheet for meaning)"""
359
375
replies = self .at_response ("AT+CIPSTATUS" , timeout = 5 ).split (b"\r \n " )
360
376
for reply in replies :
@@ -363,7 +379,7 @@ def status(self):
363
379
return None
364
380
365
381
@property
366
- def mode (self ):
382
+ def mode (self ) -> Union [ int , None ] :
367
383
"""What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
368
384
if not self ._initialized :
369
385
self .begin ()
@@ -374,7 +390,7 @@ def mode(self):
374
390
raise RuntimeError ("Bad response to CWMODE?" )
375
391
376
392
@mode .setter
377
- def mode (self , mode ) :
393
+ def mode (self , mode : int ) -> None :
378
394
"""Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
379
395
if not self ._initialized :
380
396
self .begin ()
@@ -383,15 +399,15 @@ def mode(self, mode):
383
399
self .at_response ("AT+CWMODE=%d" % mode , timeout = 3 )
384
400
385
401
@property
386
- def local_ip (self ):
402
+ def local_ip (self ) -> Union [ str , None ] :
387
403
"""Our local IP address as a dotted-quad string"""
388
404
reply = self .at_response ("AT+CIFSR" ).strip (b"\r \n " )
389
405
for line in reply .split (b"\r \n " ):
390
406
if line and line .startswith (b'+CIFSR:STAIP,"' ):
391
407
return str (line [14 :- 1 ], "utf-8" )
392
408
raise RuntimeError ("Couldn't find IP address" )
393
409
394
- def ping (self , host ) :
410
+ def ping (self , host : str ) -> Union [ int , None ] :
395
411
"""Ping the IP or hostname given, returns ms time or None on failure"""
396
412
reply = self .at_response ('AT+PING="%s"' % host .strip ('"' ), timeout = 5 )
397
413
for line in reply .split (b"\r \n " ):
@@ -404,7 +420,7 @@ def ping(self, host):
404
420
return None
405
421
raise RuntimeError ("Couldn't ping" )
406
422
407
- def nslookup (self , host ) :
423
+ def nslookup (self , host : str ) -> Union [ str , None ] :
408
424
"""Return a dotted-quad IP address strings that matches the hostname"""
409
425
reply = self .at_response ('AT+CIPDOMAIN="%s"' % host .strip ('"' ), timeout = 3 )
410
426
for line in reply .split (b"\r \n " ):
@@ -415,7 +431,7 @@ def nslookup(self, host):
415
431
# *************************** AP SETUP ****************************
416
432
417
433
@property
418
- def remote_AP (self ): # pylint: disable=invalid-name
434
+ def remote_AP (self ) -> List [ Union [ int , str , None ]] : # pylint: disable=invalid-name
419
435
"""The name of the access point we're connected to, as a string"""
420
436
stat = self .status
421
437
if stat != self .STATUS_APCONNECTED :
@@ -434,7 +450,7 @@ def remote_AP(self): # pylint: disable=invalid-name
434
450
return reply
435
451
return [None ] * 4
436
452
437
- def join_AP (self , ssid , password ) : # pylint: disable=invalid-name
453
+ def join_AP (self , ssid : str , password : str ) -> None : # pylint: disable=invalid-name
438
454
"""Try to join an access point by name and password, will return
439
455
immediately if we're already connected and won't try to reconnect"""
440
456
# First make sure we're in 'station' mode so we can connect to AP's
@@ -456,7 +472,9 @@ def join_AP(self, ssid, password): # pylint: disable=invalid-name
456
472
raise RuntimeError ("Didn't get IP address" )
457
473
return
458
474
459
- def scan_APs (self , retries = 3 ): # pylint: disable=invalid-name
475
+ def scan_APs ( # pylint: disable=invalid-name
476
+ self , retries : int = 3
477
+ ) -> Union [List [List [bytes ]], None ]:
460
478
"""Ask the module to scan for access points and return a list of lists
461
479
with name, RSSI, MAC addresses, etc"""
462
480
for _ in range (retries ):
@@ -482,11 +500,11 @@ def scan_APs(self, retries=3): # pylint: disable=invalid-name
482
500
# ************************** AT LOW LEVEL ****************************
483
501
484
502
@property
485
- def version (self ):
503
+ def version (self ) -> Union [ str , None ] :
486
504
"""The cached version string retrieved via the AT+GMR command"""
487
505
return self ._version
488
506
489
- def get_version (self ):
507
+ def get_version (self ) -> Union [ str , None ] :
490
508
"""Request the AT firmware version string and parse out the
491
509
version number"""
492
510
reply = self .at_response ("AT+GMR" , timeout = 3 ).strip (b"\r \n " )
@@ -499,12 +517,12 @@ def get_version(self):
499
517
self ._version = str (line , "utf-8" )
500
518
return self ._version
501
519
502
- def hw_flow (self , flag ) :
520
+ def hw_flow (self , flag : bool ) -> None :
503
521
"""Turn on HW flow control (if available) on to allow data, or off to stop"""
504
522
if self ._rts_pin :
505
523
self ._rts_pin .value = not flag
506
524
507
- def at_response (self , at_cmd , timeout = 5 , retries = 3 ) :
525
+ def at_response (self , at_cmd : str , timeout : int = 5 , retries : int = 3 ) -> bytes :
508
526
"""Send an AT command, check that we got an OK response,
509
527
and then cut out the reply lines to return. We can set
510
528
a variable timeout (how long we'll wait for response) and
@@ -554,7 +572,7 @@ def at_response(self, at_cmd, timeout=5, retries=3):
554
572
return response [:- 4 ]
555
573
raise OKError ("No OK response to " + at_cmd )
556
574
557
- def sync (self ):
575
+ def sync (self ) -> bool :
558
576
"""Check if we have AT commmand sync by sending plain ATs"""
559
577
try :
560
578
self .at_response ("AT" , timeout = 1 )
@@ -563,12 +581,12 @@ def sync(self):
563
581
return False
564
582
565
583
@property
566
- def baudrate (self ):
584
+ def baudrate (self ) -> int :
567
585
"""The baudrate of our UART connection"""
568
586
return self ._uart .baudrate
569
587
570
588
@baudrate .setter
571
- def baudrate (self , baudrate ) :
589
+ def baudrate (self , baudrate : int ) -> None :
572
590
"""Change the modules baudrate via AT commands and then check
573
591
that we're still sync'd."""
574
592
at_cmd = "AT+UART_CUR=" + str (baudrate ) + ",8,1,0,"
@@ -588,14 +606,14 @@ def baudrate(self, baudrate):
588
606
if not self .sync ():
589
607
raise RuntimeError ("Failed to resync after Baudrate change" )
590
608
591
- def echo (self , echo ) :
609
+ def echo (self , echo : bool ) -> None :
592
610
"""Set AT command echo on or off"""
593
611
if echo :
594
612
self .at_response ("ATE1" , timeout = 1 )
595
613
else :
596
614
self .at_response ("ATE0" , timeout = 1 )
597
615
598
- def soft_reset (self ):
616
+ def soft_reset (self ) -> bool :
599
617
"""Perform a software reset by AT command. Returns True
600
618
if we successfully performed, false if failed to reset"""
601
619
try :
@@ -609,13 +627,13 @@ def soft_reset(self):
609
627
pass # fail, see below
610
628
return False
611
629
612
- def factory_reset (self ):
630
+ def factory_reset (self ) -> None :
613
631
"""Perform a hard reset, then send factory restore settings request"""
614
632
self .hard_reset ()
615
633
self .at_response ("AT+RESTORE" , timeout = 1 )
616
634
self ._initialized = False
617
635
618
- def hard_reset (self ):
636
+ def hard_reset (self ) -> None :
619
637
"""Perform a hardware reset by toggling the reset pin, if it was
620
638
defined in the initialization of this object"""
621
639
if self ._reset_pin :
0 commit comments