Skip to content

Commit acebfc1

Browse files
committed
linted
1 parent 68e237e commit acebfc1

File tree

3 files changed

+95
-76
lines changed

3 files changed

+95
-76
lines changed

adafruit_espatcontrol.py

Lines changed: 70 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,37 @@
4949
5050
"""
5151

52+
import gc
5253
import time
5354
from digitalio import Direction
54-
import gc
5555

5656
__version__ = "0.0.0-auto.0"
5757
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"
5858

5959
class OKError(Exception):
60+
"""The exception thrown when we didn't get acknowledgement to an AT command"""
6061
pass
6162

6263
class ESP_ATcontrol_socket:
64+
"""A 'socket' compatible interface thru the ESP AT command set"""
6365
def __init__(self, esp):
6466
self._esp = esp
6567

66-
def getaddrinfo(self, host, port,
67-
family=0, socktype=0, proto=0, flags=0):
68-
# honestly, we ignore anything but host & port
68+
def getaddrinfo(self, host, port, # pylint: disable=too-many-arguments
69+
family=0, socktype=0, proto=0, flags=0): # pylint: disable=unused-argument
70+
"""Given a hostname and a port name, return a 'socket.getaddrinfo'
71+
compatible list of tuples. Honestly, we ignore anything but host & port"""
6972
if not isinstance(port, int):
7073
raise RuntimeError("port must be an integer")
71-
ip = self._esp.nslookup(host)
72-
return [(family, socktype, proto, '', (ip, port))]
74+
ipaddr = self._esp.nslookup(host)
75+
return [(family, socktype, proto, '', (ipaddr, port))]
7376

7477
class ESP_ATcontrol:
7578
"""A wrapper for AT commands to a connected ESP8266 or ESP32 module to do
7679
some very basic internetting. The ESP module must be pre-programmed with
7780
AT command firmware, you can use esptool or our CircuitPython miniesptool
7881
to upload firmware"""
82+
#pylint: disable=too-many-public-methods, too-many-instance-attributes
7983
MODE_STATION = 1
8084
MODE_SOFTAP = 2
8185
MODE_SOFTAPSTATION = 3
@@ -89,9 +93,9 @@ class ESP_ATcontrol:
8993
USER_AGENT = "esp-idf/1.0 esp32"
9094

9195
def __init__(self, uart, default_baudrate, *, run_baudrate=None,
92-
rts_pin = None, reset_pin=None, debug=False):
93-
# this function doesn't try to do any sync'ing, just sets up
94-
# the hardware, that way nothing can unexpectedly fail!
96+
rts_pin=None, reset_pin=None, debug=False):
97+
"""This function doesn't try to do any sync'ing, just sets up
98+
# the hardware, that way nothing can unexpectedly fail!"""
9599
self._uart = uart
96100
if not run_baudrate:
97101
run_baudrate = default_baudrate
@@ -111,11 +115,15 @@ def __init__(self, uart, default_baudrate, *, run_baudrate=None,
111115
self._debug = debug
112116
self._versionstrings = []
113117
self._version = None
114-
self._IPDpacket = bytearray(1500)
118+
self._ipdpacket = bytearray(1500)
115119
self._ifconfig = []
116120
self._initialized = False
117121

118122
def begin(self):
123+
"""Initialize the module by syncing, resetting if necessary, setting up
124+
the desired baudrate, turning on single-socket mode, and configuring
125+
SSL support. Required before using the module but we dont do in __init__
126+
because this can throw an exception."""
119127
# Connect and sync
120128
for _ in range(3):
121129
try:
@@ -179,37 +187,44 @@ def request_url(self, url, ssl=False):
179187
return (header, data)
180188

181189
def connect(self, settings):
190+
"""Repeatedly try to connect to an access point with the details in
191+
the passed in 'settings' dictionary. Be sure 'ssid' and 'password' are
192+
defined in the settings dict! If 'timezone' is set, we'll also configure
193+
SNTP"""
182194
# Connect to WiFi if not already
183195
while True:
184196
try:
185197
if not self._initialized:
186198
self.begin()
187-
AP = self.remote_AP
199+
AP = self.remote_AP # pylint: disable=invalid-name
188200
print("Connected to", AP[0])
189201
if AP[0] != settings['ssid']:
190202
self.join_AP(settings['ssid'], settings['password'])
191203
if 'timezone' in settings:
192-
tz = settings['timezone']
204+
tzone = settings['timezone']
193205
ntp = None
194206
if 'ntp_server' in settings:
195207
ntp = settings['ntp_server']
196-
self.sntp_config(True, tz, ntp)
208+
self.sntp_config(True, tzone, ntp)
197209
print("My IP Address:", self.local_ip)
198210
return # yay!
199-
except (RuntimeError, OKError) as e:
200-
print("Failed to connect, retrying\n", e)
211+
except (RuntimeError, OKError) as exp:
212+
print("Failed to connect, retrying\n", exp)
201213
continue
202214

203-
"""*************************** SOCKET SETUP ****************************"""
215+
# *************************** SOCKET SETUP ****************************
216+
204217
@property
205218
def cipmux(self):
219+
"""The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
206220
replies = self.at_response("AT+CIPMUX?", timeout=3).split(b'\r\n')
207221
for reply in replies:
208222
if reply.startswith(b'+CIPMUX:'):
209223
return int(reply[8:])
210224
raise RuntimeError("Bad response to CIPMUX?")
211225

212226
def socket(self):
227+
"""Create a 'socket' object"""
213228
return ESP_ATcontrol_socket(self)
214229

215230
def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries=1):
@@ -268,6 +283,7 @@ def socket_send(self, buffer, timeout=1):
268283
return True
269284

270285
def socket_receive(self, timeout=5):
286+
# pylint: disable=too-many-nested-blocks
271287
"""Check for incoming data over the open socket, returns bytes"""
272288
incoming_bytes = None
273289
bundle = b''
@@ -281,38 +297,35 @@ def socket_receive(self, timeout=5):
281297
if not incoming_bytes:
282298
self.hw_flow(False) # stop the flow
283299
# read one byte at a time
284-
self._IPDpacket[i] = self._uart.read(1)[0]
285-
if chr(self._IPDpacket[0]) != '+':
300+
self._ipdpacket[i] = self._uart.read(1)[0]
301+
if chr(self._ipdpacket[0]) != '+':
286302
i = 0 # keep goin' till we start with +
287303
continue
288304
i += 1
289305
# look for the IPD message
290-
if (ipd_start in self._IPDpacket) and chr(self._IPDpacket[i-1]) == ':':
306+
if (ipd_start in self._ipdpacket) and chr(self._ipdpacket[i-1]) == ':':
291307
try:
292-
s = str(self._IPDpacket[5:i-1], 'utf-8')
293-
incoming_bytes = int(s)
308+
ipd = str(self._ipdpacket[5:i-1], 'utf-8')
309+
incoming_bytes = int(ipd)
294310
if self._debug:
295311
print("Receiving:", incoming_bytes)
296312
except ValueError:
297-
raise RuntimeError("Parsing error during receive", s)
313+
raise RuntimeError("Parsing error during receive", ipd)
298314
i = 0 # reset the input buffer now that we know the size
299315
else:
300316
self.hw_flow(False) # stop the flow
301317
# read as much as we can!
302318
toread = min(incoming_bytes-i, self._uart.in_waiting)
303319
#print("i ", i, "to read:", toread)
304-
self._IPDpacket[i:i+toread] = self._uart.read(toread)
320+
self._ipdpacket[i:i+toread] = self._uart.read(toread)
305321
i += toread
306322
if i == incoming_bytes:
307-
#print(self._IPDpacket[0:i])
323+
#print(self._ipdpacket[0:i])
308324
gc.collect()
309-
bundle += self._IPDpacket[0:i]
325+
bundle += self._ipdpacket[0:i]
310326
i = incoming_bytes = 0
311327
else: # no data waiting
312328
self.hw_flow(True) # start the floooow
313-
else:
314-
#print("TIMED OUT")
315-
pass
316329
return bundle
317330

318331
def socket_disconnect(self):
@@ -322,32 +335,38 @@ def socket_disconnect(self):
322335
except OKError:
323336
pass # this is ok, means we didn't have an open socket
324337

325-
"""*************************** SNTP SETUP ****************************"""
338+
# *************************** SNTP SETUP ****************************
326339

327-
def sntp_config(self, en, timezone=None, server=None):
328-
at = "AT+CIPSNTPCFG="
329-
if en:
330-
at += '1'
340+
def sntp_config(self, enable, timezone=None, server=None):
341+
"""Configure the built in ESP SNTP client with a UTC-offset number (timezone)
342+
and server as IP or hostname."""
343+
cmd = "AT+CIPSNTPCFG="
344+
if enable:
345+
cmd += '1'
331346
else:
332-
at += '0'
347+
cmd += '0'
333348
if timezone is not None:
334-
at += ',%d' % timezone
349+
cmd += ',%d' % timezone
335350
if server is not None:
336-
at += ',"%s"' % server
337-
self.at_response(at, timeout=3)
351+
cmd += ',"%s"' % server
352+
self.at_response(cmd, timeout=3)
338353

339354
@property
340355
def sntp_time(self):
356+
"""Return a string with time/date information using SNTP, may return
357+
1970 'bad data' on the first few minutes, without warning!"""
341358
replies = self.at_response("AT+CIPSNTPTIME?", timeout=5).split(b'\r\n')
342359
for reply in replies:
343360
if reply.startswith(b'+CIPSNTPTIME:'):
344361
return reply[13:]
345362
return None
346363

347-
"""*************************** WIFI SETUP ****************************"""
364+
# *************************** WIFI SETUP ****************************
348365

349366
@property
350367
def is_connected(self):
368+
"""Initialize module if not done yet, and check if we're connected to
369+
an access point, returns True or False"""
351370
if not self._initialized:
352371
self.begin()
353372
try:
@@ -363,6 +382,7 @@ def is_connected(self):
363382

364383
@property
365384
def status(self):
385+
"""The IP connection status number (see AT+CIPSTATUS datasheet for meaning)"""
366386
replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b'\r\n')
367387
for reply in replies:
368388
if reply.startswith(b'STATUS:'):
@@ -391,14 +411,15 @@ def mode(self, mode):
391411

392412
@property
393413
def local_ip(self):
394-
"""Our local IP address as a dotted-octal string"""
414+
"""Our local IP address as a dotted-quad string"""
395415
reply = self.at_response("AT+CIFSR").strip(b'\r\n')
396416
for line in reply.split(b'\r\n'):
397417
if line and line.startswith(b'+CIFSR:STAIP,"'):
398418
return str(line[14:-1], 'utf-8')
399419
raise RuntimeError("Couldn't find IP address")
400420

401421
def ping(self, host):
422+
"""Ping the IP or hostname given, returns ms time or None on failure"""
402423
reply = self.at_response('AT+PING="%s"' % host.strip('"'), timeout=5)
403424
for line in reply.split(b'\r\n'):
404425
if line and line.startswith(b'+PING:'):
@@ -409,12 +430,14 @@ def ping(self, host):
409430
raise RuntimeError("Couldn't ping")
410431

411432
def nslookup(self, host):
433+
"""Return a dotted-quad IP address strings that matches the hostname"""
412434
reply = self.at_response('AT+CIPDOMAIN="%s"' % host.strip('"'), timeout=3)
413435
for line in reply.split(b'\r\n'):
414436
if line and line.startswith(b'+CIPDOMAIN:'):
415437
return str(line[11:], 'utf-8')
416438
raise RuntimeError("Couldn't find IP address")
417-
"""*************************** AP SETUP ****************************"""
439+
440+
# *************************** AP SETUP ****************************
418441

419442
@property
420443
def remote_AP(self): # pylint: disable=invalid-name
@@ -479,10 +502,11 @@ def scan_APs(self, retries=3): # pylint: disable=invalid-name
479502
routers.append(router)
480503
return routers
481504

482-
"""************************** AT LOW LEVEL ****************************"""
505+
# ************************** AT LOW LEVEL ****************************
483506

484507
@property
485508
def version(self):
509+
"""The cached version string retrieved via the AT+GMR command"""
486510
return self._version
487511

488512
def get_version(self):
@@ -500,6 +524,7 @@ def get_version(self):
500524

501525

502526
def hw_flow(self, flag):
527+
"""Turn on HW flow control (if available) on to allow data, or off to stop"""
503528
if self._rts_pin:
504529
self._rts_pin.value = not flag
505530

@@ -508,6 +533,7 @@ def at_response(self, at_cmd, timeout=5, retries=3):
508533
and then cut out the reply lines to return. We can set
509534
a variable timeout (how long we'll wait for response) and
510535
how many times to retry before giving up"""
536+
#pylint: disable=too-many-branches
511537
for _ in range(retries):
512538
self.hw_flow(True) # allow any remaning data to stream in
513539
time.sleep(0.1) # wait for uart data
@@ -571,9 +597,9 @@ def baudrate(self, baudrate):
571597
that we're still sync'd."""
572598
at_cmd = "AT+UART_CUR="+str(baudrate)+",8,1,0,"
573599
if self._rts_pin is not None:
574-
at_cmd +="2"
600+
at_cmd += "2"
575601
else:
576-
at_cmd +="0"
602+
at_cmd += "0"
577603
at_cmd += "\r\n"
578604
if self._debug:
579605
print("Changing baudrate to:", baudrate)
@@ -608,6 +634,7 @@ def soft_reset(self):
608634
return False
609635

610636
def factory_reset(self):
637+
"""Perform a hard reset, then send factory restore settings request"""
611638
self.hard_reset()
612639
self.at_response("AT+RESTORE", timeout=1)
613640
self._initialized = False

0 commit comments

Comments
 (0)