munet Network Library for muwerk Scheduler
A muwerk network library supporting WiFi, NTP, OTA and MQTT for ESP8266 and ESP32 and serial links for all platforms
Loading...
Searching...
No Matches
muserial.h
1// muserial.h
2#pragma once
3
4#include "ustd_platform.h"
5#include "ustd_array.h"
6#include "ustd_map.h"
7
8#include "scheduler.h"
9//#include <Arduino_JSON.h>
10
11#ifdef __ATTINY__
12#define HardwareSerial TinySoftwareSerial
13#endif
14
15namespace ustd {
16
76class MuSerial {
77 private:
78 Scheduler *pSched;
79 int tID;
80
81 String name;
82 HardwareSerial *pSerial;
83 unsigned long baudRate;
84 uint8_t connectionLed;
85 unsigned long ledTimer = 0;
86
87 enum LinkState { SYNC, HEADER, MSG, MUCRC };
88 bool bCheckLink = false;
89 uint8_t blockNum = 0;
90 LinkState linkState;
91 unsigned long lastRead = 0;
92 unsigned long lastMsg = 0;
93 unsigned long lastPingSent = 0;
94 bool linkConnected = false;
95 unsigned long readTimeout = 5; // sec
96 unsigned long pingReceiveTimeout = 10; // sec
97 unsigned long pingPeriod = 5; // sec
98 String remoteName = "";
99 ustd::array<String> outgoingBlockList;
100 ustd::array<String> incomingBlockList;
101
102 const uint8_t SOH = 0x01, STX = 0x02, ETX = 0x03, EOT = 0x04;
103 const uint8_t VER = 0x01;
104
106 enum LinkCmd {
107 MUPING,
109 MQTT
110 };
111
113 typedef struct t_header {
114 uint8_t soh;
115 uint8_t ver;
116 uint8_t num;
117 uint8_t cmd;
118 uint8_t hLen;
119 uint8_t lLen;
120 uint8_t stx;
121 uint8_t pad;
122 } T_HEADER;
123
125 typedef struct t_footer {
126 uint8_t etx;
127 uint8_t pad2;
128 uint8_t crc;
130 uint8_t eot;
131 } T_FOOTER;
132
133 public:
134 bool activeLogic = false;
137 200;
138
139 MuSerial(String name, HardwareSerial *pSerial, unsigned long baudRate = 115200,
140 uint8_t connectionLed = -1)
141 : name(name), pSerial(pSerial), baudRate(baudRate), connectionLed(connectionLed) {
151 }
152
153 ~MuSerial() {
154 }
155
156 void begin(Scheduler *_pSched) {
161 pSched = _pSched;
162 pSerial->begin(baudRate);
163#ifdef __ARDUINO__
164 while (!*pSerial) {
165 }
166#endif
167
168 auto ft = [=]() { this->loop(); };
169 tID = pSched->add(ft, "serlink", 20000L); // check every 20ms
170 auto fnall = [=](String topic, String msg, String originator) {
171 this->subsMsg(topic, msg, originator);
172 };
173 pSched->subscribe(tID, "#", fnall);
174 bCheckLink = true;
175 linkState = SYNC;
176 if (connectionLed != -1) {
177 pinMode(connectionLed, OUTPUT);
178 digitalWrite(connectionLed, activeLogic);
179 }
180 ping();
181 }
182
183 bool outgoingBlockSet(String topic) {
192 for (unsigned int i = 0; i < outgoingBlockList.length(); i++) {
193 if (outgoingBlockList[i] == topic)
194 return false;
195 }
196 if (outgoingBlockList.add(topic) == -1)
197 return false;
198 return true;
199 }
200
201 bool outgoingBlockRemove(String topic) {
210 for (unsigned int i = 0; i < outgoingBlockList.length(); i++) {
211 if (outgoingBlockList[i] == topic) {
212 if (!outgoingBlockList.erase(i))
213 return false;
214 return true;
215 }
216 }
217 return false;
218 }
219
220 bool incomingBlockSet(String topic) {
228 for (unsigned int i = 0; i < incomingBlockList.length(); i++) {
229 if (incomingBlockList[i] == topic)
230 return false;
231 }
232 if (incomingBlockList.add(topic) == -1)
233 return false;
234 return true;
235 }
236
237 bool incomingBlockRemove(String topic) {
246 for (unsigned int i = 0; i < incomingBlockList.length(); i++) {
247 if (incomingBlockList[i] == topic) {
248 if (!incomingBlockList.erase(i))
249 return false;
250 return true;
251 }
252 }
253 return false;
254 }
255
256 private:
257 unsigned char crc(const unsigned char *buf, unsigned int len, unsigned char init = 0) {
258 unsigned char c = init;
259 for (unsigned int i = 0; i < len; i++)
260 c = c ^ buf[i];
261 return c;
262 }
263
264 void ping() {
265 char strTime[16];
266#if defined(__ARDUINO__) || defined(__ARM__) || defined(__RISC_V__)
267 ltoa(pSched->getUptime(), strTime, 15);
268#else
269 ltoa(time(nullptr), strTime, 15);
270#endif
271 sendOut(strTime, name, LinkCmd::MUPING);
272 lastPingSent = pSched->getUptime();
273 }
274
275 void handleTime(uint64_t remoteTime) {
276 // XXX do maybe something?
277 }
278
279 void sendOut(String topic, String msg, LinkCmd cmd = LinkCmd::MQTT) {
280 T_HEADER th = {};
281 T_FOOTER tf = {};
282 unsigned char ccrc;
283 unsigned char nul = 0x0;
284
285 // Serial.println("Sending " + topic + ", " + msg);
286 th.soh = SOH;
287 th.ver = VER;
288 th.num = blockNum++;
289 th.cmd = cmd;
290 unsigned int len = topic.length() + msg.length() + 2;
291 th.hLen = len / 256;
292 th.lLen = len % 256;
293 th.stx = STX;
294
295 tf.etx = ETX;
296 tf.eot = EOT;
297
298 ccrc = crc((const uint8_t *)&(th.ver), sizeof(th) - 1);
299 ccrc = crc((const uint8_t *)topic.c_str(), topic.length(), ccrc);
300 ccrc = crc((const uint8_t *)&nul, 1, ccrc);
301 ccrc = crc((const uint8_t *)msg.c_str(), msg.length(), ccrc);
302 ccrc = crc((const uint8_t *)&nul, 1, ccrc);
303 ccrc = crc((const uint8_t *)&tf, 2, ccrc);
304
305 tf.crc = ccrc;
306
307 pSerial->write((unsigned char *)&th, sizeof(th));
308 pSerial->write((unsigned char *)topic.c_str(), topic.length());
309 pSerial->write((uint8_t)nul);
310 pSerial->write((unsigned char *)msg.c_str(), msg.length());
311 pSerial->write((uint8_t)nul);
312 pSerial->write((unsigned char *)&tf, sizeof(tf));
313 }
314
315 private:
316 T_HEADER hd;
317 unsigned char *pHd;
318 uint16_t hLen;
319 uint16_t msgLen, curMsg;
320 unsigned char *msgBuf = nullptr;
321 bool allocated = false;
322 T_FOOTER fo;
323 unsigned char *pFo;
324 uint16_t cLen;
325
326 bool internalPub(String topic, String msg) {
327 for (unsigned int i = 0; i < incomingBlockList.length(); i++) {
328 if (Scheduler::mqttmatch(topic, incomingBlockList[i])) {
329 return false;
330 }
331 }
332
333 // Serial.println("In: " + topic);
334 String pre2 = remoteName + "/";
335 String pre1 = name + "/";
336 if (topic.substring(0, pre1.length()) == pre1) {
337 topic = topic.substring(pre1.length());
338 }
339 if (topic.substring(0, pre2.length()) == pre1) {
340 topic = topic.substring(pre2.length());
341 }
342
343 // Serial.println("InPub: " + topic);
344 pSched->publish(topic, msg, remoteName);
345 return true;
346 }
347
348 bool ld = false;
349 void loop() {
350 unsigned char ccrc;
351 unsigned char c;
352 if (bCheckLink) {
353 if (ledTimer) {
354 if (timeDiff(ledTimer, millis()) > connectionLedBlinkDurationMs) {
355 ledTimer = 0;
356 if (connectionLed != -1) {
357 digitalWrite(connectionLed, activeLogic);
358 }
359 }
360 }
361 if (pSched->getUptime() - lastPingSent > pingPeriod) {
362 ping();
363 }
364 while (pSerial->available() > 0) {
365 c = pSerial->read();
366 lastRead = pSched->getUptime();
367 switch (linkState) {
368 case SYNC:
369 if (c == SOH) {
370 hd.soh = SOH;
371 linkState = HEADER;
372 pHd = (unsigned char *)&hd;
373 hLen = 1;
374 }
375 continue;
376 break;
377 case HEADER:
378 pHd[hLen] = c;
379 hLen++;
380 if (hLen == sizeof(hd)) {
381 // XXX: check block number
382 if (hd.ver != VER || hd.stx != STX) {
383 linkState = SYNC;
384 } else {
385 msgLen = 256 * hd.hLen + hd.lLen;
386 if (msgLen < 1024) {
387 msgBuf = (unsigned char *)malloc(msgLen);
388 curMsg = 0;
389 if (msgBuf) {
390 linkState = MSG;
391 allocated = true;
392 } else {
393 linkState = SYNC;
394 }
395 } else {
396 linkState = SYNC;
397 }
398 }
399 }
400 continue;
401 break;
402 case MSG:
403 msgBuf[curMsg] = c;
404 ++curMsg;
405 if (curMsg == msgLen) {
406 linkState = MUCRC;
407 pFo = (unsigned char *)&fo;
408 cLen = 0;
409 }
410 continue;
411 break;
412 case MUCRC:
413 pFo[cLen] = c;
414 ++cLen;
415 if (cLen == sizeof(fo)) {
416 if (fo.etx != ETX || fo.eot != EOT) {
417 if (allocated && msgBuf != nullptr) {
418 free(msgBuf);
419 msgBuf = nullptr;
420 allocated = false;
421 }
422 linkState = SYNC;
423 continue;
424 } else {
425 ccrc = crc((const uint8_t *)&(hd.ver), sizeof(hd) - 1);
426 ccrc = crc(msgBuf, msgLen, ccrc);
427 ccrc = crc((const uint8_t *)&fo, 2, ccrc);
428 if (ccrc != fo.crc) {
429 if (allocated && msgBuf != nullptr) {
430 free(msgBuf);
431 msgBuf = nullptr;
432 allocated = false;
433 }
434 linkState = SYNC;
435 continue;
436 } else {
437 uint64_t remoteTime = 0;
438 // Serial.println("Msg received");
439 if (strlen((const char *)msgBuf) + 2 <= msgLen) {
440 lastMsg = pSched->getUptime();
441 const char *pM =
442 (const char *)&msgBuf[strlen((const char *)msgBuf) + 1];
443 if (strlen(pM) + strlen((const char *)msgBuf) + 2 <= msgLen) {
444 switch ((LinkCmd)hd.cmd) {
445 case LinkCmd::MUPING:
446 remoteName = pM;
447 // XXX: Y2031?
448 remoteTime = atol((const char *)msgBuf);
449 handleTime(remoteTime);
450 lastMsg = pSched->getUptime();
451 if (connectionLed != -1) {
452 digitalWrite(connectionLed, !activeLogic);
453 ledTimer = millis();
454 }
455 if (!linkConnected) {
456 linkConnected = true;
457 pSched->publish(name + "/link/" + remoteName,
458 "connected", name);
459 }
460 break;
461 case LinkCmd::MQTT:
462 internalPub((const char *)msgBuf, pM);
463 break;
464 }
465 }
466 }
467 if (allocated && msgBuf != nullptr) {
468 free(msgBuf);
469 msgBuf = nullptr;
470 allocated = false;
471 }
472 linkState = SYNC;
473 }
474 }
475 }
476 continue;
477 break;
478 }
479 }
480 if (linkConnected || linkState != SYNC) {
481 if (linkState != SYNC) {
482 if ((unsigned long)(pSched->getUptime() - lastRead) > readTimeout) {
483 linkState = SYNC;
484 if (linkConnected) {
485 pSched->publish(name + "/link/" + remoteName, "disconnected", name);
486 }
487 linkConnected = false;
488 if (allocated && msgBuf != nullptr) {
489 free(msgBuf);
490 msgBuf = nullptr;
491 allocated = false;
492 }
493 }
494 } else {
495 if ((unsigned long)(pSched->getUptime() - lastMsg) > pingReceiveTimeout) {
496 if (linkConnected) {
497 pSched->publish(name + "/link/" + remoteName, "disconnected", name);
498 }
499 linkConnected = false;
500 }
501 }
502 }
503 }
504 }
505
506 void subsMsg(String topic, String msg, String originator) {
507 if (originator == remoteName) {
508 // prevent loops;
509 // Serial.println("Loop prevented: " + topic + " - " + msg + " from: " + originator);
510 return;
511 }
512 // Serial.println("MQ-in: " + topic + " - " + msg + " from: " + originator);
513 for (unsigned int i = 0; i < outgoingBlockList.length(); i++) {
514 if (Scheduler::mqttmatch(topic, outgoingBlockList[i])) {
515 // Serial.println("blocked: " + topic + " - " + msg + " from: " + originator);
516 return;
517 }
518 }
519 String pre = remoteName + "/";
520 if (topic.substring(0, pre.length()) == pre) {
521 sendOut(topic, msg);
522 } else {
523 sendOut(remoteName + "/" + topic, msg);
524 }
525 };
526};
527
528} // namespace ustd
munet MuSerial Class
Definition: muserial.h:76
unsigned long connectionLedBlinkDurationMs
milli-secs the connectionLed is flashed on receiving a ping.
Definition: muserial.h:136
void begin(Scheduler *_pSched)
Definition: muserial.h:156
bool outgoingBlockRemove(String topic)
Definition: muserial.h:201
bool incomingBlockRemove(String topic)
Definition: muserial.h:237
MuSerial(String name, HardwareSerial *pSerial, unsigned long baudRate=115200, uint8_t connectionLed=-1)
Definition: muserial.h:139
bool incomingBlockSet(String topic)
Definition: muserial.h:220
bool activeLogic
Definition: muserial.h:134
bool outgoingBlockSet(String topic)
Definition: muserial.h:183
The muwerk namespace.
Definition: mqtt.h:21