munet Network Library for muwerk Scheduler
A muwerk network library supporting WiFi, NTP, OTA and MQTT for ESP8266 and ESP32 and serial links for all platforms
Loading...
Searching...
No Matches
mqtt.h
1// MQTT.h
2
3#pragma once
4
5// #if defined(__ESP__)
6
7#include <functional>
8
9#include <Arduino_JSON.h>
10#include <PubSubClient.h> // ESP32 requires v2.7 or later
11
12#include "ustd_platform.h"
13#include "ustd_array.h"
14#include "ustd_map.h"
15
16#include "scheduler.h"
17#include "timeout.h"
18
19#include "munet.h"
20
21namespace ustd {
22
79class Mqtt {
80 private:
81 // muwerk task management
82 Scheduler *pSched;
83 int tID;
84
85 // mqtt client
86 WiFiClient wifiClient;
87 PubSubClient mqttClient;
88
89 // active configuration
90 String mqttServer;
91 uint16_t mqttServerPort;
92 String mqttUsername;
93 String mqttPassword;
94 bool mqttRetained;
95 String clientName;
96 String domainToken;
97 String outDomainToken;
98 String lwTopic;
99 String lwMsg;
100 // computed configuration
101 ustd::array<String> ownedPrefixes;
102 bool bStateRetained;
103 String outDomainPrefix; // outDomainToken + '/' + clientName, or just clientName, if
104 // outDomainToken==""
105
106 // persistently initialized tables
107 ustd::array<String> subsList;
108 ustd::array<String> outgoingBlockList;
109 ustd::array<String> incomingBlockList;
110
111 // runtime control - state management
112 bool isOn = false;
113 bool netUp = false;
114 bool bMqInit = false;
115 bool bWarned = false;
116 bool bCheckConnection = false;
117 bool mqttConnected = false;
118 ustd::timeout mqttTickerTimeout = 5000L;
119
120 public:
128 mqttClient = wifiClient;
129 }
130
131 ~Mqtt() {
132 }
133
134 void begin(Scheduler *_pSched, String _mqttServer = "", uint16_t _mqttServerPort = 1883,
135 bool _mqttRetained = false, String _clientName = "${hostname}",
136 String _domainToken = "mu", String _outDomainToken = "omu",
137 String _mqttUsername = "", String _mqttPassword = "", String _willTopic = "",
138 String _willMessage = "") {
181 ustd::jsonfile conf;
182
183 // read configuration
184 mqttServer = conf.readString("mqtt/host", _mqttServer);
185 mqttServerPort = (uint16_t)conf.readLong("mqtt/port", 1, 65535, _mqttServerPort);
186 mqttUsername = conf.readString("mqtt/username", _mqttUsername);
187 mqttPassword = conf.readString("mqtt/password", _mqttPassword);
188 mqttRetained = conf.readBool("mqtt/alwaysRetained", _mqttRetained);
189 clientName = conf.readString("mqtt/clientName", 1, _clientName);
190 domainToken = conf.readString("mqtt/domainToken", 1, _domainToken);
191 outDomainToken = conf.readString("mqtt/outDomainToken", _outDomainToken);
192 lwTopic = conf.readString("mqtt/lastWillTopic", _willTopic);
193 lwMsg = conf.readString("mqtt/lastWillMessage", _willMessage);
194
195 // persistently initialized tables
196 conf.readStringArray("mqtt/subscriptions", subsList);
197 conf.readStringArray("mqtt/outgoingBlackList", outgoingBlockList);
198 conf.readStringArray("mqtt/incomingBlackList", incomingBlockList);
199
200 // This configuration is preliminary but it is ok. Currently we have no network connection
201 // and nothing can happen with this prelimiary information. As soon as a network connection
202 // is established, the configuration information will be finalized. This is not possible now
203 // since the replacement of placeholders must be able to access some network stack
204 // information like the mac id or the hostname but this information is inaccesible if the
205 // network stack has not been enabled and configured.
206
207 // init scheduler
208 pSched = _pSched;
209 tID = pSched->add([this]() { this->loop(); }, "mqtt");
210
211 // subscribe to all messages
212 pSched->subscribe(tID, "#", [this](String topic, String msg, String originator) {
213 this->subsMsg(topic, msg, originator);
214 });
215
216 if (mqttServer.length()) {
217 // query update from network stack
218 pSched->publish("net/network/get");
219 } else {
220 DBG("mqtt: WARNING - no server defined.");
221 }
222
223 // initialize runtime
224 isOn = true;
225 netUp = false;
226 bMqInit = configureMqttClient();
227 bWarned = false;
228 bStateRetained = false;
229 bCheckConnection = false;
230 mqttConnected = false;
231 mqttTickerTimeout = 5000L; // 5 seconds
232
233 publishState();
234 }
235
236 int addSubscription(int taskID, String topic, T_SUBS subs, String originator = "") {
258 int handle;
259 pSched->subscribe(taskID, topic, subs, originator);
260 for (unsigned int i = 0; i < subsList.length(); i++) {
261 if (topic == subsList[i])
262 return handle; // Already subbed via mqtt.
263 }
264 if (mqttConnected) {
265 mqttClient.subscribe(topic.c_str());
266 }
267 subsList.add(topic);
268 return handle;
269 }
270
271 bool removeSubscription(int subscriptionHandle, String topic) {
282 bool ret = pSched->unsubscribe(subscriptionHandle);
283 for (unsigned int i = 0; i < subsList.length(); i++) {
284 if (topic == subsList[i])
285 subsList.erase(i);
286 }
287 return ret;
288 }
289
290 bool outgoingBlockSet(String topic) {
298 for (unsigned int i = 0; i < outgoingBlockList.length(); i++) {
299 if (outgoingBlockList[i] == topic)
300 return true;
301 }
302 if (outgoingBlockList.add(topic) == -1)
303 return false;
304 return true;
305 }
306
307 bool outgoingBlockRemove(String topic) {
315 for (unsigned int i = 0; i < outgoingBlockList.length(); i++) {
316 if (outgoingBlockList[i] == topic) {
317 if (!outgoingBlockList.erase(i))
318 return false;
319 return true;
320 }
321 }
322 return false;
323 }
324
325 bool incomingBlockSet(String topic) {
333 for (unsigned int i = 0; i < incomingBlockList.length(); i++) {
334 if (incomingBlockList[i] == topic)
335 return true;
336 }
337 if (incomingBlockList.add(topic) == -1)
338 return false;
339 return true;
340 }
341
342 bool incomingBlockRemove(String topic) {
350 for (unsigned int i = 0; i < incomingBlockList.length(); i++) {
351 if (incomingBlockList[i] == topic) {
352 if (!incomingBlockList.erase(i))
353 return false;
354 return true;
355 }
356 }
357 return false;
358 }
359
360 private:
361 inline void publishState() {
362 pSched->publish("mqtt/state", mqttConnected ? "connected" : "disconnected");
363 }
364
365 void loop() {
366 if (!isOn || !netUp || mqttServer.length() == 0) {
367 return;
368 }
369 if (mqttConnected) {
370 mqttClient.loop();
371 }
372 if (bCheckConnection || mqttTickerTimeout.test()) {
373 mqttTickerTimeout.reset();
374 bCheckConnection = false;
375 if (!mqttClient.connected()) {
376 // Attempt to connect
377 const char *usr = mqttUsername.length() ? mqttUsername.c_str() : NULL;
378 const char *pwd = mqttPassword.length() ? mqttPassword.c_str() : NULL;
379 bool conRes = mqttClient.connect(clientName.c_str(), usr, pwd, lwTopic.c_str(), 0,
380 true, lwMsg.c_str());
381 if (conRes) {
382 DBG2("Connected to mqtt server");
383 mqttConnected = true;
384 mqttClient.subscribe((clientName + "/#").c_str());
385 mqttClient.subscribe((domainToken + "/#").c_str());
386 for (unsigned int i = 0; i < subsList.length(); i++) {
387 mqttClient.subscribe(subsList[i].c_str());
388 }
389 bWarned = false;
390 pSched->publish("mqtt/config", outDomainPrefix + "+" + lwTopic + "+" + lwMsg);
391 publishState();
392 } else {
393 mqttConnected = false;
394 if (!bWarned) {
395 bWarned = true;
396 publishState();
397 DBG2("MQTT disconnected.");
398 }
399 }
400 }
401 }
402 }
403
404 void mqttReceive(char *ctopic, unsigned char *payload, unsigned int length) {
405 String msg;
406 String topic;
407
408 // prepare message and topic
409 topic = (const char *)ctopic;
410 if (length && payload) {
411 char *szBuffer = (char *)malloc(length + 1);
412 if (szBuffer) {
413 memcpy(szBuffer, payload, length);
414 szBuffer[length] = 0;
415 msg = szBuffer;
416 free(szBuffer);
417 } else {
418 DBG("mqtt: ERROR - message body lost due to memory outage");
419 }
420 }
421
422 for (unsigned int i = 0; i < incomingBlockList.length(); i++) {
423 if (Scheduler::mqttmatch(topic, incomingBlockList[i])) {
424 // blocked incoming
425 DBG2("mqtt: Blocked " + topic);
426 return;
427 }
428 }
429 for (unsigned int i = 0; i < subsList.length(); i++) {
430 if (Scheduler::mqttmatch(topic, subsList[i])) {
431 DBG2("mqtt: subscribed topic " + topic);
432 pSched->publish(topic, msg, "mqtt");
433 return;
434 }
435 }
436 // strip the client name token or the domain token in messages for us
437 for (unsigned int i = 0; i < ownedPrefixes.length(); i++) {
438 if (ownedPrefixes[i].length() <= topic.length()) {
439 // basically this comparison is not really needed since at this point we could
440 // ONLY have messages that match either the domainToken or the clientName since
441 // we have exactly subscribed to those. But who knows....
442 if (ownedPrefixes[i] == topic.substring(0, ownedPrefixes[i].length())) {
443 topic = (const char *)(ctopic + ownedPrefixes[i].length());
444 pSched->publish(topic, msg, "mqtt");
445 }
446 }
447 }
448 }
449
450 void subsMsg(String topic, String msg, String originator) {
451 if (originator == "mqtt") {
452 return; // avoid loops
453 }
454
455 // router function
456 if (mqttConnected) {
457 unsigned int len = msg.length() + 1;
458 for (unsigned int i = 0; i < outgoingBlockList.length(); i++) {
459 if (Scheduler::mqttmatch(topic, outgoingBlockList[i])) {
460 // Item is blocked.
461 return;
462 }
463 }
464 String tpc;
465 if (topic.c_str()[0] == '!') {
466 tpc = &(topic.c_str()[1]);
467 } else {
468 tpc = outDomainPrefix + "/" + topic;
469 }
470
471 bool bRetain = mqttRetained;
472 if (tpc.c_str()[0] == '!') {
473 // remove second exclamation point
474 tpc = &(topic.c_str()[2]);
475 bRetain = true;
476 }
477 if (!bRetain && bStateRetained && topic == "mqtt/state") {
478 // the state topic shall always be retained
479 bRetain = true;
480 }
481
482 DBG3("mqtt: publishing...");
483 if (mqttClient.publish(tpc.c_str(), msg.c_str(), bRetain)) {
484 DBG2("mqtt publish: " + topic + " | " + msg);
485 } else {
486 DBG("mqtt: ERROR len=" + String(len) + ", not published: " + topic + " | " + msg);
487 if (len > 128) {
488 DBG("mqtt: FATAL ERROR: you need to re-compile the PubSubClient library "
489 "and "
490 "increase #define MQTT_MAX_PACKET_SIZE.");
491 }
492 }
493 } else {
494 DBG2("mqtt: NO CONNECTION, not published: " + topic + " | " + msg);
495 }
496
497 // internal processing
498 if (topic == "mqtt/state/get") {
499 publishState();
500 } else if (topic == "mqtt/config/get") {
501 pSched->publish("mqtt/config", outDomainPrefix + "+" + lwTopic + "+" + lwMsg);
502 } else if (topic == "mqtt/outgoingblock/set") {
503 outgoingBlockSet(msg);
504 } else if (topic == "mqtt/outgoingblock/remove") {
506 } else if (topic == "mqtt/incomingblock/set") {
507 incomingBlockSet(msg);
508 } else if (topic == "mqtt/incomingblock/remove") {
510 } else if (topic == "net/network") {
511 // network state received:
512 JSONVar jsonState = JSON.parse(msg);
513 if (JSON.typeof(jsonState) != "object") {
514 DBG("mqtt: Received broken network state " + msg);
515 return;
516 }
517 String state = (const char *)jsonState["state"];
518 String hostname = (const char *)jsonState["hostname"];
519 String mac = (const char *)jsonState["mac"];
520 if (state == "connected") {
521 DBG3("mqtt: received network connect");
522 if (!netUp) {
523 DBG2("mqtt: net state online");
524 finalizeConfiguration(hostname, mac);
525 netUp = true;
526 bCheckConnection = true;
527 }
528 } else {
529 netUp = false;
530 mqttConnected = false;
531 publishState();
532 DBG2("mqtt: net state offline");
533 }
534 }
535 }
536
537 bool configureMqttClient() {
538 if (mqttServer.length() == 0) {
539 DBG2("mqtt: No mqtt host defined. Ignoring configuration...");
540 return false;
541 }
542 bCheckConnection = true;
543 mqttClient.setServer(mqttServer.c_str(), mqttServerPort);
544
545 if (!bMqInit) {
546 mqttClient.setCallback([this](char *topic, unsigned char *msg, unsigned int len) {
547 this->mqttReceive(topic, msg, len);
548 });
549 bMqInit = true;
550 }
551 return true;
552 }
553
554 void finalizeConfiguration(String &hostname, String &mac) {
555 // get network information
556 if (hostname.length() == 0) {
557#if defined(__ESP32__)
558 String hostname = WiFi.getHostname();
559#else
560 String hostname = WiFi.hostname();
561#endif
562 }
563 if (mac.length() == 0) {
564 mac = WiFi.macAddress();
565 }
566 mac.replace(":", "");
567
568 // transform and integrate missing configuration data
569 clientName = replaceVars(clientName, hostname, mac);
570 if (outDomainToken.length()) {
571 outDomainPrefix = outDomainToken + "/" + clientName;
572 } else {
573 outDomainPrefix = clientName;
574 }
575 if (lwTopic.length()) {
576 lwMsg = replaceVars(lwMsg, hostname, mac);
577 } else {
578 lwTopic = outDomainPrefix + "/mqtt/state";
579 lwMsg = "disconnected";
580 bStateRetained = true;
581 }
582 String clientPrefix = clientName + "/";
583 String domainPrefix = domainToken + "/";
584 ownedPrefixes.erase();
585 ownedPrefixes.add(clientPrefix);
586 ownedPrefixes.add(domainPrefix);
587 }
588
589 String replaceVars(String val, String &hostname, String &macAddress) {
590 val.replace("${hostname}", hostname);
591 val.replace("${mac}", macAddress);
592 val.replace("${macls}", macAddress.substring(6));
593 val.replace("${macfs}", macAddress.substring(0, 5));
594 return val;
595 }
596};
597
598} // namespace ustd
599
600// #endif // defined(__ESP__)
munet MQTT Gateway Class
Definition: mqtt.h:79
int addSubscription(int taskID, String topic, T_SUBS subs, String originator="")
Definition: mqtt.h:236
bool removeSubscription(int subscriptionHandle, String topic)
Definition: mqtt.h:271
void begin(Scheduler *_pSched, String _mqttServer="", uint16_t _mqttServerPort=1883, bool _mqttRetained=false, String _clientName="${hostname}", String _domainToken="mu", String _outDomainToken="omu", String _mqttUsername="", String _mqttPassword="", String _willTopic="", String _willMessage="")
Definition: mqtt.h:134
bool outgoingBlockSet(String topic)
Definition: mqtt.h:290
bool outgoingBlockRemove(String topic)
Definition: mqtt.h:307
bool incomingBlockRemove(String topic)
Definition: mqtt.h:342
bool incomingBlockSet(String topic)
Definition: mqtt.h:325
Mqtt()
Definition: mqtt.h:121
The muwerk namespace.
Definition: mqtt.h:21