9#include <Arduino_JSON.h>
10#include <PubSubClient.h>
12#include "ustd_platform.h"
13#include "ustd_array.h"
86 WiFiClient wifiClient;
87 PubSubClient mqttClient;
91 uint16_t mqttServerPort;
97 String outDomainToken;
101 ustd::array<String> ownedPrefixes;
103 String outDomainPrefix;
107 ustd::array<String> subsList;
108 ustd::array<String> outgoingBlockList;
109 ustd::array<String> incomingBlockList;
114 bool bMqInit =
false;
115 bool bWarned =
false;
116 bool bCheckConnection =
false;
117 bool mqttConnected =
false;
118 ustd::timeout mqttTickerTimeout = 5000L;
128 mqttClient = wifiClient;
134 void begin(Scheduler *_pSched, String _mqttServer =
"", uint16_t _mqttServerPort = 1883,
135 bool _mqttRetained =
false, String _clientName =
"${hostname}",
136 String _domainToken =
"mu", String _outDomainToken =
"omu",
137 String _mqttUsername =
"", String _mqttPassword =
"", String _willTopic =
"",
138 String _willMessage =
"") {
184 mqttServer = conf.readString(
"mqtt/host", _mqttServer);
185 mqttServerPort = (uint16_t)conf.readLong(
"mqtt/port", 1, 65535, _mqttServerPort);
186 mqttUsername = conf.readString(
"mqtt/username", _mqttUsername);
187 mqttPassword = conf.readString(
"mqtt/password", _mqttPassword);
188 mqttRetained = conf.readBool(
"mqtt/alwaysRetained", _mqttRetained);
189 clientName = conf.readString(
"mqtt/clientName", 1, _clientName);
190 domainToken = conf.readString(
"mqtt/domainToken", 1, _domainToken);
191 outDomainToken = conf.readString(
"mqtt/outDomainToken", _outDomainToken);
192 lwTopic = conf.readString(
"mqtt/lastWillTopic", _willTopic);
193 lwMsg = conf.readString(
"mqtt/lastWillMessage", _willMessage);
196 conf.readStringArray(
"mqtt/subscriptions", subsList);
197 conf.readStringArray(
"mqtt/outgoingBlackList", outgoingBlockList);
198 conf.readStringArray(
"mqtt/incomingBlackList", incomingBlockList);
209 tID = pSched->add([
this]() { this->loop(); },
"mqtt");
212 pSched->subscribe(tID,
"#", [
this](String topic, String msg, String originator) {
213 this->subsMsg(topic, msg, originator);
216 if (mqttServer.length()) {
218 pSched->publish(
"net/network/get");
220 DBG(
"mqtt: WARNING - no server defined.");
226 bMqInit = configureMqttClient();
228 bStateRetained =
false;
229 bCheckConnection =
false;
230 mqttConnected =
false;
231 mqttTickerTimeout = 5000L;
259 pSched->subscribe(taskID, topic, subs, originator);
260 for (
unsigned int i = 0; i < subsList.length(); i++) {
261 if (topic == subsList[i])
265 mqttClient.subscribe(topic.c_str());
282 bool ret = pSched->unsubscribe(subscriptionHandle);
283 for (
unsigned int i = 0; i < subsList.length(); i++) {
284 if (topic == subsList[i])
298 for (
unsigned int i = 0; i < outgoingBlockList.length(); i++) {
299 if (outgoingBlockList[i] == topic)
302 if (outgoingBlockList.add(topic) == -1)
315 for (
unsigned int i = 0; i < outgoingBlockList.length(); i++) {
316 if (outgoingBlockList[i] == topic) {
317 if (!outgoingBlockList.erase(i))
333 for (
unsigned int i = 0; i < incomingBlockList.length(); i++) {
334 if (incomingBlockList[i] == topic)
337 if (incomingBlockList.add(topic) == -1)
350 for (
unsigned int i = 0; i < incomingBlockList.length(); i++) {
351 if (incomingBlockList[i] == topic) {
352 if (!incomingBlockList.erase(i))
361 inline void publishState() {
362 pSched->publish(
"mqtt/state", mqttConnected ?
"connected" :
"disconnected");
366 if (!isOn || !netUp || mqttServer.length() == 0) {
372 if (bCheckConnection || mqttTickerTimeout.test()) {
373 mqttTickerTimeout.reset();
374 bCheckConnection =
false;
375 if (!mqttClient.connected()) {
377 const char *usr = mqttUsername.length() ? mqttUsername.c_str() : NULL;
378 const char *pwd = mqttPassword.length() ? mqttPassword.c_str() : NULL;
379 bool conRes = mqttClient.connect(clientName.c_str(), usr, pwd, lwTopic.c_str(), 0,
380 true, lwMsg.c_str());
382 DBG2(
"Connected to mqtt server");
383 mqttConnected =
true;
384 mqttClient.subscribe((clientName +
"/#").c_str());
385 mqttClient.subscribe((domainToken +
"/#").c_str());
386 for (
unsigned int i = 0; i < subsList.length(); i++) {
387 mqttClient.subscribe(subsList[i].c_str());
390 pSched->publish(
"mqtt/config", outDomainPrefix +
"+" + lwTopic +
"+" + lwMsg);
393 mqttConnected =
false;
397 DBG2(
"MQTT disconnected.");
404 void mqttReceive(
char *ctopic,
unsigned char *payload,
unsigned int length) {
409 topic = (
const char *)ctopic;
410 if (length && payload) {
411 char *szBuffer = (
char *)malloc(length + 1);
413 memcpy(szBuffer, payload, length);
414 szBuffer[length] = 0;
418 DBG(
"mqtt: ERROR - message body lost due to memory outage");
422 for (
unsigned int i = 0; i < incomingBlockList.length(); i++) {
423 if (Scheduler::mqttmatch(topic, incomingBlockList[i])) {
425 DBG2(
"mqtt: Blocked " + topic);
429 for (
unsigned int i = 0; i < subsList.length(); i++) {
430 if (Scheduler::mqttmatch(topic, subsList[i])) {
431 DBG2(
"mqtt: subscribed topic " + topic);
432 pSched->publish(topic, msg,
"mqtt");
437 for (
unsigned int i = 0; i < ownedPrefixes.length(); i++) {
438 if (ownedPrefixes[i].length() <= topic.length()) {
442 if (ownedPrefixes[i] == topic.substring(0, ownedPrefixes[i].length())) {
443 topic = (
const char *)(ctopic + ownedPrefixes[i].length());
444 pSched->publish(topic, msg,
"mqtt");
450 void subsMsg(String topic, String msg, String originator) {
451 if (originator ==
"mqtt") {
457 unsigned int len = msg.length() + 1;
458 for (
unsigned int i = 0; i < outgoingBlockList.length(); i++) {
459 if (Scheduler::mqttmatch(topic, outgoingBlockList[i])) {
465 if (topic.c_str()[0] ==
'!') {
466 tpc = &(topic.c_str()[1]);
468 tpc = outDomainPrefix +
"/" + topic;
471 bool bRetain = mqttRetained;
472 if (tpc.c_str()[0] ==
'!') {
474 tpc = &(topic.c_str()[2]);
477 if (!bRetain && bStateRetained && topic ==
"mqtt/state") {
482 DBG3(
"mqtt: publishing...");
483 if (mqttClient.publish(tpc.c_str(), msg.c_str(), bRetain)) {
484 DBG2(
"mqtt publish: " + topic +
" | " + msg);
486 DBG(
"mqtt: ERROR len=" + String(len) +
", not published: " + topic +
" | " + msg);
488 DBG(
"mqtt: FATAL ERROR: you need to re-compile the PubSubClient library "
490 "increase #define MQTT_MAX_PACKET_SIZE.");
494 DBG2(
"mqtt: NO CONNECTION, not published: " + topic +
" | " + msg);
498 if (topic ==
"mqtt/state/get") {
500 }
else if (topic ==
"mqtt/config/get") {
501 pSched->publish(
"mqtt/config", outDomainPrefix +
"+" + lwTopic +
"+" + lwMsg);
502 }
else if (topic ==
"mqtt/outgoingblock/set") {
504 }
else if (topic ==
"mqtt/outgoingblock/remove") {
506 }
else if (topic ==
"mqtt/incomingblock/set") {
508 }
else if (topic ==
"mqtt/incomingblock/remove") {
510 }
else if (topic ==
"net/network") {
512 JSONVar jsonState = JSON.parse(msg);
513 if (JSON.typeof(jsonState) !=
"object") {
514 DBG(
"mqtt: Received broken network state " + msg);
517 String state = (
const char *)jsonState[
"state"];
518 String hostname = (
const char *)jsonState[
"hostname"];
519 String mac = (
const char *)jsonState[
"mac"];
520 if (state ==
"connected") {
521 DBG3(
"mqtt: received network connect");
523 DBG2(
"mqtt: net state online");
524 finalizeConfiguration(hostname, mac);
526 bCheckConnection =
true;
530 mqttConnected =
false;
532 DBG2(
"mqtt: net state offline");
537 bool configureMqttClient() {
538 if (mqttServer.length() == 0) {
539 DBG2(
"mqtt: No mqtt host defined. Ignoring configuration...");
542 bCheckConnection =
true;
543 mqttClient.setServer(mqttServer.c_str(), mqttServerPort);
546 mqttClient.setCallback([
this](
char *topic,
unsigned char *msg,
unsigned int len) {
547 this->mqttReceive(topic, msg, len);
554 void finalizeConfiguration(String &hostname, String &mac) {
556 if (hostname.length() == 0) {
557#if defined(__ESP32__)
558 String hostname = WiFi.getHostname();
560 String hostname = WiFi.hostname();
563 if (mac.length() == 0) {
564 mac = WiFi.macAddress();
566 mac.replace(
":",
"");
569 clientName = replaceVars(clientName, hostname, mac);
570 if (outDomainToken.length()) {
571 outDomainPrefix = outDomainToken +
"/" + clientName;
573 outDomainPrefix = clientName;
575 if (lwTopic.length()) {
576 lwMsg = replaceVars(lwMsg, hostname, mac);
578 lwTopic = outDomainPrefix +
"/mqtt/state";
579 lwMsg =
"disconnected";
580 bStateRetained =
true;
582 String clientPrefix = clientName +
"/";
583 String domainPrefix = domainToken +
"/";
584 ownedPrefixes.erase();
585 ownedPrefixes.add(clientPrefix);
586 ownedPrefixes.add(domainPrefix);
589 String replaceVars(String val, String &hostname, String &macAddress) {
590 val.replace(
"${hostname}", hostname);
591 val.replace(
"${mac}", macAddress);
592 val.replace(
"${macls}", macAddress.substring(6));
593 val.replace(
"${macfs}", macAddress.substring(0, 5));
munet MQTT Gateway Class
Definition: mqtt.h:79
int addSubscription(int taskID, String topic, T_SUBS subs, String originator="")
Definition: mqtt.h:236
bool removeSubscription(int subscriptionHandle, String topic)
Definition: mqtt.h:271
void begin(Scheduler *_pSched, String _mqttServer="", uint16_t _mqttServerPort=1883, bool _mqttRetained=false, String _clientName="${hostname}", String _domainToken="mu", String _outDomainToken="omu", String _mqttUsername="", String _mqttPassword="", String _willTopic="", String _willMessage="")
Definition: mqtt.h:134
bool outgoingBlockSet(String topic)
Definition: mqtt.h:290
bool outgoingBlockRemove(String topic)
Definition: mqtt.h:307
bool incomingBlockRemove(String topic)
Definition: mqtt.h:342
bool incomingBlockSet(String topic)
Definition: mqtt.h:325
Mqtt()
Definition: mqtt.h:121
The muwerk namespace.
Definition: mqtt.h:21