# Metro Matrix Clock # Runs on Airlift Metro M4 with 64x32 RGB Matrix display & shield import time import board import json import busio from digitalio import DigitalInOut import displayio import neopixel from adafruit_display_text.label import Label from adafruit_bitmap_font import bitmap_font from adafruit_matrixportal.matrix import Matrix from adafruit_esp32spi import adafruit_esp32spi from adafruit_esp32spi import adafruit_esp32spi_wifimanager import adafruit_esp32spi.adafruit_esp32spi_socket as socket import adafruit_minimqtt.adafruit_minimqtt as MQTT # Get wifi details and more from a secrets.py file try: from secrets import secrets except ImportError: print("WiFi secrets are kept in secrets.py, please add them there!") raise print("ClusterX Prices") # If you are using a board with pre-defined ESP32 Pins: esp32_cs = DigitalInOut(board.ESP_CS) esp32_ready = DigitalInOut(board.ESP_BUSY) esp32_reset = DigitalInOut(board.ESP_RESET) spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) status_light = neopixel.NeoPixel( board.NEOPIXEL, 1, brightness=0.2 ) wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) # --- Display setup --- matrix = Matrix() display = matrix.display # --- Drawing setup --- group = displayio.Group(max_size=6) # Create a Group bitmap = displayio.Bitmap(64, 32, 2) # Create a bitmap object,width, height, bit depth color = displayio.Palette(4) # Create a color palette color[0] = 0x000000 # black background color[1] = 0xFF0000 # red color[2] = 0xCC4000 # amber color[3] = 0x85FF00 # greenish # Create a TileGrid using the Bitmap and Palette tile_grid = displayio.TileGrid(bitmap, pixel_shader=color) group.append(tile_grid) # Add the TileGrid to the Group display.show(group) font = bitmap_font.load_font("/bitocra-13.bdf") font_small = bitmap_font.load_font("/bitocra-full.bdf") title_label = Label(font_small, max_glyphs=12) title_label.y = 4 title_label.color = color[2] price_label = Label(font, max_glyphs=6) price_label.y = 14 change_label = Label(font, max_glyphs=6) change_label.y = 24 wti_topic="cell/403111" group.append(title_label) group.append(price_label) group.append(change_label) # Define callback methods which are called when events occur # pylint: disable=unused-argument, redefined-outer-name def connected(client, userdata, flags, rc): # This function will be called when the client is connected # successfully to the broker. print("Connected to MQTT broker! Listening for topic changes on %s" % wti_topic) title_label.text = "{}".format("Connected!") # Subscribe to all changes on the default_topic feed. client.subscribe(wti_topic) def disconnected(client, userdata, rc): # This method is called when the client is disconnected print("Disconnected from MQTT Broker!") title_label.text = "{}".format("Disconnect") def message(client, topic, message): """Method callled when a client's subscribed feed has a new value. :param str topic: The topic of the feed with a new value. :param str message: The new value """ print("New message on topic {0}: {1}".format(topic, message)) try: data = json.loads(message) title_label.text = "{title}".format( title=data["title"][0:10] ) price_label.text = "{price}".format( price=data["data"]["value"] ) change_label.text = "{price}".format( price=data["data"]["suffix"] ) title_label.x = round(display.width / 2 - title_label.bounding_box[2] / 2) price_label.x = round(display.width / 2 - price_label.bounding_box[2] / 2) change_label.x = round(display.width / 2 - change_label.bounding_box[2] / 2) if float(data["data"]["suffix"]) > 0: change_label.color = color[3] elif float(data["data"]["suffix"]) < 0: change_label.color = color[1] except (ValueError, RuntimeError) as e: print("Failed to parse data\n", e) print("Connecting to WiFi...") title_label.text = "{}".format("Conn Wifi..") wifi.connect() print("Connected!") title_label.text = "{}".format("Connected!") MQTT.set_socket(socket, esp) # Set up a MiniMQTT Client mqtt_client = MQTT.MQTT( broker=secrets["mqtt_host"], username=secrets["mqtt_user"], password=secrets["mqtt_pass"] ) # Setup the callback methods above mqtt_client.on_connect = connected mqtt_client.on_disconnect = disconnected mqtt_client.on_message = message # Connect the client to the MQTT broker. print("Connecting to MQTT broker...") title_label.text = "{}".format("Conn MQTT..") mqtt_client.connect() # Start a blocking message loop... # NOTE: NO code below this loop will execute # NOTE: Network reconnection is handled within this loop while True: try: mqtt_client.loop() except (ValueError, RuntimeError) as e: print("Failed to get data, retrying\n", e) wifi.reset() mqtt_client.reconnect() continue time.sleep(30)