Uložení dat z MQTT z Raspberry Pi do InfluxDB 2

04.12.2023 Raspberry Pi #influxdb #raspberrypi #esp32 #bme280

Návod, jak odeslat data do InfluxDB z Raspberry Pi, která jsou získána například z ESP32 a BME280.


Pro odeslání dat do InfluxDB jsou nutné následující prerekvizity:

  1. Nainstalovaný systém na RaspberryPi.
  2. Nainstalovaný InfluxDB v2.

Kód Python

Programový kód vychází z návodu Odesílání dat prostřednictvím MQTT z ESP32 a BME280 do RPi.

import paho.mqtt.client as mqtt
import time

from time import gmtime, strftime
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

MQTT_SERVER = "192.168.50.51"
MQTT_USER = '*****'
MQTT_PASSWORD = '*****'


# Configure InfluxDB connection variables
token = "*****************************"
org = "Test"
bucket = "Test"
_client = InfluxDBClient(url="http://192.168.50.51:8086/", token=token)
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
                                                             flush_interval=10_000,
                                                             jitter_interval=2_000,
                                                             retry_interval=5_000,
                                                             max_retries=5,
                                                             max_retry_delay=30_000,
                                                             exponential_base=2))

#Configure the labels for the database
measurement = "rpi-bme280"
location = "GlobalWeather"

def on_connect(client, userdata, flags, rc):
   global flag_connected
   flag_connected = 1
   client_subscriptions(client)
   print("Connected to MQTT server")

def on_disconnect(client, userdata, rc):
   global flag_connected
   flag_connected = 0
   print("Disconnected from MQTT server")
   
# a callback functions 
def callback_esp32_getGlobalWeather(client, userdata, msg):
    getData= msg.payload.decode('utf-8')
    print('ESP BME280 current data: ', getData)
    splitData = getData.split(";")
    temp=float(splitData[0])
    hum=float(splitData[1])
    press=float(splitData[2])
    alt=float(splitData[3])

    now = datetime.now()
    timestamp_aq = datetime.timestamp(now)
    iso = datetime.utcnow()

    _write_client.write(bucket, org, [{"measurement": measurement, "tags": {"location": location}, "fields": {"temperature": temp}, "time": iso}])
    _write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"humidity": hum}, "time": iso}])
    _write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"pressure": press}, "time": iso}])
    _write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"altitude": alt}, "time": iso}])
    print("Send data to InfluxDB")
    '''time.sleep(10)'''

def client_subscriptions(client):
    client.subscribe("esp32/#")

def on_publish(client, userdata, mid):
    print("Message published")

def main():
    client = mqtt.Client("rpi_clientSub") #this should be a unique name
    client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    flag_connected = 0

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.message_callback_add('esp32/sensorGW', callback_esp32_getGlobalWeather)
    client.connect(MQTT_SERVER,1883)
    client.on_publish = on_publish
    # start a new thread
    client.loop_start()
    client_subscriptions(client)
    print("Client setup complete.")

    while True:
        time.sleep(5)
        if (flag_connected != 1):
            print("Waiting for message ...")
            try:
                msg ="getGlobalWeather"
                pubMsg = client.publish(
                    topic='rpi/getGlobalWeather',
                    payload=msg.encode('utf-8'),
                    qos=0,
                )
                pubMsg.wait_for_publish()
                print("Message was sent: ")
                print(pubMsg.is_published())
        
            except Exception as e:
                print(e)


if __name__ == '__main__':
    print('MQTT to bridge')
    main()

V první řadě se musí nastavit proměnné pro připojení k InfluxDB:

  • token = "*****************************" - API token, který se vygeneruje v prostředí InfluxDB
  • org = "Test" - Název organizace nastavení v InfluxDB
  • bucket = "Test" - Název místa kam se budou ukládat data odeslaná z MQTT klienta.
  • _client = InfluxDBClient(url="http://192.168.50.51:8086/", token=token) - nastavení url adresy, na které běží InfluxDB.
  • measurement = "rpi-bme280" - Nastavení štítku, který bude identifikovat ukládaná data. 
  • location = "GlobalWeather" - Název lokality pro identifikace dat. Může se například jednat o název místnosti, kde je senzor umístěn.

Ukládání dat do InfluxDB se děje ve funkci callback_esp32_getGlobalWeather na řádcích:

_write_client.write(bucket, org, [{"measurement": measurement, "tags": {"location": location}, "fields": {"temperature": temp}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"humidity": hum}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"pressure": press}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"altitude": alt}, "time": iso}])

Data jsou odesílána ve formátu JSON.

Nastavení InfluxDB

Po přihlášení do InfluxDB se zobrazí obrazovka:

Klikněte na odkaz Load Data. Následně na Buckets a tlačítko CREATE BUCKET.

Do otevřeného okna napište název datové sady - Bucketu a klikněte na tlačítko Create.

Pro bezpečný přijem dat z konkrétního zařízení se musí vygenerovat API Token. 

Klikněte na tlačítko GENERATE API TOKEN.

Z nabídky zvolte All Access API Token.

V otevřeném okně klikněte na tlačítko SAVE. Description se nemusí popisovat, ale pro lepší orientaci můžete.

Dojde k vygenerování a zobrazení API Tokenu. Před zavřením okna je nutné si tento Token zkopírovat do schránky a následně vložit do Python kódu.

Po zavření okna se API Token zobrazí v seznamu tokenů.

Pokud již odesíláme data přes MQTT například z ESP32, již nyní můžeme zobrazit data v rozhraní InfluxDB. Klikněte na Data Explorer. V této části lze složit DB dotaz pro vypsání dat do grafu. V sekci QUERY jsou jednotlivé položky pro výběr dat. Vyberte FROM a následně filtry. Výběrem filtrů se skládá SQL dotaz a vybírají se data pro zobrazení.

SQL dotaz lze upravit přímo ve SCRIPT EDITORU. Po změně SQL klikněte na SUBMIT. Tím dojde k vyfiltrování dat.

Data lze zobrazit v různých typech grafů nebo v tabulce.

V seznamu zvolte požadovaný typ zobrazení.