Odesílání dat s využitím MQTT na vyžádání z ESP32 do Raspberry Pi

04.12.2023 Arduino #mqtt #esp32 #bme280 #raspberrypi

Ukázka, jak odesílat data z MCU ESP32 se senzorem BME280 do Raspberry Pi na vyžádání.


Odesílání dat z MCU s využitím MQTT je založeno na základním zapojení ESP32 se senzorem teploty, tlaku, výšky a vlhkosti BME280, které je uvedeno v předchozím návodu. Pro úspěšné odesílání dat s využitím protokolu MQTT jsou nutné prerekvizity:

  1. Něco si načíst o MQTT.
  2. Mít nainstalované a zprovozněné Raspberry Pi.
  3. Nainstalovaný MQTT na Raspberry Pi.
  4. Výchozí příklad odesílání dat z ESP32 do RPi.

Motivace

Za normálního stavu se data odesílají kontinuálně z ESP32 na určený MQTT topic. Raspberry Pi naslouchá na tomto topicu a přijímá zprávy odeslané z ESP32. Jak nastavit frekvenci odesílání dat? V ESP32 se dá nastavit prodleva pomocí funkce delay(). Co, když máme několik MCU. To se musí nastavit frekvence odesílání dat pro každé zvlášť? Asi by se dalo z centrálního počítače odesílat na každé MCU hodnota pro delay(), která by se mohla uložit do SPIFFS. Co kdyby se frekvence odečítání dat řídila přímo z řidícího počítače - Raspberry Pi. Tzn., že RPi by odesílalo zprávu, kterou by přijmulo MCU a na základě správnosti zprávy by se odeslali dat z MCU na určený topic.

mqqt03 

Kód Python

import paho.mqtt.client as mqtt
import time

from time import gmtime, strftime
from datetime import datetime

def on_connect(client, userdata, flags, rc):
   global flag_connected
   flag_connected = 1
   client_subscriptions(client)
   print("Connected to MQTT server")

def on_disconnect(client, userdata, rc):
   global flag_connected
   flag_connected = 0
   print("Disconnected from MQTT server")
   
# a callback functions 
def callback_esp32_getGlobalWeather(client, userdata, msg):
    getData= msg.payload.decode('utf-8')
    print('ESP BME280 current data: ', getData)

def client_subscriptions(client):
    client.subscribe("esp32/#")

def on_publish(client, userdata, mid):
    print("Message published")

def main():
    client = mqtt.Client("rpi_clientSub") #this should be a unique name
    client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    flag_connected = 0

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.message_callback_add('esp32/sensorGW', callback_esp32_getGlobalWeather)
    client.connect(MQTT_SERVER,1883)
    client.on_publish = on_publish
    # start a new thread
    client.loop_start()
    client_subscriptions(client)
    print("Client setup complete.")

    while True:
        time.sleep(5)
        if (flag_connected != 1):
            print("Waiting for message ...")
            try:
                msg ="getGlobalWeather"
                pubMsg = client.publish(
                    topic='rpi/getGlobalWeather',
                    payload=msg.encode('utf-8'),
                    qos=0,
                )
                pubMsg.wait_for_publish()
                print("Message was sent: ")
                print(pubMsg.is_published())
        
            except Exception as e:
                print(e)


if __name__ == '__main__':
    print('MQTT to bridge')
    main()

V cyklu while se odesílá na topic rpi/getGlobalWeather zpráva v proměnné msg. Kód v cyklu se vykonává každých 5 vteřin - time.sleep(5).

Kód ESP32

#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <Adafruit_Sensor.h>
#include <Adafruit_BME280.h>
#include <WiFi.h>
#include <ESPAsyncWebServer.h>
#include <ESP32AnalogRead.h>
#include <math.h>

#include <SPIFFS.h>
#include <ArduinoJson.h>

#define SEALEVELPRESSURE_HPA (1013.25)
#define CORRECT_TEMP (0.40)

// WiFi
const char* ssid = "XXX";                 // Your personal network SSID
const char* wifi_password = "XXXXXXXXX";     // Your personal network password

// MQTT
const char* mqtt_server = "192.168.50.51";  // IP of the MQTT broker (RPi)
const int mqtt_server_port = 1883;
const char* mqtt_username = "XXX";       // MQTT username
const char* mqtt_password = "XXXXXXXXX";  // MQTT password
const char* clientID = "data_weatherviot";          // MQTT client ID
const char* topicSubscription = "rpi/getGlobalWeather";
const char* topicPublish = "esp32/sensorGW";
const char* msgSubscription = "getGlobalWeather";
long lastMsg=0;

WiFiClient wifiClient;
PubSubClient client(mqtt_server, mqtt_server_port, wifiClient); 
AsyncWebServer server(80);

Adafruit_BME280 bme;  // I2C

ESP32AnalogRead adc;

String getData(){

  String temp=String(bme.readTemperature()-CORRECT_TEMP);
  String hum=String(bme.readHumidity());
  String press=String(bme.readPressure()/100.0F);
  String alt=String(bme.readAltitude(SEALEVELPRESSURE_HPA));

  String actGlobalWeather = String(temp) + ";" + String(hum) + ";" + String(press) + ";" + String(alt) + "";
 
  return actGlobalWeather;
}

void callback(char* topic, byte* message, unsigned int length) {
  Serial.print("Message arrived on topic: ");
  Serial.print(topic);
  Serial.print(". Message: ");
  String messageTemp;
  
  for (int i = 0; i < length; i++) {
    Serial.print((char)message[i]);
    messageTemp += (char)message[i];
  }
  Serial.println();

  // Check if a message is received on the topic "rpi/broadcast"
  if (String(topic) == topicSubscription) {
      if(messageTemp == msgSubscription){
        Serial.println("Action: send weather data to RPi.");
        String sendData=getData();
        
        Serial.println("Temperature; Humidity; Preassure; Altitude");
        Serial.println("Measure data: " + sendData);
        
        client.publish(topicPublish,  sendData.c_str());
      }
  }

}

void connect_mqttServer() {
  client.connect(clientID, mqtt_username, mqtt_password);
  client.subscribe(topicSubscription);
  Serial.println("Set subscription OK ...");
  Serial.println(".Client connected:" + client.connected());
  // Loop until we're reconnected
  while (!client.connected()) {
        //now attemt to connect to MQTT server
        Serial.print("Attempting MQTT connection...");
        // Attempt to connect
        if (client.connect(clientID, mqtt_username, mqtt_password)) { // Change the name of client here if multiple ESP32 are connected
          //attempt successful
          Serial.println("connected");
          // Subscribe to topics here
          client.subscribe(topicSubscription);
          
        } 
        else {
          //attempt not successful
          Serial.print("failed, rc=");
          Serial.print(client.state());
          Serial.println(" trying again in 2 seconds");
    
          //blink_led(3,200); //blink LED three times (200ms on duration) to show that MQTT server connection attempt failed
          // Wait 2 seconds before retrying
          delay(2000);
        }
  }
  
}

void setup() {
  Serial.begin(9600);
  bool status = bme.begin(0x77);  // OR 0x76 for left pad on board
  if (!status) {
    Serial.println("Could not find a valid BME280 sensor, check wiring or change I2C address!");
    while (1);
  }

  // WiFi
  Serial.print("Connecting to ");
  Serial.println(ssid);

  // Connect to the WiFi
  WiFi.begin(ssid, wifi_password);

  // Wait until the connection has been confirmed before continuing
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  // Debugging - Output the IP Address of the ESP
  Serial.println("WiFi connected");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
  
  client.setServer(mqtt_server, mqtt_server_port);
  client.setCallback(callback);
}

void loop() {
   
  if (!client.connected()) {
    connect_mqttServer();
  }
  client.loop();

  long now = millis();
  
  if (now - lastMsg > 4000) {
    lastMsg = now;
  }

}

Ve funkci callback se skládá zpráva odeslaná z RPi a testuje se její znění. Pokud se shoduje, tak dojde k odeslání dat do RPi.

Pokud bychom chtěli odesílat data kontinuálně, stačí odkomentovat ve smyčce loop řádek client.publish(topicPublish, getData().c_str()); //topic name (to which this ESP32 publishes its data)