air-Q-pro Messdaten am LCD1602 Display visualisieren
Quellcode
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Titel: Luftqualität live: air-Q pro Daten via REST abrufen, entschlüsseln und weiterverarbeiten
Autor: Stefan Draeger
Blogartikel: https://draeger-it.blog/luftqualitaet-live-air-q-pro-daten-via-rest-abrufen-und-anzeigen/
Beschreibung:
Dieses Skript lädt Sensordaten vom air-Q pro, entschlüsselt sie (AES-256-CBC) und liefert ein
klar strukturiertes Objekt (AirQMessung) zurück. Die Anzeige/Weiterverarbeitung (CSV, LCD)
ist bewusst entkoppelt, damit du das Objekt flexibel verwenden kannst.
"""
from __future__ import annotations
import argparse
import base64
import csv
import http.client
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
import urllib.request
from typing import Tuple
from Crypto.Cipher import AES # pycryptodome
# --------------------------------------------------------------------
# Konfiguration (Defaults für CLI; produktiv per .env/Umgebungsvariablen)
# --------------------------------------------------------------------
STANDARD_AIRQ_IP = "192.168.178.108"
STANDARD_AIRQ_PASSWORT = "draeger1980"
AIRQ_HTTP_PORT = 80
AIRQ_HTTP_PFAD = "/data"
AIRQ_HTTP_TIMEOUT_S = 8
AIRQ_HTTP_RETRIES = 3
AES_SCHLUESSEL_LAENGE = 32
AES_BLOCKGROESSE = 16
# --------------------------------------------------------------------
# Domänenmodell
# --------------------------------------------------------------------
@dataclass(frozen=True)
class AirQMessung:
"""
Repräsentiert eine Messung des air-Q.
- zeitpunkt_local / zeitpunkt_utc: Zeitpunkt (aus 'timestamp' in ms)
- device_id, uptime_s: Gerätemetadaten
- rohwerte: Original-JSON (entschlüsselt)
- werte: deutsch beschriftete, einheitenklare Basiswerte (für CSV/LCD)
- status: Warmup-/Hinweistexte je Sensor
"""
zeitpunkt_local: datetime
zeitpunkt_utc: datetime
device_id: Optional
uptime_s: Optional
rohwerte: Dict
werte: Dict
status: Dict
# --------------------------------------------------------------------
# Datenabruf & Entschlüsselung
# --------------------------------------------------------------------
def _entferne_pkcs7_padding(daten: bytes) -> bytes:
"""Entfernt PKCS#7-Padding sicher im Byte-Kontext."""
return daten]
def _erzeuge_aes_schluessel(passwort: str) -> bytes:
"""Erzeugt einen 32-Byte-AES-Schlüssel aus dem Passwort (auffüllen/abschneiden)."""
key = passwort.encode("utf-8")
return key + b"0" * (AES_SCHLUESSEL_LAENGE - len(key)) if len(key) Dict:
"""Entschlüsselt den base64-kodierten 'content' des air-Q und liefert das JSON als Dict."""
verschluesselt = base64.b64decode(base64_text)
iv, payload = verschluesselt, verschluesselt
cipher = AES.new(key=_erzeuge_aes_schluessel(passwort), mode=AES.MODE_CBC, IV=iv)
entschluesselt = cipher.decrypt(payload)
json_text = _entferne_pkcs7_padding(entschluesselt).decode("utf-8")
return json.loads(json_text)
def _hole_rohdaten(ip_adresse: str) -> Dict:
"""
Holt das äußere JSON (mit 'content') fest über http:///data (Port 80).
Baut Retries und aussagekräftige Fehler ein.
"""
last_err: Optional = None
for attempt in range(1, AIRQ_HTTP_RETRIES + 1):
conn = http.client.HTTPConnection(ip_adresse, port=AIRQ_HTTP_PORT, timeout=AIRQ_HTTP_TIMEOUT_S)
try:
conn.request("GET", AIRQ_HTTP_PFAD)
resp = conn.getresponse()
body = resp.read()
if resp.status == 200:
return json.loads(body)
raise RuntimeError(f"HTTP {resp.status} {resp.reason} auf {AIRQ_HTTP_PFAD}")
except (socket.timeout, TimeoutError, OSError, RuntimeError, json.JSONDecodeError) as e:
last_err = e
# kleiner Backoff zwischen den Versuchen
time.sleep(0.5 * attempt)
finally:
try:
conn.close()
except Exception:
pass
raise ConnectionError(
f"air-Q unter http://{ip_adresse}{AIRQ_HTTP_PFAD} nicht erreichbar (Port 80). "
f"Letzter Fehler: {last_err}"
)
def hole_airq_objekt(ip_adresse: str, passwort: str) -> Dict:
"""
EINSTIEGSPUNKT für Datenzugriff:
- Holt die Rohantwort
- Entschlüsselt 'content'
- Gibt das entschlüsselte JSON (als Dict) zurück
"""
rohdaten = _hole_rohdaten(ip_adresse)
return _entschluessle_content(rohdaten, passwort)
# --------------------------------------------------------------------
# Normierung & Beschriftung
# --------------------------------------------------------------------
# Mapping: JSON-Key -> (deutscher Name, Einheit)
MAPPING: Dict]] = {
"pm1": ("PM1.0", "µg/m³"),
"pm2_5": ("PM2.5", "µg/m³"),
"pm10": ("PM10", "µg/m³"),
"TypPS": ("Ø Partikelgröße", "µm"),
"dewpt": ("Taupunkt", "°C"),
"temperature": ("Temperatur", "°C"),
"humidity": ("Relative Luftfeuchte", "%"),
"humidity_abs":("Absolute Luftfeuchte", "g/m³"),
"pressure": ("Luftdruck", "hPa"),
"sound": ("Lärmpegel", "dB(A)"),
"sound_max": ("Lärmpegel max (2 min)", "dB(A)"),
"tvoc": ("TVOC (flüchtige organische Verbindungen)", "ppb"),
"co": ("CO (Kohlenmonoxid)", "ppm"), # korrigiert
"co2": ("CO₂ (Kohlendioxid)", "ppm"), # korrigiert
"health": ("Gesundheitsindex", None),
"performance": ("Leistungsindex", None),
"dHdt": ("Δ absolute LF", "mg/m³/s"),
"dCO2dt": ("Δ CO₂", "ppb/s"),
"uptime": ("Betriebszeit", "s"),
"measuretime": ("Messdauer letzter Durchlauf", "ms"),
"DeviceID": ("Geräte-ID", None),
"timestamp": ("Zeitstempel (ms)", "ms"),
}
def _wert_aus_array_oder_skalar(v: Any) -> Tuple]:
"""
air-Q liefert häufig .
Gibt (basiswert, unsicherheit) zurück.
"""
if isinstance(v, (list, tuple)) and len(v) >= 1:
basis = v
unsicherheit = v if len(v) > 1 else None
return basis, unsicherheit
return v, None
def _normalisiere_status(raw) -> Dict:
"""
Wandelt das 'Status'-Feld in ein Dict um.
- Dict: Schlüssel/Werte als Strings
- String/Number/sonstiges: {"status": ""}
- Liste/Tuple: {"status_0": "...", "status_1": "...", ...}
- None: {}
"""
if raw is None:
return {}
if isinstance(raw, dict):
return {str(k): str(v) for k, v in raw.items()}
if isinstance(raw, (list, tuple)):
return {f"status_{i}": str(v) for i, v in enumerate(raw)}
# alles andere (z. B. "OK")
return {"status": str(raw)}
def normiere_messung(entschluesselt: Dict) -> AirQMessung:
"""
Wandelt das entschlüsselte Dict in eine AirQMessung:
- Zeit aus 'timestamp' (ms) → UTC + lokale Zeit
- deutsch beschriftete, einheitenklare Basiswerte
- Statusmeldungen (Warmup etc.) unter 'Status'
"""
# Zeitstempel (ms) → UTC/local
ts_ms = entschluesselt.get("timestamp")
if isinstance(ts_ms, (int, float)):
ts_utc = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
else:
ts_utc = datetime.now(tz=timezone.utc)
ts_local = ts_utc.astimezone()
# Status-Hinweise
status_norm = _normalisiere_status(entschluesselt.get("Status"))
# Beschriftete Werte
werte: Dict = {}
for key, (name_de, einheit) in MAPPING.items():
if key in entschluesselt:
basiswert, _unsicherheit = _wert_aus_array_oder_skalar(entschluesselt)
label = f"{name_de} ({einheit})" if einheit else name_de
werte = basiswert
return AirQMessung(
zeitpunkt_local=ts_local,
zeitpunkt_utc=ts_utc,
device_id=str(entschluesselt.get("DeviceID")) if entschluesselt.get("DeviceID") else None,
uptime_s=int(entschluesselt.get("uptime")) if entschluesselt.get("uptime") is not None else None,
rohwerte=entschluesselt,
werte=werte,
status=status_norm,
)
# --------------------------------------------------------------------
# Weiterverarbeitung (CSV / LCD-Stub) – optional
# --------------------------------------------------------------------
def schreibe_csv_zeile(datei: Path, messung: AirQMessung, erstelle_header: bool = True) -> None:
"""
Schreibt eine Zeile in eine CSV-Datei (Delimiter ';').
- Erstellt optional beim ersten Mal einen Header.
- 'zeitpunkt' wird als ISO-String vorangestellt.
"""
datei.parent.mkdir(parents=True, exist_ok=True)
spalten = + sorted(messung.werte.keys())
schreibe_header = (not datei.exists()) and erstelle_header
with datei.open("a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=spalten, delimiter=";")
if schreibe_header:
writer.writeheader()
zeile = {"zeitpunkt": messung.zeitpunkt_local.isoformat(timespec="seconds")}
zeile.update({k: messung.werte.get(k, "") for k in spalten if k != "zeitpunkt"})
writer.writerow(zeile)
# ------------------------------------------------------------
# 1) Anzeigezeilen aus Messung erzeugen
# ------------------------------------------------------------
def formatiere_lcd_zeilen(messung: AirQMessung) -> Tuple:
"""
Baut zwei Textzeilen für ein 16x2 LCD.
Da dein ESP 'scroll' unterstützt, dürfen die Zeilen länger sein;
wir halten sie trotzdem kompakt & gut lesbar.
"""
# Werte robust lesen (falls mal etwas fehlt)
co2 = messung.werte.get("CO₂ (Kohlendioxid) (ppm)")
tvoc = messung.werte.get("TVOC (flüchtige organische Verbindungen) (ppb)")
temp = messung.werte.get("Temperatur (°C)")
rh = messung.werte.get("Relative Luftfeuchte (%)")
# Schonend runden/formatieren
def fmt(v, nd=0):
if v is None or v == "-":
return "-"
try:
return f"{round(float(v), nd)}"
except Exception:
return str(v)
co2_s = fmt(co2, 0)
tvoc_s = fmt(tvoc, 0)
temp_s = fmt(temp, 1)
rh_s = fmt(rh, 1)
# Zeile 1 & 2 kompakt – darf länger sein (scroll)
zeile1 = f"CO2:{co2_s}ppm TVOC:{tvoc_s}ppb"
zeile2 = f"T:{temp_s}C rH:{rh_s}%"
return zeile1, zeile2
# ------------------------------------------------------------
# 2) HTTP-POST an den ESP-Endpoint senden
# ------------------------------------------------------------
def sende_an_lcd_http(lcd_url: str, zeile1: str, zeile2: str,
modus: str = "scroll", anzeigedauer: int = 15,
timeout_s: int = 5) -> None:
"""
Sendet die gewünschten LCD-Zeilen an den ESP (JSON-POST).
Erwartet einen Endpoint wie: http://192.168.178.96/anzeige
"""
payload = {
"zeile1": zeile1,
"zeile2": zeile2,
"modus": modus, # z.B. "scroll", "static", ...
"anzeigedauer": anzeigedauer # Sekunden
}
daten = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url=lcd_url,
data=daten,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
# Antwort bei Bedarf auswerten:
_ = resp.read()
except HTTPError as e:
# Hier gerne auf logging umstellen
print(f"HTTP-Fehler beim Senden an LCD ({e.code}): {e.reason}")
except URLError as e:
print(f"Verbindungsfehler zum LCD: {e.reason}")
except Exception as e:
print(f"Unerwarteter Fehler beim LCD-POST: {e}")
# ------------------------------------------------------------
# 3) Öffentliche Anzeige-Funktion (ersetzt den Stub)
# ------------------------------------------------------------
def zeige_auf_lcd_http(messung: AirQMessung, lcd_url: str,
modus: str = "scroll", anzeigedauer: int = 15) -> None:
"""
Erzeugt zwei Anzeigezeilen aus der Messung und sendet sie per HTTP an den ESP-LCD-Service.
Read the full article