diff --git a/adafruit_magtag/fakerequests.py b/adafruit_magtag/fakerequests.py deleted file mode 100755 index d30a020..0000000 --- a/adafruit_magtag/fakerequests.py +++ /dev/null @@ -1,49 +0,0 @@ -# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries -# SPDX-FileCopyrightText: Copyright (c) 2020 Melissa LeBlanc-Williams for Adafruit Industries -# -# SPDX-License-Identifier: MIT -""" -`adafruit_magtag.fakerequests` -================================================================================ - -Helper Library for the Adafruit MagTag. - - -* Author(s): Melissa LeBlanc-Williams - -Implementation Notes --------------------- - -**Hardware:** - -* `Adafruit MagTag `_ - -**Software and Dependencies:** - -* Adafruit CircuitPython firmware for the supported boards: - https://github.com/adafruit/circuitpython/releases - -""" - -import json - -__version__ = "0.0.0-auto.0" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MagTag.git" - - -class Fake_Requests: - """For faking 'requests' using a local file instead of the network.""" - - def __init__(self, filename): - self._filename = filename - - def json(self): - """json parsed version for local requests.""" - with open(self._filename, "r") as file: - return json.load(file) - - @property - def text(self): - """raw text version for local requests.""" - with open(self._filename, "r") as file: - return file.read() diff --git a/adafruit_magtag/graphics.py b/adafruit_magtag/graphics.py index cd89613..ba71c14 100755 --- a/adafruit_magtag/graphics.py +++ b/adafruit_magtag/graphics.py @@ -23,18 +23,20 @@ * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases +* Adafruit's PortalBase library: https://github.com/adafruit/Adafruit_CircuitPython_PortalBase + """ import gc from time import sleep import board -import displayio +from adafruit_portalbase.graphics import GraphicsBase __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MagTag.git" -class Graphics: +class Graphics(GraphicsBase): """Graphics Helper Class for the MagTag Library :param default_bg: The path to your default background image file or a hex color. @@ -50,53 +52,13 @@ class Graphics: def __init__( self, *, default_bg=0xFFFFFF, auto_refresh=True, rotation=270, debug=False ): - self._debug = debug - if not hasattr(board, "DISPLAY"): - import adafruit_il0373 # pylint: disable=import-outside-toplevel - - displayio.release_displays() - display_bus = displayio.FourWire( - board.SPI(), - command=board.EPD_DC, - chip_select=board.EPD_CS, - reset=board.EPD_RESET, - baudrate=1000000, - ) - - self.display = adafruit_il0373.IL0373( - display_bus, - width=296, - height=128, - rotation=rotation, - black_bits_inverted=False, - color_bits_inverted=False, - grayscale=True, - refresh_time=1, - seconds_per_frame=1, - ) - else: - self.display = board.DISPLAY - self.display.rotation = rotation - + self.display = board.DISPLAY + self.display.rotation = rotation self.auto_refresh = auto_refresh - - if self._debug: - print("Init display") - self.splash = displayio.Group(max_size=15) self._qr_group = None - if self._debug: - print("Init background") - self._bg_group = displayio.Group(max_size=1) - self._bg_file = None - self.splash.append(self._bg_group) - self.display.show(self.splash) - - # set the default background - if default_bg is not None: - self.set_background(default_bg) - gc.collect() + super().__init__(board.DISPLAY, default_bg=default_bg, debug=debug) def set_background(self, file_or_color, position=None): """The background image to a bitmap file. @@ -105,39 +67,7 @@ def set_background(self, file_or_color, position=None): :param tuple position: Optional x and y coordinates to place the background at. """ - while self._bg_group: - self._bg_group.pop() - - if not position: - position = (0, 0) # default in top corner - - if not file_or_color: - return # we're done, no background desired - if self._bg_file: - self._bg_file.close() - if isinstance(file_or_color, str): # its a filenme: - self._bg_file = open(file_or_color, "rb") - background = displayio.OnDiskBitmap(self._bg_file) - self._bg_sprite = displayio.TileGrid( - background, - pixel_shader=displayio.ColorConverter(), - x=position[0], - y=position[1], - ) - elif isinstance(file_or_color, int): - # Make a background color fill - color_bitmap = displayio.Bitmap(self.display.width, self.display.height, 1) - color_palette = displayio.Palette(1) - color_palette[0] = file_or_color - self._bg_sprite = displayio.TileGrid( - color_bitmap, - pixel_shader=color_palette, - x=position[0], - y=position[1], - ) - else: - raise RuntimeError("Unknown type of background") - self._bg_group.append(self._bg_sprite) + super().set_background(file_or_color, position) if self.auto_refresh: self.display.refresh() gc.collect() @@ -153,52 +83,7 @@ def qrcode( :param y: The y position of upper left corner of the QR code on the display. """ - import adafruit_miniqr # pylint: disable=import-outside-toplevel - - # generate the QR code - for qrtype in range(1, 5): - try: - qrcode = adafruit_miniqr.QRCode(qr_type=qrtype) - qrcode.add_data(qr_data) - qrcode.make() - break - except RuntimeError: - pass - # print("Trying with larger code") - else: - raise RuntimeError("Could not make QR code") - # monochrome (2 color) palette - palette = displayio.Palette(2) - palette[0] = 0xFFFFFF - palette[1] = qr_color - - # pylint: disable=invalid-name - # bitmap the size of the matrix, plus border, monochrome (2 colors) - qr_bitmap = displayio.Bitmap( - qrcode.matrix.width + 2, qrcode.matrix.height + 2, 2 - ) - for i in range(qr_bitmap.width * qr_bitmap.height): - qr_bitmap[i] = 0 - - # transcribe QR code into bitmap - for xx in range(qrcode.matrix.width): - for yy in range(qrcode.matrix.height): - qr_bitmap[xx + 1, yy + 1] = 1 if qrcode.matrix[xx, yy] else 0 - - # display the QR code - qr_sprite = displayio.TileGrid(qr_bitmap, pixel_shader=palette) - if self._qr_group: - try: - self._qr_group.pop() - except IndexError: # later test if empty - pass - else: - self._qr_group = displayio.Group() - self.splash.append(self._qr_group) - self._qr_group.scale = qr_size - self._qr_group.x = x - self._qr_group.y = y - self._qr_group.append(qr_sprite) + super().qrcode(qr_data, qr_size=qr_size, x=x, y=y, qr_color=qr_color) if self.auto_refresh: self.display.refresh() sleep(5) diff --git a/adafruit_magtag/magtag.py b/adafruit_magtag/magtag.py index d0cd5cc..a160d8b 100755 --- a/adafruit_magtag/magtag.py +++ b/adafruit_magtag/magtag.py @@ -23,13 +23,13 @@ * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases +* Adafruit's PortalBase library: https://github.com/adafruit/Adafruit_CircuitPython_PortalBase + """ import gc import time -import terminalio -from adafruit_bitmap_font import bitmap_font -from adafruit_display_text.label import Label +from adafruit_portalbase import PortalBase from adafruit_magtag.network import Network from adafruit_magtag.graphics import Graphics from adafruit_magtag.peripherals import Peripherals @@ -38,7 +38,7 @@ __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MagTag.git" -class MagTag: +class MagTag(PortalBase): """Class representing the Adafruit MagTag. :param url: The URL of your data source. Defaults to ``None``. @@ -50,8 +50,8 @@ class MagTag: use regexp. :param default_bg: The path to your default background image file or a hex color. Defaults to 0x000000. - :param status_neopixel: The pin for the status NeoPixel. Use ``board.NEOPIXEL`` for the on-board - NeoPixel. Defaults to ``None``, not the status LED + :param status_neopixel: The pin for the status NeoPixel. Use ``board.NEOPIXEL`` for the + on-board NeoPixel. Defaults to ``None``, to not use the status LED :param json_transform: A function or a list of functions to call with the parsed JSON. Changes and additions are permitted for the ``dict`` object. :param rotation: Default rotation is landscape (270) but can be 0, 90, or 180 for @@ -75,251 +75,34 @@ def __init__( debug=False, ): - self._debug = debug - self.graphics = Graphics( + network = Network( + status_neopixel=status_neopixel, + extract_values=False, + debug=debug, + ) + + graphics = Graphics( default_bg=default_bg, auto_refresh=False, rotation=rotation, debug=debug, ) - self.display = self.graphics.display - self.network = Network( - status_neopixel=status_neopixel, - extract_values=False, + super().__init__( + network, + graphics, + url=url, + headers=headers, + json_path=json_path, + regexp_path=regexp_path, + json_transform=json_transform, debug=debug, ) - self._url = None - self.url = url - self._headers = headers - self._json_path = None - self.json_path = json_path - - # Font Cache - self._fonts = {} - - try: - import alarm # pylint: disable=import-outside-toplevel - - self._alarm = alarm - except ImportError: - self._alarm = None - - self._regexp_path = regexp_path - - self.splash = self.graphics.splash - self.peripherals = Peripherals() - # Add any JSON translators - if json_transform: - self.network.add_json_transform(json_transform) - - self._text = [] - self._text_color = [] - self._text_position = [] - self._text_wrap = [] - self._text_maxlen = [] - self._text_transform = [] - self._text_scale = [] - self._text_font = [] - self._text_line_spacing = [] - self._text_anchor_point = [] - self._text_is_data = [] - - gc.collect() - - # pylint: disable=too-many-arguments - def add_text( - self, - text_position=(0, 0), - text_font=terminalio.FONT, - text_color=0x000000, - text_wrap=0, - text_maxlen=0, - text_transform=None, - text_scale=1, - line_spacing=1.25, - text_anchor_point=(0, 0.5), - is_data=True, - ): - """ - Add text labels with settings - - :param str text_font: The path to your font file for your data text display. - :param text_position: The position of your extracted text on the display in an (x, y) tuple. - Can be a list of tuples for when there's a list of json_paths, for - example. - :param text_color: The color of the text, in 0xRRGGBB format. Can be a list of colors for - when there's multiple texts. Defaults to ``None``. - :param text_wrap: When non-zero, the maximum number of characters on each line before text - is wrapped. (for long text data chunks). Defaults to 0, no wrapping. - :param text_maxlen: The max length of the text. If non-zero, it will be truncated to this - length. Defaults to 0. - :param text_transform: A function that will be called on the text before display - :param int text_scale: The factor to scale the default size of the text by - :param float line_spacing: The factor to space the lines apart - :param (float, float) text_anchor_point: Values between 0 and 1 to indicate where the text - position is relative to the label - :param bool is_data: If True, fetch will attempt to update the label - """ - - if not text_wrap: - text_wrap = 0 - if not text_maxlen: - text_maxlen = 0 - if not text_transform: - text_transform = None - if not isinstance(text_scale, (int, float)) or text_scale < 1: - text_scale = 1 - if not isinstance(text_anchor_point, (tuple, list)): - text_anchor_point = (0, 0.5) - if not 0 <= text_anchor_point[0] <= 1 or not 0 <= text_anchor_point[1] <= 1: - raise ValueError("Text anchor point values should be between 0 and 1.") - text_scale = round(text_scale) gc.collect() - if self._debug: - print("Init text area") - self._text.append(None) - self._text_font.append(self._load_font(text_font)) - self._text_color.append(self.html_color_convert(text_color)) - self._text_position.append(text_position) - self._text_wrap.append(text_wrap) - self._text_maxlen.append(text_maxlen) - self._text_transform.append(text_transform) - self._text_scale.append(text_scale) - self._text_line_spacing.append(line_spacing) - self._text_anchor_point.append(text_anchor_point) - self._text_is_data.append(bool(is_data)) - - return len(self._text) - 1 - - # pylint: enable=too-many-arguments - - def _load_font(self, font): - """ - Load and cache a font if not previously loaded - Return the key of the cached font - """ - if font is terminalio.FONT: - if "terminal" not in self._fonts: - self._fonts["terminal"] = terminalio.FONT - return "terminal" - if font not in self._fonts: - self._fonts[font] = bitmap_font.load_font(font) - return font - - @staticmethod - def html_color_convert(color): - """Convert an HTML color code to an integer - - :param color: The color value to be converted - - """ - if isinstance(color, str): - if color[0] == "#": - color = color.lstrip("#") - return int(color, 16) - return color # Return unconverted - - def set_headers(self, headers): - """Set the headers used by fetch(). - - :param headers: The new header dictionary - - """ - self._headers = headers - - def set_background(self, file_or_color, position=None): - """The background image to a bitmap file. - - :param file_or_color: The filename of the chosen background image, or a hex color. - - """ - self.graphics.set_background(file_or_color, position) - - def preload_font(self, glyphs=None, index=0): - # pylint: disable=line-too-long - """Preload font. - - :param glyphs: The font glyphs to load. Defaults to ``None``, uses alphanumeric glyphs if - None. - """ - # pylint: enable=line-too-long - if not glyphs: - glyphs = b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-!,. \"'?!" - print("Preloading font glyphs:", glyphs) - if self._fonts[self._text_font[index]] is not terminalio.FONT: - self._fonts[self._text_font[index]].load_glyphs(glyphs) - - def set_text_color(self, color, index=0): - """Update the text color, with indexing into our list of text boxes. - - :param int color: The color value to be used - :param index: Defaults to 0. - - """ - if self._text[index]: - color = self.html_color_convert(color) - self._text_color[index] = color - self._text[index].color = color - - def set_text(self, val, index=0, auto_refresh=True): - """Display text, with indexing into our list of text boxes. - - :param str val: The text to be displayed - :param index: Defaults to 0. - - """ - # Make sure at least a single label exists - if not self._text: - self.add_text() - string = str(val) - if self._text_maxlen[index]: - if len(string) > self._text_maxlen[index]: - # too long! shorten it - string = string[: self._text_maxlen[index] - 3] - string += "..." - index_in_splash = None - - if len(string) > 0 and self._text_wrap[index]: - if self._debug: - print("Wrapping text with length of", self._text_wrap[index]) - lines = self.wrap_nicely(string, self._text_wrap[index]) - string = "\n".join(lines) - - if self._text[index] is not None: - if self._debug: - print("Replacing text area with :", string) - index_in_splash = self.splash.index(self._text[index]) - elif self._debug: - print("Creating text area with :", string) - - if len(string) > 0: - self._text[index] = Label( - self._fonts[self._text_font[index]], - text=string, - scale=self._text_scale[index], - ) - self._text[index].color = self._text_color[index] - self._text[index].anchor_point = self._text_anchor_point[index] - self._text[index].anchored_position = self._text_position[index] - self._text[index].line_spacing = self._text_line_spacing[index] - elif index_in_splash is not None: - self._text[index] = None - - if index_in_splash is not None: - if self._text[index] is not None: - self.splash[index_in_splash] = self._text[index] - else: - del self.splash[index_in_splash] - elif self._text[index] is not None: - self.splash.append(self._text[index]) - if auto_refresh: - self.refresh() - def exit_and_deep_sleep(self, sleep_time): """ Stops the current program and enters deep sleep. The program is restarted from the beginning @@ -334,14 +117,7 @@ def exit_and_deep_sleep(self, sleep_time): if self._alarm: self.peripherals.neopixel_disable = True self.peripherals.speaker_disable = True - pause = self._alarm.time.TimeAlarm( - monotonic_time=time.monotonic() + sleep_time - ) - self._alarm.exit_and_deep_sleep_until_alarms(pause) - else: - raise NotImplementedError( - "Deep sleep not supported. Make sure you have the latest CircuitPython." - ) + super().exit_and_deep_sleep(sleep_time) def enter_light_sleep(self, sleep_time): """ @@ -359,70 +135,31 @@ def enter_light_sleep(self, sleep_time): self.peripherals.neopixel_disable = True speaker_state = self.peripherals.speaker_disable self.peripherals.speaker_disable = True - pause = self._alarm.time.TimeAlarm( - monotonic_time=time.monotonic() + sleep_time - ) - self._alarm.light_sleep_until_alarms(pause) - self.peripherals.neopixel_disable = neopixel_state - self.peripherals.speaker_disable = speaker_state - for i in range(4): - self.peripherals.neopixels[i] = neopixel_values[i] - gc.collect() - else: - raise NotImplementedError( - "Hardware light sleep not supported. Make sure you have the latest CircuitPython." - ) - - def get_local_time(self, location=None): - """Accessor function for get_local_time()""" - return self.network.get_local_time(location=location) - - def push_to_io(self, feed_key, data): - """Push data to an adafruit.io feed - - :param str feed_key: Name of feed key to push data to. - :param data: data to send to feed - - """ - - self.network.push_to_io(feed_key, data) - - def get_io_data(self, feed_key): - """Return all values from the Adafruit IO Feed Data that matches the feed key - - :param str feed_key: Name of feed key to receive data from. - - """ - - return self.network.get_io_data(feed_key) + super().enter_light_sleep(sleep_time) + self.peripherals.neopixel_disable = neopixel_state + self.peripherals.speaker_disable = speaker_state + for i in range(4): + self.peripherals.neopixels[i] = neopixel_values[i] + gc.collect() - def get_io_feed(self, feed_key, detailed=False): - """Return the Adafruit IO Feed that matches the feed key + # pylint: disable=arguments-differ + def set_text(self, val, index=0, auto_refresh=True): + """Display text, with indexing into our list of text boxes. - :param str feed_key: Name of feed key to match. - :param bool detailed: Whether to return additional detailed information + :param str val: The text to be displayed + :param index: Defaults to 0. + :param auto_refresh: Automatically refresh the display after setting the + text. Defaults to True """ - return self.network.get_io_feed(feed_key, detailed) - - def get_io_group(self, group_key): - """Return the Adafruit IO Group that matches the group key - - :param str group_key: Name of group key to match. + super().set_text(val, index) + if auto_refresh: + self.refresh() - """ - return self.network.get_io_group(group_key) + # pylint: enable=arguments-differ - def refresh(self): - """ - Refresh the display - """ - while True: - try: - self.graphics.display.refresh() - return - except RuntimeError: - time.sleep(1) + def _fetch_set_text(self, val, index=0): + self.set_text(val, index=index, auto_refresh=False) def fetch(self, refresh_url=None, timeout=10): """Fetch data from the url we initialized with, perfom any parsing, @@ -433,94 +170,18 @@ def fetch(self, refresh_url=None, timeout=10): :param int timeout: The timeout period in seconds. """ - if refresh_url: - self._url = refresh_url - values = [] - - values = self.network.fetch_data( - self._url, - headers=self._headers, - json_path=self._json_path, - regexp_path=self._regexp_path, - timeout=timeout, - ) - # fill out all the text blocks - if self._text: - value_index = 0 # In case values and text is not the same - for i in range(len(self._text)): - if not self._text_is_data[i]: - continue - string = None - if self._text_transform[i]: - func = self._text_transform[i] - string = func(values[value_index]) - else: - try: - string = "{:,d}".format(int(values[value_index])) - except (TypeError, ValueError): - string = values[value_index] # ok its a string - self.set_text(string, index=i, auto_refresh=False) - value_index += 1 + values = super().fetch(refresh_url=refresh_url, timeout=timeout) self.refresh() - if len(values) == 1: - return values[0] return values - # return a list of lines with wordwrapping - @staticmethod - def wrap_nicely(string, max_chars): - """A helper that will return a list of lines with word-break wrapping. - - :param str string: The text to be wrapped. - :param int max_chars: The maximum number of characters on a line before wrapping. - - """ - string = string.replace("\n", "").replace("\r", "") # strip confusing newlines - words = string.split(" ") - the_lines = [] - the_line = "" - for w in words: - if len(the_line + " " + w) <= max_chars: - the_line += " " + w - else: - the_lines.append(the_line) - the_line = "" + w - if the_line: # last line remaining - the_lines.append(the_line) - # remove first space from first line: - the_lines[0] = the_lines[0][1:] - return the_lines - - @property - def url(self): - """ - Get or set the URL of your data source. - """ - return self._url - - @url.setter - def url(self, value): - self._url = value - if value and not self.network.uselocal: - self.network.connect() - # if self._debug: - # print("My IP address is", self.network.ip_address) - - @property - def json_path(self): + def refresh(self): """ - Get or set the list of json traversal to get data out of. Can be list - of lists for multiple data points. + Refresh the display """ - return self._json_path - - @json_path.setter - def json_path(self, value): - if value: - if isinstance(value[0], (list, tuple)): - self._json_path = value - else: - self._json_path = (value,) - else: - self._json_path = None + while True: + try: + self.graphics.display.refresh() + return + except RuntimeError: + time.sleep(1) diff --git a/adafruit_magtag/network.py b/adafruit_magtag/network.py index 26d9d22..5a61a37 100755 --- a/adafruit_magtag/network.py +++ b/adafruit_magtag/network.py @@ -23,63 +23,18 @@ * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases +* Adafruit's PortalBase library: https://github.com/adafruit/Adafruit_CircuitPython_PortalBase + """ -import os -import time -import gc -from micropython import const -from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError -import supervisor -import rtc +from adafruit_portalbase.network import NetworkBase from adafruit_magtag.wifi_module import WiFi -from adafruit_magtag.fakerequests import Fake_Requests - - -try: - from secrets import secrets -except ImportError: - print( - """WiFi settings are kept in secrets.py, please add them there! -the secrets dictionary must contain 'ssid' and 'password' at a minimum""" - ) - raise __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MagTag.git" -# pylint: disable=line-too-long -# pylint: disable=too-many-lines -# you'll need to pass in an io username and key -TIME_SERVICE = ( - "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s" -) -# our strftime is %Y-%m-%d %H:%M:%S.%L %j %u %z %Z see http://strftime.net/ for decoding details -# See https://apidock.com/ruby/DateTime/strftime for full options -TIME_SERVICE_STRFTIME = ( - "&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z" -) -LOCALFILE = "local.txt" -# pylint: enable=line-too-long - -STATUS_NO_CONNECTION = (100, 0, 0) -STATUS_CONNECTING = (0, 0, 100) -STATUS_FETCHING = (200, 100, 0) -STATUS_DOWNLOADING = (0, 100, 100) -STATUS_CONNECTED = (0, 100, 0) -STATUS_DATA_RECEIVED = (0, 0, 100) -STATUS_OFF = (0, 0, 0) - -CONTENT_TEXT = const(1) -CONTENT_JSON = const(2) -CONTENT_IMAGE = const(3) - -class HttpError(Exception): - """HTTP Specific Error""" - - -class Network: +class Network(NetworkBase): """Class representing the Adafruit MagTag. :param status_neopixel: The pin for the status NeoPixel. Use ``board.NEOPIXEL`` for the on-board @@ -98,452 +53,11 @@ def __init__( extract_values=True, debug=False, ): - self._wifi = WiFi(status_neopixel=status_neopixel) - self._debug = debug - self.json_transform = [] - self._extract_values = extract_values - - # This may be removed. Using for testing - self.requests = None - - try: - os.stat(LOCALFILE) - self.uselocal = True - except OSError: - self.uselocal = False - - gc.collect() - - def neo_status(self, value): - """The status NeoPixel. - - :param value: The color to change the NeoPixel. - - """ - self._wifi.neo_status(value) - - @staticmethod - def json_traverse(json, path): - """ - Traverse down the specified JSON path and return the value or values - - :param json: JSON data to traverse - :param list path: The path that we want to follow - - """ - value = json - if not isinstance(path, (list, tuple)): - raise ValueError( - "The json_path parameter should be enclosed in a list or tuple." - ) - for x in path: - try: - value = value[x] - except (TypeError, KeyError, IndexError) as error: - raise ValueError( - "The specified json_path was not found in the results." - ) from error - gc.collect() - return value - - def add_json_transform(self, json_transform): - """Add a function that is applied to JSON data when data is fetched - - :param json_transform: A function or a list of functions to call with the parsed JSON. - Changes and additions are permitted for the ``dict`` object. - """ - if callable(json_transform): - self.json_transform.append(json_transform) - else: - self.json_transform.extend(filter(callable, json_transform)) - - def get_local_time(self, location=None): - # pylint: disable=line-too-long - """ - Fetch and "set" the local time of this microcontroller to the local time at the location, using an internet time API. - - :param str location: Your city and country, e.g. ``"New York, US"``. - - """ - # pylint: enable=line-too-long - self.connect() - api_url = None - try: - aio_username = secrets["aio_username"] - aio_key = secrets["aio_key"] - except KeyError: - raise KeyError( - "\n\nOur time service requires a login/password to rate-limit. Please register for a free adafruit.io account and place the user/key in your secrets file under 'aio_username' and 'aio_key'" # pylint: disable=line-too-long - ) from KeyError - - location = secrets.get("timezone", location) - if location: - print("Getting time for timezone", location) - api_url = (TIME_SERVICE + "&tz=%s") % (aio_username, aio_key, location) - else: # we'll try to figure it out from the IP address - print("Getting time from IP address") - api_url = TIME_SERVICE % (aio_username, aio_key) - api_url += TIME_SERVICE_STRFTIME - try: - response = self._wifi.requests.get(api_url, timeout=10) - if response.status_code != 200: - error_message = ( - "Error connection to Adafruit IO. The response was: " - + response.text - ) - raise ValueError(error_message) - if self._debug: - print("Time request: ", api_url) - print("Time reply: ", response.text) - times = response.text.split(" ") - the_date = times[0] - the_time = times[1] - year_day = int(times[2]) - week_day = int(times[3]) - is_dst = None # no way to know yet - except KeyError: - raise KeyError( - "Was unable to lookup the time, try setting secrets['timezone'] according to http://worldtimeapi.org/timezones" # pylint: disable=line-too-long - ) from KeyError - year, month, mday = [int(x) for x in the_date.split("-")] - the_time = the_time.split(".")[0] - hours, minutes, seconds = [int(x) for x in the_time.split(":")] - now = time.struct_time( - (year, month, mday, hours, minutes, seconds, week_day, year_day, is_dst) + super().__init__( + WiFi(status_neopixel=status_neopixel), + extract_values=extract_values, + debug=debug, ) - rtc.RTC().datetime = now - - # now clean up - response.close() - response = None - gc.collect() - - def wget(self, url, filename, *, chunk_size=12000): - """Download a url and save to filename location, like the command wget. - - :param url: The URL from which to obtain the data. - :param filename: The name of the file to save the data to. - :param chunk_size: how much data to read/write at a time. - - """ - print("Fetching stream from", url) - - self.neo_status(STATUS_FETCHING) - response = self._wifi.requests.get(url, stream=True) - - headers = {} - for title, content in response.headers.items(): - headers[title.lower()] = content - - if response.status_code == 200: - print("Reply is OK!") - self.neo_status((0, 0, 100)) # green = got data - else: - if self._debug: - if "content-length" in headers: - print("Content-Length: {}".format(int(headers["content-length"]))) - if "date" in headers: - print("Date: {}".format(headers["date"])) - self.neo_status((100, 0, 0)) # red = http error - raise HttpError( - "Code {}: {}".format( - response.status_code, response.reason.decode("utf-8") - ) - ) - - if self._debug: - print(response.headers) - if "content-length" in headers: - content_length = int(headers["content-length"]) - else: - raise RuntimeError("Content-Length missing from headers") - remaining = content_length - print("Saving data to ", filename) - stamp = time.monotonic() - file = open(filename, "wb") - for i in response.iter_content(min(remaining, chunk_size)): # huge chunks! - self.neo_status(STATUS_DOWNLOADING) - remaining -= len(i) - file.write(i) - if self._debug: - print( - "Read %d bytes, %d remaining" - % (content_length - remaining, remaining) - ) - else: - print(".", end="") - if not remaining: - break - self.neo_status(STATUS_FETCHING) - file.close() - - response.close() - stamp = time.monotonic() - stamp - print( - "Created file of %d bytes in %0.1f seconds" % (os.stat(filename)[6], stamp) - ) - self.neo_status(STATUS_OFF) - if not content_length == os.stat(filename)[6]: - raise RuntimeError - - def connect(self): - """ - Connect to WiFi using the settings found in secrets.py - """ - self._wifi.neo_status(STATUS_CONNECTING) - while not self._wifi.is_connected: - # secrets dictionary must contain 'ssid' and 'password' at a minimum - print("Connecting to AP", secrets["ssid"]) - if secrets["ssid"] == "CHANGE ME" or secrets["password"] == "CHANGE ME": - change_me = "\n" + "*" * 45 - change_me += "\nPlease update the 'secrets.py' file on your\n" - change_me += "CIRCUITPY drive to include your local WiFi\n" - change_me += "access point SSID name in 'ssid' and SSID\n" - change_me += "password in 'password'. Then save to reload!\n" - change_me += "*" * 45 - raise OSError(change_me) - self._wifi.neo_status(STATUS_NO_CONNECTION) # red = not connected - try: - self._wifi.connect(secrets["ssid"], secrets["password"]) - self.requests = self._wifi.requests - except RuntimeError as error: - print("Could not connect to internet", error) - print("Retrying in 3 seconds...") - time.sleep(3) - - def _get_io_client(self): - self.connect() - - try: - aio_username = secrets["aio_username"] - aio_key = secrets["aio_key"] - except KeyError: - raise KeyError( - "Adafruit IO secrets are kept in secrets.py, please add them there!\n\n" - ) from KeyError - - return IO_HTTP(aio_username, aio_key, self._wifi.requests) - - def push_to_io(self, feed_key, data): - """Push data to an adafruit.io feed - - :param str feed_key: Name of feed key to push data to. - :param data: data to send to feed - - """ - - io_client = self._get_io_client() - - while True: - try: - feed_id = io_client.get_feed(feed_key) - except AdafruitIO_RequestError: - # If no feed exists, create one - feed_id = io_client.create_new_feed(feed_key) - except RuntimeError as exception: - print("An error occured, retrying! 1 -", exception) - continue - break - - while True: - try: - io_client.send_data(feed_id["key"], data) - except RuntimeError as exception: - print("An error occured, retrying! 2 -", exception) - continue - except NameError as exception: - print(feed_id["key"], data, exception) - continue - break - - def get_io_feed(self, feed_key, detailed=False): - """Return the Adafruit IO Feed that matches the feed key - - :param str feed_key: Name of feed key to match. - :param bool detailed: Whether to return additional detailed information - - """ - io_client = self._get_io_client() - - while True: - try: - return io_client.get_feed(feed_key, detailed=detailed) - except RuntimeError as exception: - print("An error occured, retrying! 1 -", exception) - continue - break - - def get_io_group(self, group_key): - """Return the Adafruit IO Group that matches the group key - - :param str group_key: Name of group key to match. - - """ - io_client = self._get_io_client() - - while True: - try: - return io_client.get_group(group_key) - except RuntimeError as exception: - print("An error occured, retrying! 1 -", exception) - continue - break - - def get_io_data(self, feed_key): - """Return all values from Adafruit IO Feed Data that matches the feed key - - :param str feed_key: Name of feed key to receive data from. - - """ - io_client = self._get_io_client() - - while True: - try: - return io_client.receive_all_data(feed_key) - except RuntimeError as exception: - print("An error occured, retrying! 1 -", exception) - continue - break - - def fetch(self, url, *, headers=None, timeout=10): - """Fetch data from the specified url and return a response object - - :param str url: The URL to fetch from. - :param list headers: Extra headers to include in the request. - :param int timeout: The timeout period in seconds. - - """ - gc.collect() - if self._debug: - print("Free mem: ", gc.mem_free()) # pylint: disable=no-member - - response = None - if self.uselocal: - print("*** USING LOCALFILE FOR DATA - NOT INTERNET!!! ***") - response = Fake_Requests(LOCALFILE) - - if not response: - self.connect() - # great, lets get the data - print("Retrieving data...", end="") - self.neo_status(STATUS_FETCHING) # yellow = fetching data - gc.collect() - response = self._wifi.requests.get(url, headers=headers, timeout=timeout) - gc.collect() - - return response - - def fetch_data( - self, - url, - *, - headers=None, - json_path=None, - regexp_path=None, - timeout=10, - ): - """Fetch data from the specified url and perfom any parsing - - :param str url: The URL to fetch from. - :param list headers: Extra headers to include in the request. - :param json_path: The path to drill down into the JSON data. - :param regexp_path: The path formatted as a regular expression to drill down - into the JSON data. - :param int timeout: The timeout period in seconds. - - """ - json_out = None - values = [] - content_type = CONTENT_TEXT - - response = self.fetch(url, headers=headers, timeout=timeout) - - headers = {} - for title, content in response.headers.items(): - headers[title.lower()] = content - gc.collect() - if self._debug: - print("Headers:", headers) - if response.status_code == 200: - print("Reply is OK!") - self.neo_status(STATUS_DATA_RECEIVED) # green = got data - if "content-type" in headers: - if "image/" in headers["content-type"]: - content_type = CONTENT_IMAGE - elif "application/json" in headers["content-type"]: - content_type = CONTENT_JSON - elif "application/javascript" in headers["content-type"]: - content_type = CONTENT_JSON - else: - if self._debug: - if "content-length" in headers: - print("Content-Length: {}".format(int(headers["content-length"]))) - if "date" in headers: - print("Date: {}".format(headers["date"])) - self.neo_status((100, 0, 0)) # red = http error - raise HttpError( - "Code {}: {}".format( - response.status_code, response.reason.decode("utf-8") - ) - ) - - if content_type == CONTENT_JSON and json_path is not None: - if isinstance(json_path, (list, tuple)) and ( - not json_path or not isinstance(json_path[0], (list, tuple)) - ): - json_path = (json_path,) - try: - gc.collect() - json_out = response.json() - if self._debug: - print(json_out) - gc.collect() - except ValueError: # failed to parse? - print("Couldn't parse json: ", response.text) - raise - except MemoryError: - supervisor.reload() - - if regexp_path: - import re # pylint: disable=import-outside-toplevel - - # optional JSON post processing, apply any transformations - # these MAY change/add element - for idx, json_transform in enumerate(self.json_transform): - try: - json_transform(json_out) - except Exception as error: - print("Exception from json_transform: ", idx, error) - raise - - # extract desired text/values from json - if json_out is not None and json_path: - for path in json_path: - try: - values.append(self.json_traverse(json_out, path)) - except KeyError: - print(json_out) - raise - elif content_type == CONTENT_TEXT and regexp_path: - for regexp in regexp_path: - values.append(re.search(regexp, response.text).group(1)) - else: - if json_out: - # No path given, so return JSON as string for compatibility - import json # pylint: disable=import-outside-toplevel - - values = json.dumps(response.json()) - else: - values = response.text - - # we're done with the requests object, lets delete it so we can do more! - json_out = None - response = None - gc.collect() - if self._extract_values and len(values) == 1: - return values[0] - - return values @property def enabled(self): diff --git a/adafruit_magtag/peripherals.py b/adafruit_magtag/peripherals.py index b37ca3e..75d00a6 100755 --- a/adafruit_magtag/peripherals.py +++ b/adafruit_magtag/peripherals.py @@ -23,6 +23,8 @@ * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases +* Adafruit's PortalBase library: https://github.com/adafruit/Adafruit_CircuitPython_PortalBase + """ import board diff --git a/adafruit_magtag/wifi_module.py b/adafruit_magtag/wifi_module.py index 7879afb..84dda5c 100755 --- a/adafruit_magtag/wifi_module.py +++ b/adafruit_magtag/wifi_module.py @@ -23,6 +23,8 @@ * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases +* Adafruit's PortalBase library: https://github.com/adafruit/Adafruit_CircuitPython_PortalBase + """ import gc diff --git a/docs/api.rst b/docs/api.rst index 5bfbe85..41694b9 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,8 +1,5 @@ -.. automodule:: adafruit_magtag.fakerequests - :members: - .. automodule:: adafruit_magtag.graphics :members: diff --git a/requirements.txt b/requirements.txt index 7d663bf..e053b43 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,10 +5,7 @@ Adafruit-Blinka adafruit-blinka-displayio -adafruit-circuitpython-bitmap-font -adafruit-circuitpython-display-text +adafruit-circuitpython-portalbase adafruit-circuitpython-neopixel adafruit-circuitpython-requests -adafruit-circuitpython-adafruitio -adafruit-circuitpython-il0373 adafruit-circuitpython-simpleio