Connection drops for batch of clients at the exact same time #6532

Closed
@mkin1337

Description

Basic Infos

  • This issue complies with the issue POLICY doc.
  • I have read the documentation at readthedocs and the issue is not addressed there.
  • I have tested that the issue is present in current master branch (aka latest git).
  • I have searched the issue tracker for a similar issue.
  • If there is a stack dump, I have decoded it.
  • I have filled out all fields below.

Platform

  • Hardware: [ESP-12E]
  • Core Version: [2.5.0]
  • Development Env: [Arduino IDE]
  • Operating System: [Windows & MacOS]

Settings in IDE

  • Module: [Nodemcu]
  • Flash Mode: [qio|dio|other]
  • Flash Size: [4MB]
  • lwip Variant: [v2 Lower Memory]
  • Reset Method: [ck|nodemcu]
  • Flash Frequency: [40Mhz]
  • CPU Frequency: [80Mhz]
  • Upload Using: [OTA|SERIAL (both)]
  • Upload Speed: [115200]

Problem Description

I am struggling with a project in which ESP8266 boards (NodeMCUs) act as clients communicating with a VerneMQ broker running on a DigitalOcean droplet. We have about 200 devices at different locations, each sending about 20 messages per minute. Yesterday, 18 clients at one location suddenly dropped their connection. A month earlier, 40 clients at another location showed the same behaviour.

All clients were able to connect to MQTT again after simply being restarted.

Can you help us troubleshoot the issue? Collecting debug messages or dumps is difficult because the ESP8266s are remote. We could hook them up to a computer running the Arduino IDE, but to be honest we do not know what to check beyond the serial output, especially since the connection is re-established as soon as the ESP8266 is restarted.
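One possible way to gather evidence from remote devices is to record connection events in RAM while offline and upload them once the link is back. The following is only a sketch of that idea: `Event`, `LogEntry` and `EventLog` are hypothetical names that do not appear in the sketch below, and the event codes are made up for illustration.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical event codes; illustrative only, not part of the sketch.
enum class Event : uint8_t { WifiLost, WifiUp, MqttLost, MqttUp };

struct LogEntry {
  uint32_t uptimeMs;  // millis() value when the event happened
  Event event;
  int8_t reason;      // numeric disconnect reason, -1 if none
};

// Fixed-size ring buffer: when full, the oldest entry is overwritten,
// so the most recent history survives a long offline period.
template <size_t N>
class EventLog {
 public:
  void push(uint32_t uptimeMs, Event e, int8_t reason = -1) {
    assert(size_ <= N);
    buf_[(head_ + size_) % N] = {uptimeMs, e, reason};
    if (size_ < N) {
      ++size_;
    } else {
      head_ = (head_ + 1) % N;  // drop the oldest entry
    }
  }

  // Moves the oldest entry into `out`; returns false when the log is empty.
  bool pop(LogEntry& out) {
    if (size_ == 0) return false;
    out = buf_[head_];
    head_ = (head_ + 1) % N;
    --size_;
    return true;
  }

  size_t size() const { return size_; }

 private:
  static_assert(N > 0, "capacity must be positive");
  std::array<LogEntry, N> buf_{};
  size_t head_ = 0;
  size_t size_ = 0;
};
```

In a sketch like ours, entries could be pushed from onWifiDisconnect and onMqttDisconnect (with the disconnect reason converted to a number) and drained from loop() through sendLog() after reconnecting. The single logType/logMsg slot we use today keeps only the last message, so earlier events are lost.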

We did not manage to reproduce the behaviour locally. :/

Thank you very much in advance. Please let me know if you need some more specs.

Expected behavior

At several remote locations, clients connect to the MQTT server and publish data to various topics. Each client has a unique ID (its MAC address). At each location, the clients connect via a dedicated Wi-Fi network.

Actual behaviour

Clients connect to the MQTT server and publish data to various topics. Occasionally, a batch of clients at one location drops the connection at the same time and cannot reconnect until the devices are rebooted. This is not confined to a single location; it happens at random locations.
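Since a reboot always restores the connection, one workaround (not a root-cause fix) would be a watchdog that restarts the device after it has been offline for too long. Note that mqttDisconnectLog() in the sketch below has such a reboot path, but nothing in the sketch calls it. The following is a minimal sketch of the idea; `LinkWatchdog` and its threshold are hypothetical and not part of our code.

```cpp
#include <cassert>
#include <cstdint>

// Tracks how long the link has been down and reports when a restart
// is due. Illustrative only; the class name and limit are assumptions.
class LinkWatchdog {
 public:
  explicit LinkWatchdog(uint32_t maxOfflineMs) : maxOfflineMs_(maxOfflineMs) {
    assert(maxOfflineMs > 0);
  }

  // Call from loop() with the current millis() value and the link state.
  // Returns true once the link has been down for longer than the limit.
  bool shouldRestart(uint32_t nowMs, bool connected) {
    if (connected) {
      offline_ = false;
      return false;
    }
    if (!offline_) {  // first call after the link was lost
      offline_ = true;
      offlineSinceMs_ = nowMs;
      return false;
    }
    // Unsigned subtraction stays correct when millis() wraps around.
    return (nowMs - offlineSinceMs_) > maxOfflineMs_;
  }

 private:
  uint32_t maxOfflineMs_;
  uint32_t offlineSinceMs_ = 0;
  bool offline_ = false;
};
```

In loop() this could be polled as `if (wd.shouldRestart(millis(), mqttConnected)) ESP.restart();`, using the mqttConnected flag the sketch already maintains. It would hide the symptom rather than explain why a whole batch of clients fails to reconnect.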

Sketch [Cannot reproduce, therefore no MCVE, sorry]


#include <ESP8266WiFi.h>
#include <ESP8266WiFiMulti.h>
#include <Ticker.h>
#include <AsyncMqttClient.h>
#include <Bounce2.h>
#include <WiFiUdp.h>
#include <ESP8266HTTPClient.h>
#include <ESP8266httpUpdate.h>
#include <ESP8266Ping.h>

// --------------------- Sensor Settings

const char* MQTT_LOCATION = "REDACTED";
const char* PLACE = "REDACTED";
const char* SKETCH_VERSION = "14";

// --------------------- Network Settings

#define WIFI_SSID "REDACTED"
#define WIFI_PASSWORD "REDACTED"

bool IPStatic = false;
IPAddress staticIP(10,49,13,103); //ESP static ip
IPAddress gateway(10,49,13,1);   //IP Address of your WiFi Router (Gateway)
IPAddress subnet(255, 255, 255, 0);  //Subnet mask
IPAddress dns(10,49,13,1);  //DNS

// --------------------- MQTT Settings

#define MQTT_HOST IPAddress(REDACTED)
#define MQTT_PORT 1883
#define MQTT_SECURE true
#define MQTT_USERNAME "REDACTED"
#define MQTT_PASSWORD "REDACTED"

// --------------------- Configuration --------------------- Don't change anything below this line ---------------------

char NODE_ID[16];

#define updateServer "http://REDACTED"

//Sensor input
#define SENSOR_PIN  3

//Reboot Settings
const int rebootHours = 24;
const int thresholdStoppedMinutes = 10;
long lastPulse = 0;

//Bounce definition
Bounce debouncer = Bounce(); 
int oldInput = -1;

//Variables
bool wifiConnected = false;
bool mqttConnected = false;
int mqttDisconnects = 0;
bool updateOTA = false;
bool updateOTAPLACE = false;
bool requestAvailable = false;

bool logAvailable = false;
char* logType = "";
char* logMsg = "";

bool sensorOnline = true;
unsigned long lastOnlineCheck = 0;
unsigned long lastWifiConnect = 0;

unsigned long lastHeartbeat = 0;
Ticker heartbeatTimer;

Ticker rebootTimer;
Ticker requestTimer;

String startDate = "0";

char startString[60];

int count = 0;

// --------------------- MQTT Topics Settings

char MQTT_TOPIC[20];

char MQTT_SUBSCRIBE[25];
char MQTT_SUBSCRIBE_PLACE[25];
char MQTT_TOPIC_REQ[25];

char MQTT_TOPIC_HEARTBEAT[25];
char MQTT_TOPIC_PULSE[25];
char MQTT_TOPIC_COUNT[25];
char MQTT_TOPIC_ONLINE[25];
char MQTT_TOPIC_ONLINE_WILL[35];

void mqttTopics(){
  strcpy(MQTT_TOPIC,MQTT_LOCATION);
  strcat(MQTT_TOPIC,"/");
  strcat(MQTT_TOPIC,NODE_ID);

  strcpy(MQTT_TOPIC_REQ,MQTT_TOPIC);
  strcat(MQTT_TOPIC_REQ,"/2");
  
  strcpy(MQTT_TOPIC_PULSE,MQTT_TOPIC);
  strcat(MQTT_TOPIC_PULSE,"/1/2");

  strcpy(MQTT_TOPIC_COUNT,MQTT_TOPIC);
  strcat(MQTT_TOPIC_COUNT,"/1/3");
  
  strcpy(MQTT_TOPIC_HEARTBEAT,MQTT_TOPIC);
  strcat(MQTT_TOPIC_HEARTBEAT,"/3/0");

  strcpy(MQTT_TOPIC_ONLINE,MQTT_TOPIC);
  strcat(MQTT_TOPIC_ONLINE,"/3/1");

  strcpy(MQTT_TOPIC_ONLINE_WILL, MQTT_TOPIC_ONLINE);
  strcat(MQTT_TOPIC_ONLINE_WILL, "/online");

  strcpy(MQTT_SUBSCRIBE,MQTT_LOCATION);
  strcat(MQTT_SUBSCRIBE,"/sub/");
  strcat(MQTT_SUBSCRIBE,NODE_ID);
  strcat(MQTT_SUBSCRIBE,"/#");

  strcpy(MQTT_SUBSCRIBE_PLACE,MQTT_LOCATION);
  strcat(MQTT_SUBSCRIBE_PLACE,"/sub/");
  strcat(MQTT_SUBSCRIBE_PLACE,PLACE);
  strcat(MQTT_SUBSCRIBE_PLACE,"/#");
}

AsyncMqttClient mqttClient;
Ticker mqttReconnectTimer;

ESP8266WiFiMulti WiFiMulti;
WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
Ticker wifiReconnectTimer;

void connectToWifi() {
  Serial.println("Connecting to Wi-Fi...");

  if(IPStatic){
    // ESP8266 argument order is: local IP, gateway, subnet, DNS
    WiFi.config(staticIP, gateway, subnet, dns);
  }
  WiFi.mode(WIFI_STA);
  
  if (WiFiMulti.run() == WL_CONNECTED) {
    Serial.println("");
    Serial.println("WiFi connected");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());

    wifiConnected = true;
    connectToMqtt();
  }
}

void onWifiConnect(const WiFiEventStationModeGotIP& event) {
  Serial.println("Connected to Wi-Fi.");
  wifiConnected = true;
  connectToMqtt();
}

void onWifiDisconnect(const WiFiEventStationModeDisconnected& event) {
  Serial.println("Disconnected from Wi-Fi.");
  wifiConnected = false;
  mqttReconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
  wifiReconnectTimer.once(2, connectToWifi);
}

void connectToMqtt() {
  Serial.println("Connecting to MQTT...");
  mqttClient.connect();
}

void onMqttConnect(bool sessionPresent) {
  Serial.println("Connected to MQTT.");
  Serial.print("Session present: ");
  Serial.println(sessionPresent);
  mqttConnected = true;
  mqttDisconnects = 0;
  mqttClient.subscribe(MQTT_SUBSCRIBE, 2);
  mqttClient.subscribe(MQTT_SUBSCRIBE_PLACE, 2);
  Serial.print("Subscribed to ");
  Serial.println(MQTT_SUBSCRIBE);
  Serial.print("Subscribed to ");
  Serial.println(MQTT_SUBSCRIBE_PLACE);
  
  char topic[strlen(MQTT_TOPIC_ONLINE) + 9];
  strcpy(topic, MQTT_TOPIC_ONLINE);
  strcat(topic, "/PLACE");
  mqttClient.publish(topic, 2, false, PLACE);
  strcpy(topic, MQTT_TOPIC_ONLINE);
  strcat(topic, "/version");
  mqttClient.publish(topic, 2, false, SKETCH_VERSION);

  heartbeatTimer.attach(30, sendHeartbeat);

  strcpy(startString,SKETCH_VERSION);
  strcat(startString,"-");
  strcat(startString,PLACE);
  mqttClient.publish(MQTT_TOPIC_ONLINE, 2, true, startString);
  mqttClient.publish(MQTT_TOPIC_ONLINE_WILL, 2, true, "1");

  logType = "start";
  logMsg = startString;
  logAvailable = true;
}

void mqttDisconnectLog(char* reason){
  mqttDisconnects++;
  if(mqttDisconnects > 3 && mqttDisconnects < 25){
    logType = "error";
    logMsg = reason;
    logAvailable = true;
  }
  if(mqttDisconnects > 500){
    logType = "info";
    logMsg = "reboot due to mqtt connection problems";
    logAvailable = true;
    ESP.restart();
  }
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
  Serial.println("Disconnected from MQTT.");
  
  mqttConnected = false;
  heartbeatTimer.detach();

  if (WiFi.isConnected()) {
    mqttReconnectTimer.once(2, connectToMqtt);
  }
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
  Serial.println("Subscribe acknowledged.");
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(uint16_t packetId) {
  Serial.println("Unsubscribe acknowledged.");
}

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  char msg[len+1];
  strlcpy(msg, payload, len+1);
  if(strcmp(msg, (char*) "ping") == 0){
    String req = "Pong";
    Serial.println("REQ: ping");
    uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
  } else if(strcmp(msg, (char*) "version") == 0){
    Serial.println("REQ: version");
    uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_REQ, 1, false, SKETCH_VERSION);
  } else if(strcmp(msg, (char*) "PLACE") == 0){
    Serial.println("REQ: PLACE");
    uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_REQ, 1, false, PLACE);
  } else if(strcmp(msg, (char*) "reboot") == 0){
    Serial.println("REQ: reboot");
    String req = "Reboot Sensor";
    logType = "info";
    logMsg = "reboot over MQTT";
    logAvailable = true;
    uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
    ESP.restart();
  } else if(strcmp(msg, (char*) "update") == 0){
    Serial.println("REQ: update");
    String req = "Start OTA Update";
    uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
    updateOTA = true;
  } else if(strcmp(msg, (char*) "update_PLACE") == 0){
    Serial.println("REQ: update_PLACE");
    String req = "Start OTA Update PLACE";
    uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
    updateOTAPLACE = true;
    updateOTA = true;
  }
}

void onMqttPublish(uint16_t packetId) {
  
}
void ota(){
  if ((WiFiMulti.run() == WL_CONNECTED)) {
    logType = "info";
    logMsg = "OTA Update";
    logAvailable = true;
    WiFiClient client;
    char link[25 + strlen(updateServer)];
    
    strcpy(link, updateServer);
    strcat(link, "/");
    if(updateOTAPLACE){
      updateOTAPLACE = false;
      strcat(link, PLACE);
    } else {
      strcat(link, NODE_ID);
    }
    strcat(link, ".bin");
    Serial.println(link);
    
    t_httpUpdate_return response = ESPhttpUpdate.update(client, link);
    //t_httpUpdate_return response = ESPhttpUpdate.update(client, updateServer, 80, link, SKETCH_VERSION);
    //t_httpUpdate_return response = ESPhttpUpdate.update(client, updateServer, 80, link);
    String req;
    switch (response) {
      case HTTP_UPDATE_FAILED:
        Serial.printf("HTTP_UPDATE_FAILED Error (%d): %s\n", ESPhttpUpdate.getLastError(), ESPhttpUpdate.getLastErrorString().c_str());
        req = "OTA failed";
        mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
        break;

      case HTTP_UPDATE_NO_UPDATES:
        Serial.println("HTTP_UPDATE_NO_UPDATES");
        req = "OTA no updates";
        mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
        break;

      case HTTP_UPDATE_OK:
        Serial.println("HTTP_UPDATE_OK");
        req = "OTA done";
        mqttClient.publish(MQTT_TOPIC_REQ, 1, false, (char*) req.c_str());
        break;
      }
    }
}

void setup() {
  Serial.begin(9600);
  //Serial.setDebugOutput(true);

  for (uint8_t t = 6; t > 0; t--) {
    Serial.printf("[SETUP] WAIT %d...\n", t);
    Serial.flush();
    delay(250);
  }
  
  Serial.println();
  Serial.print("MAC: ");
  Serial.println(WiFi.macAddress());
  String mac = WiFi.macAddress();
  mac.replace(":","");
  Serial.println(mac);
  mac.toCharArray(NODE_ID,16);

  mqttTopics();

  // Setup the machine input with the internal pull-up enabled
  // (on the ESP8266, digitalWrite(HIGH) on an input pin does not enable the pull-up)
  pinMode(SENSOR_PIN, INPUT_PULLUP);

  // After setting up the input, setup debouncer
  debouncer.attach(SENSOR_PIN);
  debouncer.interval(200);


  //rebootTimer.attach(60, rebootTime);
  rebootTimer.attach(60*60*rebootHours, rebootTime);
  requestTimer.attach(60*60, requestTime);
  
  wifiConnectHandler = WiFi.onStationModeGotIP(onWifiConnect);
  wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWifiDisconnect);

  mqttClient.onConnect(onMqttConnect);
  mqttClient.onDisconnect(onMqttDisconnect);
  mqttClient.onSubscribe(onMqttSubscribe);
  mqttClient.onUnsubscribe(onMqttUnsubscribe);
  mqttClient.onMessage(onMqttMessage);
  mqttClient.onPublish(onMqttPublish);
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  mqttClient.setCredentials(MQTT_USERNAME, MQTT_PASSWORD);
  mqttClient.setKeepAlive(5);
  mqttClient.setClientId(NODE_ID);
  mqttClient.setWill(MQTT_TOPIC_ONLINE_WILL, 2, true, "0", 0);

  WiFiMulti.addAP(WIFI_SSID, WIFI_PASSWORD);
  
  connectToWifi();
}


void sendLog(char* type, char* msg){
  
  HTTPClient http;  //Declare an object of class HTTPClient

  char url[100] = "http://REDACT:1880/sensor-log?";
  strcat(url,"sensor=");
  strcat(url, NODE_ID);
  strcat(url,"&PLACE=");
  strcat(url, PLACE);
  strcat(url,"&type=");
  strcat(url, type);
  strcat(url,"&version=");
  strcat(url, SKETCH_VERSION); 

  Serial.println(url);  
  http.begin(url);  //Specify request destination
  int httpCode = http.GET();                                                                  //Send the request
   
  if (httpCode > 0) { //Check the returning code
    String payload = http.getString();   //Get the request response payload
    Serial.println(payload);                     //Print the response payload
  }
   
  http.end();   //Close connection
}

void sendHeartbeat(){
  uint16_t pubHeartbeat = mqttClient.publish(MQTT_TOPIC_HEARTBEAT, 1, false, "1");
  Serial.print("Publishing heartbeat with QoS 1, packetId: ");
  Serial.print(pubHeartbeat);
}

void rebootTime(){
  rebootTimer.detach();
  unsigned long now = millis();
  if(now - lastPulse > (thresholdStoppedMinutes*60000) || lastPulse == 0){
    Serial.print("Reboot now");
    ESP.restart();
  } else {
    rebootTimer.attach(60*5, rebootTime);
  }
}

void requestTime(){
  Serial.println("Request Time");
  requestTimer.detach();
  unsigned long now = millis();
  if(now - lastPulse > (thresholdStoppedMinutes*60000) || lastPulse == 0){
    requestAvailable = true;
  } else {
    requestTimer.attach(60*5, requestTime);
  }
}

void pulse() {
  //get current state of machine
  debouncer.update();
  // Get the update value
  int input = debouncer.read();

  if (input != oldInput) {
    oldInput = input;
    if ( input == 0) {
      if (count >= 1000000){
        count = 0;
      }
      count ++;
      lastPulse = millis();
      if (mqttConnected) {
        String req = "pulse";
        uint16_t pubPulse = mqttClient.publish(MQTT_TOPIC_PULSE, 2, false, (char*) req.c_str());
        Serial.print("Publishing pulse with QoS 2, packetId: ");
        Serial.println(pubPulse);
  
        String tmp = String(count);
        uint16_t pubCount = mqttClient.publish(MQTT_TOPIC_COUNT, 1, true, (char*) tmp.c_str());
        Serial.print("Publishing count with QoS 1, packetId: ");
        Serial.println(pubCount);
      } else {
        Serial.print("Offline produced part - count: ");
        Serial.println(count);
      }
    } 
  }
}

void loop() {
  if (!wifiConnected) {
    unsigned long now = millis();
    if (now - lastWifiConnect > 2000) {
        lastWifiConnect = now;
        if(WiFiMulti.run() != WL_CONNECTED){
          Serial.println("WiFi not connected!");
        }
    }
  }
  pulse();
  if(updateOTA){
    updateOTA = false;
    ota();
  }
  if(logAvailable){
    sendLog(logType, logMsg);
    logType = "";
    logMsg = "";
    logAvailable = false;
  }
  if(requestAvailable){
    requestAvailable = false;
    char url[80] = "http://REDAC:1880/sensor-job?";
    strcat(url,"sensor=");
    strcat(url, NODE_ID);
    strcat(url,"&PLACE=");
    strcat(url, PLACE);
    strcat(url,"&version=");
    strcat(url, SKETCH_VERSION);
    HTTPClient http;  //Declare an object of class HTTPClient
    Serial.println(url);
    http.begin(url);  //Specify request destination
    int httpCode = http.GET();   
    if (httpCode > 0) { //Check the returning code
      String payload = http.getString();   //Get the request response payload
      Serial.println(payload);
      if(payload == "restart"){
        Serial.print("Reboot now");
        ESP.restart();
      } else if(payload == "update"){
        updateOTA = true;
      } else if(payload == "update_PLACE"){
        updateOTAPLACE = true;
        updateOTA = true;
      }
    }
    http.end();   //Close connection
  }
}
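One thing worth ruling out in the sketch above is a buffer overflow in the topic strings, which are built with strcpy/strcat into fixed-size arrays. For example, MQTT_SUBSCRIBE[25] has to hold the location, "/sub/", the 12-character NODE_ID, "/#" and the terminator, so any location longer than 5 characters would write past the end. Our real values are redacted, so this may not apply. A bounds-checked helper could look like the following sketch; `buildTopic` is a hypothetical name, not something the sketch defines.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>

// Builds "<location>/<nodeId><suffix>" into dst.
// Returns false and leaves dst empty if the result would not fit,
// instead of writing past the end of the buffer as strcat would.
bool buildTopic(char* dst, size_t cap, const char* location,
                const char* nodeId, const char* suffix) {
  assert(dst != nullptr && cap > 0);
  // Characters needed: location + '/' + nodeId + suffix + terminator.
  size_t need = strlen(location) + 1 + strlen(nodeId) + strlen(suffix) + 1;
  if (need > cap) {
    dst[0] = '\0';
    return false;
  }
  snprintf(dst, cap, "%s/%s%s", location, nodeId, suffix);
  return true;
}
```

With such a helper, mqttTopics() could log or refuse an oversized topic at startup, which would make a configuration that differs between locations visible instead of silently corrupting neighbouring variables.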
