Race condition in subscribe #192

Closed
@romkey

The subscribe() method sends a subscription request to the broker and waits until it receives a SUBACK message. If it receives any message that isn't a SUBACK while waiting, it raises an MMQTTException: "invalid message received as response to SUBSCRIBE: {hex(op)}"

It's valid to receive PUBLISH messages between sending the SUBSCRIBE and receiving the SUBACK. Even if the broker acts synchronously in response to the SUBSCRIBE, it could have been transmitting a PUBLISH at the same time as the client was sending the SUBSCRIBE. This will only happen when the application subscribes to multiple topics, but it can happen easily on a busy MQTT network.

Here's debugging output showing an example of this happening. In this case, multiple subscribe requests are issued inside the on_connect callback.

Connecting to WiFi...
Connected!
946.886: DEBUG - Attempting to connect to MQTT broker (attempt #0)
946.887: DEBUG - Attempting to establish MQTT connection...
946.890: INFO - Establishing an INSECURE connection to 10.0.1.11:1883
947.001: DEBUG - Sending CONNECT to broker...
947.003: DEBUG - Fixed Header: bytearray(b'\x10)\x00')
947.005: DEBUG - Variable Header: bytearray(b'\x04MQTT\x04\xc2\x00<')
947.080: DEBUG - Receiving CONNACK packet from broker
947.092: DEBUG - Got message type: 0x20 pkt: 0x20
Connected to MQTT broker! Listening for topic changes
947.098: DEBUG - SUBSCRIBING to topic zigbee2mqtt/# with QoS 0
947.119: DEBUG - Got message type: 0x90 pkt: 0x90
947.127: DEBUG - SUBSCRIBING to topic givemeasign/# with QoS 0
947.143: DEBUG - Got message type: 0x30 pkt: 0x31
947.158: DEBUG - Receiving PUBLISH
Topic: zigbee2mqtt/bridge/state
Msg: bytearray(b'online')

New message on topic zigbee2mqtt/bridge/state: online
947.163: INFO - MMQT error: invalid message received as response to SUBSCRIBE: 0x30
947.165: DEBUG - Reconnect timeout computed to 2.00
947.166: DEBUG - adding jitter 0.91 to 2.00 seconds
947.167: DEBUG - Attempting to connect to MQTT broker (attempt #1)
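
The "Got message type: 0x30 pkt: 0x31" line in the output above can be read by splitting the first fixed-header byte into its type and flag nibbles, as MQTT 3.1.1 defines them. The following is a minimal sketch of that decoding, not code from the library; the names PACKET_TYPES and decode_first_byte are hypothetical and exist only for illustration.

```python
# MQTT 3.1.1: the upper four bits of the first header byte are the packet type,
# the lower four bits are flags. Only the types seen in the log are listed here.
PACKET_TYPES = {0x20: "CONNACK", 0x30: "PUBLISH", 0x90: "SUBACK"}


def decode_first_byte(first_byte):
    """Hypothetical helper: return (type name, flags dict) for a header byte."""
    packet_type = first_byte & 0xF0
    flags = first_byte & 0x0F
    name = PACKET_TYPES.get(packet_type, "OTHER")
    if packet_type == 0x30:
        # PUBLISH flags: bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN.
        return name, {
            "dup": bool(flags & 0x08),
            "qos": (flags >> 1) & 0x03,
            "retain": bool(flags & 0x01),
        }
    return name, {}


print(decode_first_byte(0x31))
print(decode_first_byte(0x90))
```

Under this reading, 0x31 is a QoS 0 PUBLISH with the RETAIN flag set, which would be consistent with a retained message delivered because of the earlier zigbee2mqtt/# subscription while the client was waiting for the SUBACK to givemeasign/#.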

The library shouldn't raise an exception in this case. The PUBLISH has already been processed, so the wait loop only needs to skip it and keep waiting for the SUBACK.

When this happens during the on_connect callback, the connect() method catches the exception and tries to reconnect. If this happens every time, connect() will fail even though a successful connection was made on each attempt.

The fix is very simple: don't raise the exception if the non-SUBACK packet is a PUBLISH packet. By this point in the processing, the packet has already been properly handled.
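
The idea can be sketched roughly as follows. This is a self-contained simulation, not the library's code and not the change in the PR: wait_for_suback is a hypothetical function that takes a list of first header bytes instead of reading from a socket, and the MMQTTException class here is a local stand-in for the library's exception.

```python
SUBACK = 0x90
PUBLISH = 0x30


class MMQTTException(Exception):
    """Local stand-in for the library's exception type."""


def wait_for_suback(packet_ops):
    """Hypothetical wait loop: scan first header bytes until a SUBACK appears.

    Returns the number of PUBLISH packets skipped along the way. Any other
    packet type seen before the SUBACK is still treated as an error.
    """
    skipped = 0
    for op in packet_ops:
        if op == SUBACK:
            return skipped
        if op & 0xF0 == PUBLISH:
            # Assumed to have been dispatched to on_message already,
            # so it is safe to keep waiting for the SUBACK.
            skipped += 1
            continue
        raise MMQTTException(
            f"invalid message received as response to SUBSCRIBE: {hex(op)}"
        )
    raise MMQTTException("no SUBACK received")


# The sequence from the debug log: a retained PUBLISH (0x31), then the SUBACK.
print(wait_for_suback([0x31, 0x90]))
```

With the sequence from the log, the PUBLISH is skipped and the call returns normally instead of triggering a reconnect.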

Example code (based on Adafruit's example):

import os
import time
import board
import busio
from digitalio import DigitalInOut

import adafruit_logging

from adafruit_esp32spi import adafruit_esp32spi
import adafruit_esp32spi.adafruit_esp32spi_socket as socket

import adafruit_minimqtt.adafruit_minimqtt as MQTT

esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

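def connected(client, userdata, flags, rc):
    print("Connected to MQTT broker! Listening for topic changes")

    # Back-to-back subscribes: a PUBLISH matching an earlier subscription can
    # arrive while a later subscribe() is still waiting for its SUBACK.
    client.subscribe("zigbee2mqtt/#")
    client.subscribe("givemeasign/#")
    client.subscribe("rtlamr/#")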

def disconnected(client, userdata, rc):
    print("Disconnected from MQTT Broker!")

def message(client, topic, message):
    print(f"New message on topic {topic}: {message}")

print("Connecting to WiFi...")
esp.connect_AP(os.getenv("wifi_ssid"), os.getenv("wifi_password"))
print("Connected!")

MQTT.set_socket(socket, esp)

mqtt_client = MQTT.MQTT(
    broker=os.getenv("MQTT_BROKER"),
    # Fall back to 1883, the standard unencrypted MQTT port.
    port=int(os.getenv("MQTT_PORT") or 1883),
    is_ssl=os.getenv("MQTT_SSL") or False,
    client_id=os.getenv("MQTT_CLIENTID"),
    username=os.getenv("MQTT_USERNAME"),
    password=os.getenv("MQTT_PASSWORD"),
)

mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

logger = adafruit_logging.Logger("logger", 0)
# addHandler() expects a handler instance, not the class itself.
logger.addHandler(adafruit_logging.StreamHandler())

mqtt_client.enable_logger(adafruit_logging, 0)

mqtt_client.connect()
print("MQTT connect returned")

while True:
    mqtt_client.loop(timeout=0.1)

I've submitted PR 193 with a three-line fix for the problem.
