Návod, jak odeslat data do InfluxDB z Raspberry Pi, která jsou získána například z ESP32 a BME280.
Pro odeslání dat do InfluxDB jsou nutné následující prerekvizity:
Kód Python
Programový kód vychází z návodu Odesílání dat prostřednictvím MQTT z ESP32 a BME280 do RPi.
import paho.mqtt.client as mqtt
import time
from time import gmtime, strftime
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
MQTT_SERVER = "192.168.50.51"
MQTT_USER = '*****'
MQTT_PASSWORD = '*****'
# Configure InfluxDB connection variables
token = "*****************************"
org = "Test"
bucket = "Test"
_client = InfluxDBClient(url="http://192.168.50.51:8086/", token=token)
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2))
#Configure the labels for the database
measurement = "rpi-bme280"
location = "GlobalWeather"
def on_connect(client, userdata, flags, rc):
global flag_connected
flag_connected = 1
client_subscriptions(client)
print("Connected to MQTT server")
def on_disconnect(client, userdata, rc):
global flag_connected
flag_connected = 0
print("Disconnected from MQTT server")
# a callback functions
def callback_esp32_getGlobalWeather(client, userdata, msg):
getData= msg.payload.decode('utf-8')
print('ESP BME280 current data: ', getData)
splitData = getData.split(";")
temp=float(splitData[0])
hum=float(splitData[1])
press=float(splitData[2])
alt=float(splitData[3])
now = datetime.now()
timestamp_aq = datetime.timestamp(now)
iso = datetime.utcnow()
_write_client.write(bucket, org, [{"measurement": measurement, "tags": {"location": location}, "fields": {"temperature": temp}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"humidity": hum}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"pressure": press}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"altitude": alt}, "time": iso}])
print("Send data to InfluxDB")
'''time.sleep(10)'''
def client_subscriptions(client):
client.subscribe("esp32/#")
def on_publish(client, userdata, mid):
print("Message published")
def main():
client = mqtt.Client("rpi_clientSub") #this should be a unique name
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
flag_connected = 0
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.message_callback_add('esp32/sensorGW', callback_esp32_getGlobalWeather)
client.connect(MQTT_SERVER,1883)
client.on_publish = on_publish
# start a new thread
client.loop_start()
client_subscriptions(client)
print("Client setup complete.")
while True:
time.sleep(5)
if (flag_connected != 1):
print("Waiting for message ...")
try:
msg ="getGlobalWeather"
pubMsg = client.publish(
topic='rpi/getGlobalWeather',
payload=msg.encode('utf-8'),
qos=0,
)
pubMsg.wait_for_publish()
print("Message was sent: ")
print(pubMsg.is_published())
except Exception as e:
print(e)
if __name__ == '__main__':
print('MQTT to bridge')
main()
V první řadě se musí nastavit proměnné pro připojení k InfluxDB:
- token = "*****************************" - API token, který se vygeneruje v prostředí InfluxDB
- org = "Test" - Název organizace nastavení v InfluxDB
- bucket = "Test" - Název místa kam se budou ukládat data odeslaná z MQTT klienta.
- _client = InfluxDBClient(url="http://192.168.50.51:8086/", token=token) - nastavení url adresy, na které běží InfluxDB.
- measurement = "rpi-bme280" - Nastavení štítku, který bude identifikovat ukládaná data.
- location = "GlobalWeather" - Název lokality pro identifikace dat. Může se například jednat o název místnosti, kde je senzor umístěn.
Ukládání dat do InfluxDB se děje ve funkci callback_esp32_getGlobalWeather na řádcích:
_write_client.write(bucket, org, [{"measurement": measurement, "tags": {"location": location}, "fields": {"temperature": temp}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"humidity": hum}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"pressure": press}, "time": iso}])
_write_client.write(bucket, org, [{"measurement":measurement, "tags": {"location": location}, "fields":{"altitude": alt}, "time": iso}])
Data jsou odesílána ve formátu JSON.
Nastavení InfluxDB
Po přihlášení do InfluxDB se zobrazí obrazovka:
Klikněte na odkaz Load Data. Následně na Buckets a tlačítko CREATE BUCKET.
Do otevřeného okna napište název datové sady - Bucketu a klikněte na tlačítko Create.
Pro bezpečný přijem dat z konkrétního zařízení se musí vygenerovat API Token.
Klikněte na tlačítko GENERATE API TOKEN.
Z nabídky zvolte All Access API Token.
V otevřeném okně klikněte na tlačítko SAVE. Description se nemusí popisovat, ale pro lepší orientaci můžete.
Dojde k vygenerování a zobrazení API Tokenu. Před zavřením okna je nutné si tento Token zkopírovat do schránky a následně vložit do Python kódu.
Po zavření okna se API Token zobrazí v seznamu tokenů.
Pokud již odesíláme data přes MQTT například z ESP32, již nyní můžeme zobrazit data v rozhraní InfluxDB. Klikněte na Data Explorer. V této části lze složit DB dotaz pro vypsání dat do grafu. V sekci QUERY jsou jednotlivé položky pro výběr dat. Vyberte FROM a následně filtry. Výběrem filtrů se skládá SQL dotaz a vybírají se data pro zobrazení.
SQL dotaz lze upravit přímo ve SCRIPT EDITORU. Po změně SQL klikněte na SUBMIT. Tím dojde k vyfiltrování dat.
Data lze zobrazit v různých typech grafů nebo v tabulce.
V seznamu zvolte požadovaný typ zobrazení.