Skill Overview
Start with fit, limitations, and setup before diving into the repository.
Scenario recommande : Ideal for AI agents that need mqtt development skill. Resume localise : Fully Modular Sensor Network # MQTT Development Skill Architektur: Server-Centric. This AI agent skill supports Claude Code, Cursor, and Windsurf workflows.
Pourquoi utiliser cette compétence
Recommandation : mqtt-development helps agents mqtt development skill. Fully Modular Sensor Network # MQTT Development Skill Architektur: Server-Centric. This AI agent skill supports Claude Code, Cursor, and Windsurf
Meilleur pour
Scenario recommande : Ideal for AI agents that need mqtt development skill.
↓ Cas d'utilisation exploitables for mqtt-development
! Sécurité et Limitations
- Limitation : Requires repository-specific context from the skill documentation
- Limitation : Works best when the underlying tools and dependencies are already configured
About The Source
The section below comes from the upstream repository. Use it as supporting material alongside the fit, use-case, and installation summary on this page.
Browser Sandbox Environment
⚡️ Ready to unleash?
Experience this Agent in a zero-setup browser environment powered by WebContainers. No installation required.
FAQ et étapes d’installation
These questions and steps mirror the structured data on this page for better search understanding.
? Questions fréquentes
Qu’est-ce que mqtt-development ?
Scenario recommande : Ideal for AI agents that need mqtt development skill. Resume localise : Fully Modular Sensor Network # MQTT Development Skill Architektur: Server-Centric. This AI agent skill supports Claude Code, Cursor, and Windsurf workflows.
Comment installer mqtt-development ?
Exécutez la commande : npx killer-skills add Auto-one-Family/Automation-One. Elle fonctionne avec Cursor, Windsurf, VS Code, Claude Code et plus de 19 autres IDE.
Quels sont les cas d’usage de mqtt-development ?
Les principaux cas d’usage incluent : Cas d'usage : Applying MQTT Development Skill, Cas d'usage : Applying Architektur: Server-Centric. ESP32 = dumme Agenten. ALLE Logik auf Server, Cas d'usage : Applying Protokoll: MQTT 3.1.1 via Mosquitto Broker (Docker).
Quels IDE sont compatibles avec mqtt-development ?
Cette skill est compatible avec Cursor, Windsurf, VS Code, Trae, Claude Code, OpenClaw, Aider, Codex, OpenCode, Goose, Cline, Roo Code, Kiro, Augment Code, Continue, GitHub Copilot, Sourcegraph Cody, and Amazon Q Developer. Utilisez la CLI Killer-Skills pour une installation unifiée.
Y a-t-il des limites pour mqtt-development ?
Limitation : Requires repository-specific context from the skill documentation. Limitation : Works best when the underlying tools and dependencies are already configured.
↓ Comment installer ce skill
-
1. Ouvrir le terminal
Ouvrez le terminal ou la ligne de commande dans le dossier du projet.
-
2. Lancer la commande d’installation
Exécutez : npx killer-skills add Auto-one-Family/Automation-One. La CLI détectera automatiquement votre IDE ou votre agent et configurera la skill.
-
3. Commencer à utiliser le skill
Le skill est maintenant actif. Votre agent IA peut utiliser mqtt-development immédiatement dans le projet.
! Source Notes
This page is still useful for installation and source reference. Before using it, compare the fit, limitations, and upstream repository notes above.
Upstream Repository Material
The section below comes from the upstream repository. Use it as supporting material alongside the fit, use-case, and installation summary on this page.
mqtt-development
Fully Modular Sensor Network # MQTT Development Skill Architektur: Server-Centric. This AI agent skill supports Claude Code, Cursor, and Windsurf workflows.
MQTT Development Skill
Architektur: Server-Centric. ESP32 = dumme Agenten. ALLE Logik auf Server. Protokoll: MQTT 3.1.1 via Mosquitto Broker (Docker) Topic-Schema:
kaiser/{kaiser_id}/esp/{esp_id}/{category}/{gpio}/{action}
Evidence-Tabelle (Repo-Ist)
Vor größeren MQTT-Änderungen: Pfade verifizieren; keine Topic-Namen erfinden.
| Schicht | Pfad (relativ Repo-Wurzel) | Rolle (1 Satz) | Skill deckt heute? |
|---|---|---|---|
Broker-Compose / mosquitto.conf | docker-compose.yml (Service mqtt-broker); docker/mosquitto/mosquitto.conf | Compose startet Mosquitto und mountet mosquitto.conf als Broker-Konfiguration (Listener, Limits, Dev-allow_anonymous). | ja |
| Server MQTT-Einstieg | El Servador/god_kaiser_server/src/mqtt/client.py; subscriber.py; publisher.py; topics.py | paho-Client, Topic-Build, Subscription-Routing zu Handlern, ausgehendes Publish inkl. Retry/Resilience-Anbindung. | ja |
| Handler-Ordner | El Servador/god_kaiser_server/src/mqtt/handlers/ | Eingehende Messages pro Wildcard-Pattern: Persistenz, WebSocket, Fehlerlogging, ACK-Weiterleitung an Brücke. | ja |
| Firmware MQTT-Client | El Trabajante/src/services/communication/mqtt_client.cpp; mqtt_client.h; El Trabajante/src/utils/topic_builder.cpp; topic_builder.h | ESP-seitig Publish/Subscribe, LWT, Circuit Breaker, Offline-Buffer, optional PubSubClient-Pfad (MQTT_USE_PUBSUBCLIENT). | ja |
| Offline / Safety / Circuit Breaker / Bridge | El Servador/god_kaiser_server/src/mqtt/offline_buffer.py; El Servador/god_kaiser_server/src/services/mqtt_command_bridge.py; El Trabajante/src/services/safety/offline_mode_manager.cpp; El Trabajante/src/services/safety/offline_mode_manager.h | Server puffert MQTT bei Broker-Ausfall; MQTTCommandBridge korreliert Zone/Subzone-ACKs per correlation_id; Firmware: Grace-Timer → OFFLINE_ACTIVE und lokale Regeln. | ja |
| Tests / Wokwi MQTT | El Servador/god_kaiser_server/tests/integration/test_heartbeat_handler.py; tests/integration/test_zone_bridge.py; tests/unit/test_mqtt_command_bridge.py; tests/esp32/test_mqtt_last_will.py; tests/esp32/test_mqtt_fallback.py; tests/esp32/test_communication.py; El Trabajante/test/_archive/comm_mqtt_client.cpp | Pytest- und ESP32-Testartefakte als Contract-Referenz für Handler, LWT, Bridge, Transport-Kanten. | teils |
IST-Topic- und QoS-Verankerung: Vollständige Tabellen und Begründungen in Abschnitt 1–2; maschinenlesbare SSOT in .claude/reference/api/MQTT_TOPICS.md; Builder-Strings in topic_builder.cpp und Platzhalter in El Servador/god_kaiser_server/src/core/constants.py. Beispiel-Flows im Repo dokumentiert: Heartbeat (system/heartbeat / ack, QoS 0), Aktor-Befehl (actuator/.../command, QoS 2 + optional correlation_id im Publisher), LWT (system/will, QoS 1, Broker → lwt_handler), Zone/Subzone-ACK mit Echo correlation_id für MQTTCommandBridge.resolve_ack (siehe MQTT_TOPICS.md und mqtt_command_bridge.py).
Professionelle Betriebsstandards (Kurz)
- Broker-Härtung: In Produktion keine anonymen Clients; TLS, Authentifizierung und ACLs aktivieren. Im Repo ist
docker/mosquitto/mosquitto.confdie Konfigurations-SSOT (Compose mountet sie nach/mosquitto/config/mosquitto.conf); Dev-allow_anonymous trueist bewusst markiert — für Prod siehe.claude/reference/security/PRODUCTION_CHECKLIST.md. - Hohe Verfügbarkeit: Eclipse Mosquitto bietet kein natives Clustering wie ein DB-Cluster; HA ist eine Betriebsentscheidung (z. B. VM/K8s, Bridge/Federation, getrennte Edge-Broker) — hier keine Architektur-Umschreibung des App-Codes vorschlagen, sondern Anforderungen an Verfügbarkeit/Recovery klar benennen.
- Koordination: Hub-and-Spoke über den Broker; geräteübergreifende Abläufe laufen über Server-Logik (REST, Regeln, MQTTCommandBridge), nicht über direkte ESP-zu-ESP-Kommunikation als Default.
- Last und Transport: Keepalive und Broker-Timeouts (
exceeded timeouto. ä.) als Symptom für Blockaden oder Netzprobleme werten; Bursts (z. B. viele parallelemeasure/Kalibrier-Kommandos) und Backpressure explizit mitdenken — Rate-Limits und UI-Disziplin mit Server/Firmware abstimmen. - Disconnect-Runbook (drei parallele Sichten): Bei Transport-Problemen Serial/Firmware-Log (
[MQTT], Circuit Breaker), Server-Log (god_kaiser, Resilience/MQTT-Logger) und Broker-Traffic (make mqtt-sub/mqtt-debug) gemeinsam auswerten — nicht nur eine Quelle.
Interne Vertiefung (eingecheckt):
.claude/reference/api/MQTT_TOPICS.md— Topics, Changelog,correlation_id-Regeln.El Servador/god_kaiser_server/docs/emergency-stop-mqtt-correlation.md— Not-Halt und MQTT-Korrelation.El Servador/god_kaiser_server/docs/finalitaet-http-mqtt-ws.md— Finalität über Schichten.El Trabajante/docs/system-flows/08-zone-assignment-flow.md— Zone-Assign inkl. Bridge-Erwartung.docs/mqtt-injection-analysis.md— Sicherheits-/Last-Sicht MQTT.
Soll-Verhalten (MQTT) — verbindlich für Agenten
- Pattern-First: Vor Änderungen mit
Grep/Globnach bestehenden Handlern,Publisher-Methoden,TopicBuilder/topic_builderund Tests suchen; gleiche Exception-/Log-Patterns und QoS wie der nächstliegende bestehende Codepfad. - Eine Hypothese pro Paket: Transport (Keepalive, TLS-URI, Broker-Timeout, TCP) vs. Nachrichtenvertrag (
correlation_id, JSON-Schema, Idempotenz) vs. Broker-Konfiguration (ACL, Listener,max_inflight_messages) getrennt diskutieren und belegen — nicht in einem Satz vermischen. - Offline-first: Neue Funktionen, die „online“ sind, kurz beschreiben: Was passiert nach 30 s Grace bzw. bei
OfflineMode::OFFLINE_ACTIVEauf der Firmware (offline_mode_manager.*)? Greifen lokale Regeln / Default-Zustände? - Keine Topic-Erfindung: Neue Segmente nur, wenn sie parallel in
topic_builder.*,constants.py/topics.py, ggf.MQTT_TOPICS.mdund Tests nachziehbar sind. - TLS-Profil: Build-Flags (
sdkconfig,platformio.ini,MQTT_*Umgebungsvariablen) und URI-Schema (mqtt://vs.mqtts://) müssen zu den beobachteten Logs passen; Dev (lokaler Broker 1883) vs. Prod (TLS, Zertifikate) explizit trennen. - Tests bevorzugen: Wo Pytest- oder Firmware-Tests existieren, Contracts dort erweitern (
tests/integration/test_*handler*.py,test_mqtt_command_bridge.py,tests/esp32/test_mqtt_*.py); reine Handtests nur ergänzend.
Kernkomponenten nicht umgehen oder duplizieren: MQTTCommandBridge (send_and_wait_ack / resolve_ack nur mit passender correlation_id), LWT (ESP-Will-Topic → lwt_handler), Circuit Breaker auf Server- und Firmware-MQTT-Client, Offline-Buffer (Server async, Firmware Ring), Registration Gate auf dem ESP vor dem regulären Publish.
Anti-Patterns (MQTT-spezifisch)
- Zweiten parallelen MQTT-Client im Server-Prozess einführen (bestehende Singleton-/Lifespan-Kette in
main.pyerweitern). - Blockierende Sensorik oder lange
delay/sleepim MQTT-Callback- oder Publish-Hotpath (Firmware: Regeln infirmware.mdc; Server: async statt blockierend). - QoS 2 pauschal „weil sicher“ — nur nutzen, wo Duplikate Schaden anrichten (siehe Abschnitt 2); sonst Last und Latenz erhöhen.
- Retained ohne Team-Policy setzen (LWT/Retained-Interaktionen und Broker-State verstehen, siehe
mqtt-debug-Skill /MQTT_TOPICS.md). - Kalibrier- oder Burst-Pfade (viele
measure-Kommandos) ohne Abstimmung mit Rate-Limits, UI-Throttling und Server-Last — Disconnects undexceeded timeoutriskieren.
0. Quick Reference - Was suche ich?
| Ich will... | Primäre Quelle | Code-Location |
|---|---|---|
| Topic hinzufügen (ESP32) | Section 3: Topics ESP32 | El Trabajante/src/utils/topic_builder.cpp |
| Topic hinzufügen (Server) | Section 4: Topics Server | El Servador/.../core/constants.py |
| Handler hinzufügen | Section 5: Handler Pattern | El Servador/.../mqtt/handlers/ |
| Publisher erweitern | Section 6: Publisher | El Servador/.../mqtt/publisher.py |
| QoS-Strategie verstehen | Section 2: QoS | Diese Datei |
| Offline-Verhalten | Section 7: Offline Buffer | mqtt_client.cpp, offline_buffer.py |
| Mosquitto Config | Section 8: Broker | docker/mosquitto/mosquitto.conf |
| Error-Codes | Section 9: Errors | error_codes.h, error_codes.py |
Ordnerstruktur
ESP32 (El Trabajante/src/)
├── utils/topic_builder.cpp ← Topic-Builder (snprintf-basiert)
├── utils/topic_builder.h ← Topic-Builder Header
├── services/communication/
│ └── mqtt_client.cpp ← MQTT Client (ESP-IDF default; PubSubClient wenn MQTT_USE_PUBSUBCLIENT=1)
│ - Circuit Breaker Pattern
│ - Offline Buffer (10 Messages)
│ - Registration Gate
│ - Last-Will (LWT)
└── models/error_codes.h ← Error-Codes 3010-3016
Server (El Servador/god_kaiser_server/src/)
├── mqtt/
│ ├── client.py ← MQTT Client (paho-mqtt)
│ ├── publisher.py ← High-Level Publisher
│ ├── subscriber.py ← Subscriber mit Handler-Registry
│ ├── offline_buffer.py ← Offline Buffer (asyncio)
│ ├── topics.py ← TopicBuilder (Python)
│ └── handlers/ ← Message-Handler (siehe Ordner; + base_handler)
│ ├── sensor_handler.py
│ ├── actuator_handler.py
│ ├── heartbeat_handler.py
│ └── ...
├── core/
│ ├── constants.py ← Topic-Konstanten
│ └── error_codes.py ← Error-Codes 5101-5107
└── main.py ← Handler-Registrierung (Zeile 201-307)
Docker
└── docker/mosquitto/mosquitto.conf ← Broker-Konfiguration
1. Topic-Architektur
Basis-Schema
kaiser/{kaiser_id}/esp/{esp_id}/{category}/{gpio}/{action}
- kaiser_id:
"god"(aktuell einziger Wert) - esp_id: ESP32 Device ID (z.B.
ESP_12AB34CD) - category:
sensor,actuator,system,config,subzone - gpio: Pin-Nummer (0-39) oder weggelassen bei System-Topics
- action:
data,command,status,response, etc.
Vollständiges Topic-Schema (26+ Topics)
| # | Topic | Richtung | QoS | ESP32 Builder | Server Konstante |
|---|---|---|---|---|---|
| 1 | sensor/{gpio}/data | ESP→Server | 1 | buildSensorDataTopic() | MQTT_TOPIC_ESP_SENSOR_DATA |
| 2 | sensor/batch | ESP→Server | 1 | buildSensorBatchTopic() | - |
| 3 | sensor/{gpio}/command | Server→ESP | 2 | buildSensorCommandTopic() | MQTT_TOPIC_ESP_SENSOR_COMMAND |
| 4 | sensor/{gpio}/response | ESP→Server | 1 | buildSensorResponseTopic() | MQTT_TOPIC_ESP_SENSOR_RESPONSE |
| 5 | actuator/{gpio}/command | Server→ESP | 2 | buildActuatorCommandTopic() | MQTT_TOPIC_ESP_ACTUATOR_COMMAND |
| 6 | actuator/{gpio}/status | ESP→Server | 1 | buildActuatorStatusTopic() | MQTT_TOPIC_ESP_ACTUATOR_STATUS |
| 7 | actuator/{gpio}/response | ESP→Server | 1 | buildActuatorResponseTopic() | - |
| 8 | actuator/{gpio}/alert | ESP→Server | 1 | buildActuatorAlertTopic() | - |
| 9 | actuator/emergency | Server→ESP | 1 | buildActuatorEmergencyTopic() | - |
| 9a | session/announce | ESP→Server | 1 | (direkt aus mqtt_client.cpp, kein TopicBuilder) | - |
| 10 | system/heartbeat | ESP→Server | 0 | buildSystemHeartbeatTopic() | MQTT_TOPIC_ESP_HEARTBEAT |
| 11 | system/heartbeat/ack | Server→ESP | 0 | buildSystemHeartbeatAckTopic() | MQTT_TOPIC_ESP_HEARTBEAT_ACK |
| 12 | system/command | Server→ESP | 2 | buildSystemCommandTopic() | MQTT_TOPIC_ESP_SYSTEM_COMMAND |
| 13 | system/diagnostics | ESP→Server | 0 | buildSystemDiagnosticsTopic() | - |
| 14 | system/error | ESP→Server | 1 | buildSystemErrorTopic() | - |
| 14a | system/queue_pressure | ESP→Server | 1 | buildQueuePressureTopic() (PKG-01a, Welle 2) | TopicBuilder.build_queue_pressure_topic (PKG-01) |
| 15 | system/intent_outcome | ESP→Server | 1 | buildIntentOutcomeTopic() | - |
| 15b | system/intent_outcome/lifecycle | ESP→Server | 1 | buildIntentOutcomeLifecycleTopic() | intent_outcome_lifecycle_handler.py |
| 16 | system/will | ESP→Server | 1 | (LWT bei connect) | - |
| 17 | config | Server→ESP | 2 | buildConfigTopic() | MQTT_TOPIC_ESP_CONFIG |
| 18 | config_response | ESP→Server | 2 | buildConfigResponseTopic() | MQTT_TOPIC_ESP_CONFIG_RESPONSE |
| 19 | subzone/assign | Server→ESP | 1 | buildSubzoneAssignTopic() | MQTT_TOPIC_SUBZONE_ASSIGN |
| 20 | subzone/remove | Server→ESP | 1 | buildSubzoneRemoveTopic() | MQTT_TOPIC_SUBZONE_REMOVE |
| 21 | subzone/ack | ESP→Server | 1 | buildSubzoneAckTopic() | MQTT_TOPIC_SUBZONE_ACK |
| 22 | subzone/status | ESP→Server | 1 | buildSubzoneStatusTopic() | MQTT_TOPIC_SUBZONE_STATUS |
| 23 | subzone/safe | Server→ESP | 1 | buildSubzoneSafeTopic() | MQTT_TOPIC_SUBZONE_SAFE |
| 24 | zone/assign | Server→ESP | 1 | buildZoneAssignTopic() | - |
| 25 | zone/ack | ESP→Server | 1 | buildZoneAckTopic() | - |
| B1 | kaiser/broadcast/emergency | Server→ALL | 2 | buildBroadcastEmergencyTopic() | - |
2. QoS-Strategie
Pattern: QoS steigt mit Kritikalität
QoS 0: Fire-and-Forget → Heartbeat, Diagnostics (kein Verlust-Risiko)
QoS 1: At Least Once → Sensor-Daten, Status (Duplikate harmlos)
QoS 2: Exactly Once → Commands (Duplikate = gefährlich!)
Vollständige QoS-Zuordnung
| QoS | Topics | Grund |
|---|---|---|
| 0 | system/heartbeat, system/heartbeat/ack, system/diagnostics | Regelmäßig, nächste Nachricht überschreibt |
| 1 | sensor/data, sensor/batch, sensor/response, actuator/status, actuator/response, actuator/alert, session/announce, system/error, system/intent_outcome, system/intent_outcome/lifecycle, system/will, alle subzone/* | Daten-Loss unerwünscht, Duplikate verarbeitbar |
| 2 | sensor/command, actuator/command, system/command, config, config_response, broadcast/emergency | Duplikate können Schaden verursachen |
Server-Konstanten (constants.py, u. a. QoS-Zeilen ~207–211)
python1QOS_SENSOR_DATA = 1 # At least once 2QOS_ACTUATOR_COMMAND = 2 # Exactly once 3QOS_SENSOR_COMMAND = 2 # Exactly once 4QOS_HEARTBEAT = 0 # At most once 5QOS_CONFIG = 2 # Exactly once
3. Topic-Builder (ESP32)
Dateien:
El Trabajante/src/utils/topic_builder.h(Interface)El Trabajante/src/utils/topic_builder.cpp(Implementation)
Architektur
cpp1class TopicBuilder { 2public: 3 // Konfiguration 4 static void setEspId(const char* esp_id); 5 static void setKaiserId(const char* kaiser_id); 6 7 // Topic-Methoden (geben Pointer auf internen Buffer zurück) 8 static const char* buildSensorDataTopic(uint8_t gpio); 9 static const char* buildActuatorCommandTopic(uint8_t gpio); 10 // ... 20 weitere Methoden 11 12private: 13 static char topic_buffer_[256]; // Shared buffer 14 static char esp_id_[32]; 15 static char kaiser_id_[64]; 16};
Implementation-Pattern (topic_builder.cpp)
cpp1// Pattern: snprintf → validateTopicBuffer 2const char* TopicBuilder::buildSensorDataTopic(uint8_t gpio) { 3 int written = snprintf(topic_buffer_, sizeof(topic_buffer_), 4 "kaiser/%s/esp/%s/sensor/%d/data", 5 kaiser_id_, esp_id_, gpio); 6 return validateTopicBuffer(written); 7} 8 9// Buffer-Validation gegen Overflow und Encoding-Errors 10const char* TopicBuilder::validateTopicBuffer(int snprintf_result) { 11 if (snprintf_result < 0) { 12 LOG_ERROR("TopicBuilder: snprintf encoding error!"); 13 return ""; 14 } 15 if (snprintf_result >= (int)sizeof(topic_buffer_)) { 16 LOG_ERROR("TopicBuilder: Topic truncated!"); 17 return ""; 18 } 19 return topic_buffer_; 20}
Neuen Topic hinzufügen (ESP32)
- Header erweitern (
topic_builder.h):
cpp1static const char* buildYourNewTopic(uint8_t gpio);
- Implementation hinzufügen (
topic_builder.cpp):
cpp1const char* TopicBuilder::buildYourNewTopic(uint8_t gpio) { 2 int written = snprintf(topic_buffer_, sizeof(topic_buffer_), 3 "kaiser/%s/esp/%s/your/new/%d/topic", 4 kaiser_id_, esp_id_, gpio); 5 return validateTopicBuffer(written); 6}
- Synchron halten: Server-Konstante + MQTT_TOPICS.md aktualisieren!
4. Topics (Server)
Dateien:
El Servador/god_kaiser_server/src/core/constants.py(Konstanten)El Servador/god_kaiser_server/src/mqtt/topics.py(TopicBuilder)
Konstanten-Pattern (constants.py)
python1# Topic-Pattern mit Placeholder 2MQTT_TOPIC_ESP_SENSOR_DATA = "kaiser/{kaiser_id}/esp/{esp_id}/sensor/{gpio}/data" 3MQTT_TOPIC_ESP_ACTUATOR_COMMAND = "kaiser/{kaiser_id}/esp/{esp_id}/actuator/{gpio}/command" 4 5# Subscription-Pattern (Wildcards) 6MQTT_SUBSCRIBE_ESP_SENSORS = "kaiser/{kaiser_id}/esp/+/sensor/+/data" 7MQTT_SUBSCRIBE_ESP_ALL = "kaiser/{kaiser_id}/esp/+/#" 8 9# Helper-Funktion 10def get_topic_with_kaiser_id(topic_template: str, **kwargs) -> str: 11 kaiser_id = get_kaiser_id() # Default: "god" 12 return topic_template.format(kaiser_id=kaiser_id, **kwargs)
Neuen Topic hinzufügen (Server)
- Konstante definieren (
constants.py):
python1MQTT_TOPIC_YOUR_NEW = "kaiser/{kaiser_id}/esp/{esp_id}/your/new/{gpio}/topic"
- TopicBuilder erweitern (
topics.py):
python1@staticmethod 2def build_your_new_topic(esp_id: str, gpio: int) -> str: 3 return constants.get_topic_with_kaiser_id( 4 constants.MQTT_TOPIC_YOUR_NEW, 5 esp_id=esp_id, 6 gpio=gpio 7 )
- Synchron halten: ESP32 TopicBuilder + MQTT_TOPICS.md aktualisieren!
5. Handler-Pattern
Datei: El Servador/god_kaiser_server/src/mqtt/handlers/
Handler-Registrierung (main.py lifespan, u. a. 203–310)
python1# lifespan() Funktion 2subscriber.register_handler( 3 "kaiser/+/esp/+/sensor/+/data", # Pattern mit Wildcard (multi-Kaiser support) 4 sensor_handler.handle_sensor_data # Handler-Funktion 5)
Handler-Übersicht
| Pattern | Handler | QoS | Zeile |
|---|---|---|---|
+/sensor/+/data | SensorDataHandler | 1 | 203 |
+/actuator/+/status | ActuatorStatusHandler | 1 | 207 |
+/actuator/+/response | ActuatorResponseHandler | 1 | 212 |
+/actuator/+/alert | ActuatorAlertHandler | 1 | 217 |
+/system/heartbeat | HeartbeatHandler | 0 | 221 |
+/config_response | ConfigHandler | 2 | 229 |
+/zone/ack | ZoneAckHandler | 1 | 234 |
+/subzone/ack | SubzoneAckHandler | 1 | 239 |
+/system/will | LWTHandler | 1 | 248 |
+/system/error | ErrorEventHandler | 1 | 256 |
+/system/intent_outcome | IntentOutcomeHandler | 1 | ~299 |
+/system/intent_outcome/lifecycle | IntentOutcomeLifecycleHandler | 1 | ~306 |
Neuen Handler hinzufügen
- Handler-Datei erstellen (
mqtt/handlers/your_handler.py):
python1from ..core.logging_config import get_logger 2from sqlalchemy.ext.asyncio import AsyncSession 3 4logger = get_logger(__name__) 5 6async def handle_your_event(topic: str, payload: dict) -> bool: 7 """ 8 Handle incoming your_event messages. 9 10 Args: 11 topic: MQTT topic string 12 payload: Parsed JSON payload 13 14 Returns: 15 True if successful, False otherwise 16 """ 17 try: 18 # Extract esp_id from topic 19 parts = topic.split("/") 20 esp_id = parts[3] # kaiser/{kaiser_id}/esp/{esp_id}/... 21 22 # Process payload 23 logger.info(f"Processing your_event from {esp_id}") 24 25 # DB operation with session 26 async with async_session_maker() as session: 27 # ... your logic 28 await session.commit() 29 30 return True 31 32 except Exception as e: 33 logger.error(f"Handle your_event failed: {e}", exc_info=True) 34 return False
- In main.py registrieren (~Zeile 260):
python1from .mqtt.handlers.your_handler import handle_your_event 2 3subscriber.register_handler( 4 f"kaiser/{kaiser_id}/esp/+/your/new/topic", 5 handle_your_event 6)
- Topic zu constants.py hinzufügen
6. Publisher-Pattern
Datei: El Servador/god_kaiser_server/src/mqtt/publisher.py
Publisher-Klasse
python1class Publisher: 2 def __init__(self, mqtt_client: Optional[MQTTClient] = None): 3 self.client = mqtt_client or MQTTClient.get_instance() 4 5 # Resilience Settings 6 self.max_retries = settings.resilience.retry_max_attempts 7 self.base_delay = settings.resilience.retry_base_delay 8 9 def publish_actuator_command( 10 self, 11 esp_id: str, 12 gpio: int, 13 command: str, 14 value: float, 15 duration: int = 0, 16 retry: bool = True, 17 correlation_id: Optional[str] = None, 18 ) -> bool: 19 topic = TopicBuilder.build_actuator_command_topic(esp_id, gpio) 20 payload = { 21 "command": command.upper(), 22 "value": value, 23 "duration": duration, 24 "timestamp": int(time.time()), 25 } 26 if correlation_id: 27 payload["correlation_id"] = correlation_id 28 payload["intent_id"] = correlation_id # IntentMetadata + command_intents.sent (Epic1-05) 29 30 return self._publish_with_retry(topic, payload, QOS_ACTUATOR_COMMAND, retry)
Not-Aus (REST): api/v1/actuators.py setzt correlation_id über build_emergency_actuator_correlation_id(incident_correlation_id, esp_id, gpio) (Format {incident}:{esp_id}:{gpio}). Bei erfolgreichem Publish: actuator_history.command_metadata enthält incident_correlation_id, correlation_id und mqtt_correlation_id (letztere beiden = MQTT-Wert). incident_correlation_id zusätzlich in REST EmergencyStopResponse. Referenz: El Servador/god_kaiser_server/docs/emergency-stop-mqtt-correlation.md; Finalität: El Servador/god_kaiser_server/docs/finalitaet-http-mqtt-ws.md.
Retry-Pattern mit Exponential Backoff
python1def _publish_with_retry(self, topic, payload, qos, retry) -> bool: 2 attempts = self.max_retries if retry else 1 3 4 for attempt in range(1, attempts + 1): 5 success = self.client.publish(topic, json.dumps(payload), qos) 6 7 if success: 8 return True 9 10 if attempt < attempts: 11 delay = calculate_backoff_delay( 12 attempt=attempt - 1, 13 base_delay=self.base_delay, 14 max_delay=self.max_delay, 15 exponential_base=self.exponential_base, 16 jitter=self.jitter_enabled, 17 ) 18 time.sleep(delay) 19 20 return False
Neue Publish-Methode hinzufügen
python1def publish_your_command( 2 self, 3 esp_id: str, 4 gpio: int, 5 data: Dict[str, Any], 6 retry: bool = True, 7) -> bool: 8 topic = TopicBuilder.build_your_topic(esp_id, gpio) 9 payload = { 10 **data, 11 "timestamp": int(time.time()), 12 } 13 14 qos = constants.QOS_YOUR_COMMAND # 0, 1, oder 2 15 16 logger.info(f"Publishing your command to {esp_id} GPIO {gpio}") 17 return self._publish_with_retry(topic, payload, qos, retry)
7. Offline Buffer & Circuit Breaker
ESP32 Circuit Breaker (mqtt_client.cpp)
cpp1// Konstruktor: CircuitBreaker-Konfiguration 2circuit_breaker_("MQTT", 5, 30000, 10000) 3// Parameter: name, failure_threshold, recovery_timeout_ms, half_open_timeout_ms 4 5// 5 Failures → OPEN state 6// 30s recovery timeout 7// 10s half-open test timeout
Publish mit Circuit Breaker:
cpp1bool MQTTClient::publish(const String& topic, const String& payload, uint8_t qos) { 2 // Circuit Breaker Check 3 if (!circuit_breaker_.allowRequest()) { 4 LOG_WARNING("MQTT publish blocked by Circuit Breaker (Service DOWN)"); 5 return false; 6 } 7 8 // Registration Gate Check (verhindert Publish vor Heartbeat-ACK) 9 // Whitelisted: heartbeat, config_response, zone/ack, subzone/ack 10 bool is_system_response = topic.indexOf("/config_response") != -1 || 11 topic.indexOf("/zone/ack") != -1 || 12 topic.indexOf("/subzone/ack") != -1; 13 if (!registration_confirmed_ && !is_heartbeat && !is_system_response) { 14 LOG_DEBUG("Publish blocked (awaiting registration)"); 15 return false; 16 } 17 18 // Empty Payload Guard 19 if (payload.length() == 0) { 20 LOG_ERROR("Empty payload blocked"); 21 return false; 22 } 23 24 // Actual publish 25 bool success = mqtt_.publish(topic.c_str(), payload.c_str(), qos == 1); 26 27 success ? circuit_breaker_.recordSuccess() : circuit_breaker_.recordFailure(); 28 29 if (!success) { 30 addToOfflineBuffer(topic, payload, qos); 31 } 32 33 return success; 34}
ESP32 Offline Buffer:
cpp1// Max 100 messages (Speicher-limitiert) 2static const uint16_t MAX_OFFLINE_MESSAGES = 100; 3 4struct OfflineMessage { 5 String topic; 6 String payload; 7 uint8_t qos; 8 unsigned long timestamp; 9}; 10 11OfflineMessage offline_buffer_[MAX_OFFLINE_MESSAGES];
Server Offline Buffer (offline_buffer.py)
python1@dataclass 2class BufferedMessage: 3 topic: str 4 payload: str # JSON string 5 qos: int 6 retain: bool 7 timestamp: float = field(default_factory=time.time) 8 attempts: int = 0 9 10class MQTTOfflineBuffer: 11 def __init__(self, max_size=1000, flush_batch_size=50): 12 self._buffer: Deque[BufferedMessage] = deque(maxlen=max_size) 13 self._lock = asyncio.Lock() 14 15 async def add(self, topic, payload, qos=1, retain=False) -> bool: 16 async with self._lock: 17 message = BufferedMessage(topic=topic, payload=payload, qos=qos, retain=retain) 18 self._buffer.append(message) 19 return True 20 21 async def flush(self, mqtt_client) -> int: 22 """Flush messages when connection restored.""" 23 flushed_count = 0 24 async with self._lock: 25 while self._buffer: 26 message = self._buffer.popleft() 27 success = mqtt_client.publish(message.topic, message.payload, message.qos) 28 if success: 29 flushed_count += 1 30 else: 31 # Re-queue failed (max 3 attempts) 32 if message.attempts < 3: 33 message.attempts += 1 34 self._buffer.appendleft(message) 35 break 36 return flushed_count
8. Mosquitto Broker Config
Datei: docker/mosquitto/mosquitto.conf
Dual-Listener
conf1# MQTT (für ESP32) 2listener 1883 3protocol mqtt 4 5# WebSocket (für Frontend) 6listener 9001 7protocol websockets
Development-Mode
conf1# WARNING: Only for development! 2allow_anonymous true 3 4# Production würde aktivieren: 5# allow_anonymous false 6# password_file /mosquitto/config/passwd 7# acl_file /mosquitto/config/acl
Limits
| Setting | Wert | Beschreibung |
|---|---|---|
max_inflight_messages | 20 | Gleichzeitig unbestätigte QoS 1/2 |
max_queued_messages | 1000 | Queue pro Client |
message_size_limit | 262144 | 256KB max Payload |
max_keepalive | 65535 | Max Keepalive-Interval |
Persistence
conf1persistence true 2persistence_location /mosquitto/data/
9. MQTT Error-Codes
ESP32 MQTT Errors (3010-3016)
| Code | Name | Beschreibung |
|---|---|---|
| 3010 | MQTT_INIT_FAILED | Failed to initialize MQTT client |
| 3011 | MQTT_CONNECT_FAILED | MQTT broker connection failed |
| 3012 | MQTT_PUBLISH_FAILED | Failed to publish MQTT message |
| 3013 | MQTT_SUBSCRIBE_FAILED | Failed to subscribe to topic |
| 3014 | MQTT_DISCONNECT | MQTT disconnected from broker |
| 3015 | MQTT_BUFFER_FULL | Offline buffer is full |
| 3016 | MQTT_PAYLOAD_INVALID | Payload is invalid or malformed |
Server MQTT Errors (5101-5107)
| Code | Name | Beschreibung |
|---|---|---|
| 5101 | PUBLISH_FAILED | MQTT publish operation failed |
| 5102 | TOPIC_BUILD_FAILED | Failed to build MQTT topic |
| 5103 | PAYLOAD_SERIALIZATION_FAILED | Failed to serialize payload |
| 5104 | CONNECTION_LOST | MQTT connection lost |
| 5105 | RETRY_EXHAUSTED | Retry attempts exhausted |
| 5106 | BROKER_UNAVAILABLE | Broker is unavailable |
| 5107 | AUTHENTICATION_FAILED | Authentication failed |
10. Kritische Dateipfade
ESP32
| Datei | Beschreibung |
|---|---|
El Trabajante/src/utils/topic_builder.cpp | Topic-Builder Implementation |
El Trabajante/src/utils/topic_builder.h | Topic-Builder Header |
El Trabajante/src/services/communication/mqtt_client.cpp | MQTT Client mit Circuit Breaker |
El Trabajante/src/services/communication/mqtt_client.h | MQTT Client Header |
El Trabajante/src/services/safety/offline_mode_manager.cpp | Grace 30s → OFFLINE_ACTIVE, MQTT-Disconnect-Verhalten |
El Trabajante/src/services/safety/offline_mode_manager.h | Offline-Modus-Enum und Konstanten (OFFLINE_ACTIVATION_DELAY_MS) |
El Trabajante/src/models/error_codes.h | Error-Codes 3010-3016 |
Server
| Datei | Beschreibung |
|---|---|
El Servador/god_kaiser_server/src/core/constants.py | Topic-Konstanten, QoS |
El Servador/god_kaiser_server/src/mqtt/topics.py | TopicBuilder Python |
El Servador/god_kaiser_server/src/mqtt/publisher.py | Publisher mit Retry |
El Servador/god_kaiser_server/src/mqtt/subscriber.py | Subscriber + Handler-Routing |
El Servador/god_kaiser_server/src/mqtt/client.py | MQTT Client Wrapper |
El Servador/god_kaiser_server/src/mqtt/offline_buffer.py | Async Offline Buffer |
El Servador/god_kaiser_server/src/services/mqtt_command_bridge.py | MQTTCommandBridge: ACK-Warten (send_and_wait_ack), resolve_ack |
El Servador/god_kaiser_server/src/mqtt/handlers/ | Message-Handler-Module + base_handler.py (IST: Ordner listen) |
El Servador/god_kaiser_server/src/main.py | Handler-Registrierung (lifespan, u. a. intent_outcome + lifecycle) |
El Servador/god_kaiser_server/src/core/error_codes.py | Error-Codes 5101-5107 |
Docker
| Datei | Beschreibung |
|---|---|
docker/mosquitto/mosquitto.conf | Broker-Konfiguration |
docker-compose.yml | Service-Definition mqtt-broker |
Referenzen
| Datei | Beschreibung |
|---|---|
.claude/reference/api/MQTT_TOPICS.md | Vollständige Topic-Dokumentation |
.claude/reference/errors/ERROR_CODES.md | Error-Code Referenz |
11. Make-Targets
bash1# MQTT-Traffic beobachten (alle kaiser Topics) 2make mqtt-sub 3# → docker exec -it automationone-mqtt mosquitto_sub -h localhost -t "kaiser/#" -v -C 10 -W 30 4 5# Broker-Logs anzeigen 6make logs-mqtt 7# → docker compose logs mqtt-broker 8 9# Broker neustarten 10make restart-mqtt 11# → docker compose restart mqtt-broker
12. QoS-Entscheidung Flowchart
Neuer Topic/Message?
│
▼
Kann ein Duplikat Schaden anrichten?
(Actuator wird zweimal aktiviert, Config wird doppelt angewendet)
│
┌────┴────┐
│ JA │ NEIN
▼ ▼
QoS 2 Ist Message-Loss kritisch?
│
┌────┴────┐
│ JA │ NEIN
▼ ▼
QoS 1 QoS 0
13. Bekannte Lücken / TODOs
Synchronisations-Issues
-
MQTT_TOPICS.md Zeilennummern
- Zeilennummern in Code-Referenzen teilweise veraltet nach Refactoring
- Empfehlung: Bei Änderungen MQTT_TOPICS.md aktualisieren
-
Bridge/ACL nicht implementiert
- Mosquitto läuft im anonymous-Mode
- Production benötigt
password_file+acl_file - Siehe:
.claude/reference/security/PRODUCTION_CHECKLIST.md
-
Keine trace-id in MQTT-Payloads
- Korrelation nur via
esp_id+gpio+timestamp - End-to-End-Tracking nur über
correlation_idin Actuator-Commands
- Korrelation nur via
Fehlende Server-Enum-Einträge
- I2C Bus Recovery Codes (1015-1018) nicht in
ESP32HardwareErrorenum - DS18B20 Codes (1060-1063) nicht in
ESP32HardwareErrorenum INVALID_PAYLOAD_FORMATfehlt inValidationErrorCode
14. Workflow
1. ANALYSE → Topic-Schema + QoS bestimmen
2. ESP32 → topic_builder.cpp/h erweitern
3. SERVER → constants.py + topics.py + Handler
4. SYNCHRON → MQTT_TOPICS.md aktualisieren
5. VERIFY → make mqtt-sub, Logs prüfen
15. Regeln
NIEMALS
- Topics ohne Synchronisation zwischen ESP32/Server ändern
- QoS für Actuator-Commands unter 2 setzen
- Empty-Payloads publishen (werden blockiert)
- Handler ohne Error-Handling implementieren
- Blocking-Code in async Handlers
IMMER
- Topic-Schema
kaiser/{kaiser_id}/esp/{esp_id}/...einhalten - QoS-Strategie befolgen (Section 2)
- Offline-Verhalten berücksichtigen
- Error-Codes aus Ranges verwenden (ESP32: 3010-3016, Server: 5101-5107)
- MQTT_TOPICS.md bei Topic-Änderungen aktualisieren
Kompakter Skill für MQTT-Entwicklung. Details in MQTT_TOPICS.md und ERROR_CODES.md