Skip to content

Connection interrupted intermittently #551

Closed
@jmklix

Description

@jmklix

Discussed in #548

Originally posted by tucker-SB January 24, 2024
I have the following Python code, which is modified from the PubSub sample:

from awscrt import mqtt, http
from awsiot import mqtt_connection_builder
import sys
import secrets
import threading
import time
import json
import os
class example_settings:
    """Minimal stand-in for the application's settings manager.

    Holds the three values ``mqtt_transmitter`` reads: the credentials
    directory, the device serial number (used as the MQTT client ID) and the
    start mode (``'receiver'`` or ``'publisher'``).
    """

    def __init__(self, cred_dir, sn, mode):
        # Upper-case attribute names presumably mirror the real settingsManager
        # imported in the non-__main__ branch -- confirm against that module.
        self.CREDENTIALS_DIR, self.SERIAL_NUMBER, self.HEADLESS_START = cred_dir, sn, mode


def on_connection_interrupted(connection, error, **kwargs):
    """Log that an established MQTT connection was interrupted.

    ``connection`` and any extra keyword arguments are accepted only to match
    the callback signature; just ``error`` is reported.
    """
    message = f"Connection interrupted. error: {error}"
    print(message)


def on_connection_resumed(connection, return_code, session_present, **kwargs):
    """Callback when an interrupted connection is re-established.

    Logs the outcome. If the broker accepted the reconnect but reports that
    no session was persisted, the existing topic subscriptions are re-issued.
    Their result is checked asynchronously by ``on_resubscribe_complete``.
    """
    print(f"Connection resumed. return_code: {return_code} session_present: {session_present}")

    accepted = return_code == mqtt.ConnectReturnCode.ACCEPTED
    if not accepted or session_present:
        return

    print("Session did not persist. Resubscribing to existing topics...")
    pending, _packet_id = connection.resubscribe_existing_topics()
    # We are on the connection's event-loop thread, so blocking on
    # pending.result() here is not possible; evaluate it in a callback.
    pending.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    """Report resubscribe results and abort if the server rejected a topic.

    ``resubscribe_future`` is the future from ``resubscribe_existing_topics``.
    Its result's ``'topics'`` entry is iterated as ``(topic, qos)`` pairs,
    and a ``qos`` of ``None`` is treated as a rejected subscription.
    """
    outcome = resubscribe_future.result()
    print(f"Resubscribe results: {outcome}")

    for topic_name, granted_qos in outcome['topics']:
        if granted_qos is not None:
            continue
        # NOTE(review): sys.exit raises SystemExit in the *current* thread.
        # When this runs as a future done-callback, that is likely not the
        # main thread, so the process may keep running -- confirm intent.
        sys.exit(f"Server rejected resubscribe to topic: {topic_name}")


def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    """Callback when a subscribed topic receives a message.

    Prints the topic and the payload (formatted with ``str()``, so a
    ``bytes`` payload appears as ``b'...'``). ``dup``, ``qos``, ``retain``
    and extra keyword arguments are accepted to match the callback
    signature but are not used.
    """
    print(f"Received message from topic '{topic}': {payload}")

def on_connection_success(connection, callback_data):
    """Callback when the connection successfully connects.

    Logs the broker's return code and whether a previous session was present.
    """
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    code, has_session = callback_data.return_code, callback_data.session_present
    print(f"Connection Successful with return code: {code} session present: {has_session}")

def on_connection_failure(connection, callback_data):
    """Callback when a connection attempt fails; logs the reported error."""
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    failure = callback_data.error
    print(f"Connection failed with error code: {failure}")

def on_connection_closed(connection, callback_data):
    """Callback when a connection has been disconnected or shut down.

    Both arguments are accepted to match the callback signature and ignored.
    """
    closed_msg = "Connection closed"
    print(closed_msg)

class mqtt_transmitter:
    """Thin wrapper around an AWS IoT Core MQTT connection using mutual TLS.

    The connection is built and connected synchronously in ``__init__``;
    ``subscribe`` and ``publish`` use QoS 1 (AT_LEAST_ONCE).

    NOTE: AWS IoT Core allows only one live connection per MQTT client ID.
    When another client connects with an ID that is already in use, the
    broker closes the older connection, which surfaces here through
    ``on_connection_interrupted`` as AWS_ERROR_MQTT_UNEXPECTED_HANGUP.
    With automatic reconnects, two clients sharing an ID can keep
    disconnecting each other, which looks like intermittent drop-outs.
    Every concurrently connected transmitter needs its own ``client_id``.
    """

    def __init__(self, cert_fp, pkey_fp, ca_fp,
                 endpoint="a3dkrncnrub5qb-ats.iot.us-east-2.amazonaws.com",
                 *, client_id=None, cfg=None):
        """Build the MQTT connection and block until the connect completes.

        Args:
            cert_fp: certificate file name, relative to ``cfg.CREDENTIALS_DIR``.
            pkey_fp: private-key file name, relative to ``cfg.CREDENTIALS_DIR``.
            ca_fp: CA file name, relative to ``cfg.CREDENTIALS_DIR``.
            endpoint: AWS IoT data endpoint (port 8883).
            client_id: MQTT client ID; defaults to ``cfg.SERIAL_NUMBER``.
                Must be unique among concurrently connected clients.
            cfg: settings object exposing ``CREDENTIALS_DIR``,
                ``SERIAL_NUMBER`` and ``HEADLESS_START``; defaults to the
                module-level ``settings`` global.

        Any exception raised by the connect future propagates to the caller.
        """
        if cfg is None:
            # Module-level global bound either in the ``__main__`` block or by
            # the settingsManager import in its ``else`` branch.
            cfg = settings
        if client_id is None:
            client_id = cfg.SERIAL_NUMBER

        creds_dir = cfg.CREDENTIALS_DIR
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=endpoint,
            port=8883,
            cert_filepath=os.path.join(creds_dir, cert_fp),
            pri_key_filepath=os.path.join(creds_dir, pkey_fp),
            ca_filepath=os.path.join(creds_dir, ca_fp),
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=client_id,
            # Only mode 'receiver' starts a clean session; any other mode
            # requests a persistent one (clean_session=False).
            clean_session=cfg.HEADLESS_START == 'receiver',
            keep_alive_secs=30,
            http_proxy_options=None,
            on_connection_success=on_connection_success,
            on_connection_failure=on_connection_failure,
            on_connection_closed=on_connection_closed)

        # connect() returns a future; result() blocks until the attempt
        # finishes and re-raises its error, if any.
        self.mqtt_connection.connect().result()

    def subscribe(self, topic, action=on_message_received):
        """Subscribe to ``topic`` at QoS 1 and block until it is acknowledged.

        ``action`` is the callback invoked for each message on the topic.
        Returns the subscribe result (ignored by existing callers).
        """
        print("Subscribing to topic '{}'...".format(topic))
        subscribe_future, _packet_id = self.mqtt_connection.subscribe(
            topic=topic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=action)

        subscribe_result = subscribe_future.result()
        print("Subscribed with {}".format(str(subscribe_result)))
        return subscribe_result

    def publish(self, topic, payload: dict):
        """Publish ``payload`` serialized as JSON to ``topic`` at QoS 1.

        Does not wait for delivery. Returns the publish future so callers
        that need confirmation can call ``.result()`` on it; existing callers
        that ignore the return value are unaffected.
        """
        publish_future, _packet_id = self.mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(payload),
            qos=mqtt.QoS.AT_LEAST_ONCE)
        return publish_future

    def publish_example_message(self, topic, count):
        """Publish one synthetic message carrying 2560 hex chars of random data.

        ``count`` is embedded in the payload, presumably so the receiver can
        check ordering and gaps. Returns the future from ``publish``.
        """
        hexdata = secrets.token_hex(1280)  # 1280 random bytes -> 2560 hex chars
        return self.publish(topic,
                            {'device': 'labnuc',
                             'stream': 'DEV_MFAM_13',
                             'count': count,
                             'data': hexdata})

    def disconnect(self):
        """Disconnect from the broker and block until the disconnect completes."""
        self.mqtt_connection.disconnect().result()

if __name__ == "__main__":
    # Stand-alone demo: run one process as 'publisher' and another as
    # 'receiver'. Give each process a DIFFERENT serial_number: it is used as
    # the MQTT client ID, and AWS IoT drops an existing connection when
    # another client connects with the same ID.
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('serial_number', help='device serial number, used for mqtt ClientID')
    # ``choices`` rejects a bad mode up front, instead of connecting and then
    # silently doing nothing.
    parser.add_argument('mode', choices=('receiver', 'publisher'), help='receiver or publisher')
    parser.add_argument('cred_path', help='path to directory containing credentials')
    parser.add_argument('cert_fp', help='cert filename')
    parser.add_argument('pkey_fp', help='private key filename')
    parser.add_argument('ca_fp', help='ca filename')
    topic = 'datastream'

    args = parser.parse_args()
    # mqtt_transmitter reads this module-level ``settings`` global.
    settings = example_settings(args.cred_path, args.serial_number, args.mode)

    transmitter = mqtt_transmitter(args.cert_fp, args.pkey_fp, args.ca_fp)
    try:
        if args.mode == 'publisher':
            # Roughly 10 messages per second, 1000 messages in total.
            for count in range(1000):
                transmitter.publish_example_message(topic, count)
                time.sleep(.1)
        else:  # 'receiver' -- argparse ``choices`` excludes anything else
            transmitter.subscribe(topic)
            for _ in range(1000):  # hold the program open (~100 s)
                time.sleep(.1)
    finally:
        # Always close the connection, including on Ctrl-C or an exception,
        # so it is shut down explicitly rather than abandoned at exit.
        transmitter.disconnect()

else:
    from src.Settings.settingsMgr import settingsManager as settings

I am currently trying to set up a system with one publisher, which publishes data 40 times per second, and one subscriber. Each creates an mqtt_transmitter object. The issue is that on both sides I intermittently receive the following error:

'Connection interrupted. error: AWS_ERROR_MQTT_UNEXPECTED_HANGUP: The connection was closed unexpectedly.'

The data being published is time-series data. When I interpret it on the subscriber side, I can see that it is correct and arrives with low latency (although out of order), but it drops out completely for seconds at a time.

Interestingly, I created a stripped-down example in the main block of the above code, in which a 'publisher' and a 'receiver' can be created and the publisher publishes data structured identically to my system's. When I run these two, the data comes in order with no issue.

I'm wondering what in my complete system could cause this intermittent AWS_ERROR_MQTT_UNEXPECTED_HANGUP. It is a multi-threaded program, but only one thread publishes to the mqtt_transmitter. Does the connection object somehow need to be contained in its own dedicated thread?

Metadata

Metadata

Assignees

Labels

bug (This issue is a bug.), closed-for-staleness, p2 (This is a standard priority issue), response-requested (Waiting on additional info and feedback. Will move to "closing-soon" in 2 days.)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions