muwerk Scheduler Library
A low-resource cooperative scheduler with MQTT-like queues for Arduinos, ATtiny up to ESP32
Loading...
Searching...
No Matches
scheduler.h
1// scheduler.h - the muwerk simple scheduler
2
3#pragma once
4
5#include "ustd_platform.h"
6#include "ustd_array.h"
7#include "ustd_queue.h"
8#include "ustd_functional.h"
9#include "muwerk.h"
10
11#include <stdio.h>
12
13#if defined(__ESP__) || defined(__ESP32__) || defined(__UNIXOID__) || defined(__RP_PICO__)
14#include <functional>
15#endif
16
17namespace ustd {
18
19#define SCHEDULER_MAIN 0
20
25enum T_PRIO {
26 PRIO_SYSTEMCRITICAL = 0,
31 PRIO_LOWEST = 5
32};
33
36 MSG_NONE = 0,
37 MSG_DIRECT = 1,
38 MSG_SUBSCRIBE = 2,
39 MSG_UNSUBSCRIBE = 3,
40 MSG_PUBLISH = 4,
41 MSG_PUBLISHRAW = 5
42};
43
45#if defined(__ESP__) || defined(__ESP32__) || defined(__UNIXOID__) || defined(__RP_PICO__)
46typedef std::function<void()> T_TASK;
47#elif defined(__ATTINY__)
48typedef void (*T_TASK)();
49#else
50typedef ustd::function<void()> T_TASK;
51#endif
52
// One queued message. The three strings are plain C strings; the scheduler's
// publish() allocates them together with the struct in a single malloc() block.
struct T_MSG {
    char *originator;  // name of the sender (may be an empty string)
    char *topic;       // topic the message was published to
    char *msg;         // message payload
};
58
60#if defined(__ESP__) || defined(__ESP32__) || defined(__UNIXOID__) || defined(__RP_PICO__)
61typedef std::function<void(String topic, String msg, String originator)> T_SUBS;
62#elif defined(__ATTINY__)
63typedef void (*T_SUBS)(String topic, String msg, String originator);
64#else
65typedef ustd::function<void(String topic, String msg, String originator)> T_SUBS;
66#endif
67
68typedef struct {
69 int subscriptionHandle;
70 int taskID;
71 char *originator;
72 char *topic;
73 T_SUBS subs;
74} T_SUBSCRIPTION;
75
76typedef struct {
77 int taskID;
78 char *szName;
79 T_TASK task;
80 T_PRIO prio;
81 unsigned long minMicros;
82 unsigned long lastCall;
83#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
84 unsigned long lateTime;
85 unsigned long cpuTime;
86 unsigned long callCount;
87#endif
88} T_TASKENTRY;
89
90// forward declaration
91class Console;
92
200 private:
201 friend class Console;
202 ustd::array<T_TASKENTRY> taskList;
203 ustd::queue<T_MSG *> msgqueue;
204 ustd::array<T_SUBSCRIPTION> subscriptionList;
205 int subscriptionHandle;
206 int taskID;
207 bool bSingleTaskMode = false;
208 int singleTaskID = -1;
209#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
210 bool bGenStats = false;
211 unsigned long statIntervallMs = 0;
212 unsigned long statTimer;
213 unsigned long systemTimer;
214 unsigned long systemTime = 0;
215 unsigned long appTimer;
216 unsigned long appTime = 0;
217 unsigned long mainTime = 0; // Time spent with SCHEDULER_MAIN id.
218#endif
219 unsigned long upTime = 0; // Seconds system is running
220 unsigned long upTimeTicker = 0;
221 int currentTaskID = -2; // TaskID that is currently been executed
222
223 public:
224 Scheduler(int nTaskListSize = 2, int queueSize = 2, int nSubscriptionListSize = 2)
225 : taskList(nTaskListSize), msgqueue(queueSize), subscriptionList(nSubscriptionListSize) {
231 subscriptionHandle = 0;
232 taskID = 0; // 0 is SCHEDULER_MAIN
233 upTime = 0;
234 upTimeTicker = micros();
235#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
236 resetStats(true);
237#endif
238
239#if defined(__ESP__) && !defined(__ESP32__) && !defined(__ESP32_RISC__)
240 ESP.wdtDisable();
241 ESP.wdtEnable(WDTO_8S);
242#endif
243 }
244
245#ifndef __ATTINY__
246 virtual ~Scheduler() {
247 for (unsigned int i = 0; i < taskList.length(); i++) {
248 if (taskList[i].szName != nullptr)
249 free(taskList[i].szName);
250 }
251 int l = msgqueue.length();
252 for (int i = 0; i < l; i++) {
253 msgqueue.pop();
254 }
255 }
256#endif
257
258 static bool mqttmatch(const String pubstr, const String substr) {
267 if (pubstr == substr)
268 return true;
269
270 // Attiny compile: core needs to be extended, add c_str():
271 // In WString.h add: const char *c_str() const { return buffer; }
272 const char *pub = (const char *)pubstr.c_str();
273 const char *sub = (const char *)substr.c_str();
274
275 int lp = strlen(pub);
276 int ls = strlen(sub);
277
278 bool wPos = true; // sub wildcard is legal now
279 int ps = 0;
280 for (int pp = 0; pp < lp; pp++) {
281 if (pub[pp] == '+' || pub[pp] == '#') {
282 return false; // Illegal wildcards in pub
283 }
284 if (wPos) {
285 wPos = false;
286 if (sub[ps] == '#') {
287 if (ps == ls - 1) {
288 return true;
289 } else {
290 return false; // In sub, # must not be followed by
291 // anything else
292 }
293 }
294 if (sub[ps] == '+') {
295 while (pp < lp && pub[pp] != '/')
296 ++pp;
297 ++ps;
298 if (pp == lp) {
299 if (ps == ls) {
300 return true;
301 } else if (!strcmp(&sub[ps], "/#")) {
302 return true;
303 }
304 }
305 }
306 } else {
307 if (sub[ps] == '+' || sub[ps] == '#') {
308 return false; // Illegal wildcard-position
309 }
310 }
311 if (pub[pp] != sub[ps] && strcmp(&sub[ps], "/#")) {
312 return false;
313 }
314 if (pub[pp] == '/')
315 wPos = true;
316 if (pp == lp - 1) {
317 if (ps == ls - 1) {
318 return true;
319 }
320 if (!strcmp(&sub[ps + 1], "/#") || !strcmp(&sub[ps + 1], "#") ||
321 !strcmp(&sub[ps + 1], "+")) {
322 return true;
323 }
324 return false;
325 }
326 ++ps;
327 }
328 if (ps == ls) {
329 return true;
330 } else {
331 return false;
332 }
333 }
334
335 private:
336 int getIndexFromTaskID(int taskID) {
337 for (unsigned int i = 0; i < taskList.length(); i++) {
338 if (taskList[i].taskID == taskID) {
339 return i;
340 }
341 }
342 return -1;
343 }
344
345#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
346 bool schedReceive(const char *topic, const char *msg) {
347 const char *p0, *p1;
348 p0 = topic ? topic : "";
349 p1 = strchr(p0, '/');
350 if (p1) {
351 ++p1;
352 if (!strcmp(p1, "stat/get")) {
353 statIntervallMs = msg ? atoi(msg) : 0;
354 if (statIntervallMs) {
355 bGenStats = true;
356 resetStats(true);
357 } else
358 bGenStats = false;
359 return true;
360 }
361 }
362 return false;
363 }
364#endif
365
366 public:
367 bool publish(String topic, String msg = "", String originator = "") {
375#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
376 if (!strncmp(topic.c_str(), "$SYS", 4))
377 if (schedReceive(topic.c_str(), msg.c_str()))
378 return true;
379#endif
380 T_MSG *pMsg = (T_MSG *)malloc(sizeof(T_MSG) +
381 (3 + originator.length() + topic.length() + msg.length()) *
382 sizeof(char));
383 if (pMsg) {
384 pMsg->originator = (char *)(&pMsg[1]);
385 pMsg->topic = pMsg->originator + ((originator.length() + 1) * sizeof(char));
386 pMsg->msg = pMsg->topic + ((topic.length() + 1) * sizeof(char));
387 strcpy(pMsg->originator, originator.c_str());
388 strcpy(pMsg->topic, topic.c_str());
389 strcpy(pMsg->msg, msg.c_str());
390 return msgqueue.push(pMsg);
391 }
392 return false;
393 }
394
395 int subscribe(int taskID, String topic, T_SUBS subs, String originator = "") {
409 T_SUBSCRIPTION sub = {};
410 sub.taskID = taskID;
411 sub.subs = subs;
412 sub.subscriptionHandle = subscriptionHandle + 1;
413 sub.topic = (char *)malloc((topic.length() + originator.length() + 2) * sizeof(char));
414 if (sub.topic) {
415 sub.originator = sub.topic + ((topic.length() + 1) * sizeof(char));
416 strcpy(sub.topic, topic.c_str());
417 strcpy(sub.originator, originator.c_str());
418 if (subscriptionList.add(sub) != -1) {
419 ++subscriptionHandle;
420 return subscriptionHandle;
421 }
422 // free up memory
423 free(sub.topic);
424 }
425 return -1;
426 }
427
428 bool unsubscribe(int subscriptionHandle) {
436 for (unsigned int i = 0; i < subscriptionList.length(); i++) {
437 if (subscriptionList[i].subscriptionHandle == subscriptionHandle) {
438 free(subscriptionList[i].topic);
439 subscriptionList.erase(i);
440 return true;
441 }
442 }
443 return false;
444 }
445
446 private:
447 void checkMsgQueue() {
448 T_MSG *pMsg;
449 while ((pMsg = msgqueue.pop()) != nullptr) {
450 for (unsigned int i = 0; i < subscriptionList.length(); i++) {
451 if (mqttmatch(pMsg->topic, subscriptionList[i].topic)) {
452 if (*(pMsg->originator) != 0)
453 if (strcmp(subscriptionList[i].originator, pMsg->originator) == 0) {
454 continue;
455 }
456 subscriptionList[i].subs(pMsg->topic, pMsg->msg, pMsg->originator);
457
458#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
459 unsigned long startTime = micros();
460 if (subscriptionList[i].taskID != SCHEDULER_MAIN) {
461 int ind = getIndexFromTaskID(subscriptionList[i].taskID);
462 if (ind != -1)
463 taskList[ind].cpuTime += timeDiff(startTime, micros());
464 } else {
465 mainTime += timeDiff(startTime, micros());
466 }
467#endif
468 }
469 }
470 free(pMsg);
471 }
472 }
473
474 public:
475 int add(T_TASK task, String name, unsigned long minMicroSecs = 100000L,
476 T_PRIO prio = PRIO_NORMAL) {
488 T_TASKENTRY taskEnt = {};
489 taskEnt.taskID = taskID + 1;
490 taskEnt.task = task;
491 taskEnt.minMicros = minMicroSecs;
492 taskEnt.prio = prio;
493 if (name.length()) {
494 taskEnt.szName = (char *)malloc(name.length() + 1);
495 if (!taskEnt.szName) {
496 return -1;
497 }
498 strcpy(taskEnt.szName, name.c_str());
499 } else {
500 taskEnt.szName = nullptr;
501 }
502 if (taskList.add(taskEnt) >= 0) {
503 ++taskID;
504 return taskID;
505 }
506 if (taskEnt.szName) {
507 free(taskEnt.szName);
508 }
509 return -1;
510 }
511
512 bool remove(int taskID) {
521 if (currentTaskID == taskID) {
522 return false; // A task can't delete itself.
523 }
524 for (unsigned int i = 0; i < taskList.length(); i++) {
525 if (taskList[i].taskID == taskID) {
526 if (taskList[i].szName != nullptr)
527 free(taskList[i].szName);
528 taskList.erase(i);
529 return true;
530 }
531 }
532 return false;
533 }
534
535 bool reschedule(int taskID, unsigned long minMicroSecs = 100000L, T_PRIO prio = PRIO_NORMAL) {
545 for (unsigned int i = 0; i < taskList.length(); i++) {
546 if (taskList[i].taskID == taskID) {
547 taskList[i].minMicros = minMicroSecs;
548 return true;
549 }
550 }
551 return false;
552 }
553
554 unsigned long getUptime() {
559 return upTime;
560 }
561
562 void singleTaskMode(int _singleTaskID) {
570 singleTaskID = _singleTaskID;
571 if (_singleTaskID == -1) {
572 bSingleTaskMode = false;
573 } else {
574 bSingleTaskMode = true;
575 }
576 }
577
578 private:
579 void runTask(T_TASKENTRY *pTaskEnt) {
580 unsigned long startTime = micros();
581 unsigned long tDelta = timeDiff(pTaskEnt->lastCall, startTime);
582 if (tDelta >= pTaskEnt->minMicros && pTaskEnt->minMicros) {
583 currentTaskID = pTaskEnt->taskID; // prevent task() to delete itself.
584 pTaskEnt->task();
585 currentTaskID = -2;
586 pTaskEnt->lastCall = startTime;
587#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
588 pTaskEnt->lateTime += tDelta - pTaskEnt->minMicros;
589 pTaskEnt->cpuTime += timeDiff(startTime, micros());
590 ++pTaskEnt->callCount;
591#endif
592 }
593 }
594
595#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
596 void resetStats(bool bHard = false) {
597 for (unsigned int i = 0; i < taskList.length(); i++) {
598 taskList[i].cpuTime = 0;
599 taskList[i].lateTime = 0;
600 taskList[i].callCount = 0;
601 }
602 statTimer = micros();
603 if (bHard)
604 systemTimer = micros();
605 systemTime = 0;
606 appTime = 0;
607 mainTime = 0;
608 }
609
610 void checkStats() {
611 if (!bGenStats || !statIntervallMs)
612 return;
613 unsigned long now = micros();
614 unsigned long tDelta = timeDiff(statTimer, now);
615#ifdef USTD_FEATURE_FREE_MEMORY
616 unsigned long mem = (unsigned long)freeMemory();
617#else
618 unsigned long mem = 0;
619#pragma message("freeMemory() is not implemented for this platform.")
620#endif
621 if (tDelta > statIntervallMs * 1000) {
622 // local stats
623 for (unsigned int i = 0; i < taskList.length(); i++) {
624 }
625 // mqtt stats
626 const char *null_name = "<null>";
627 const char *skeleton_head =
628 "{\"dt\":%ld,\"syt\":%ld,\"apt\":%ld,"
629 "\"mat\":%ld,\"upt\":%ld,\"mem\":%ld,\"tsks\":%ld,\"tdt\":[";
630 const char *skeleton_tail = "]}";
631 const char *bone = "[\"%s\",%ld,%ld,%ld,%ld,%ld],";
632 unsigned long memreq =
633 strlen(skeleton_head) + 7 * 7 + (strlen(bone) + 7 * 5) * taskList.length();
634 for (unsigned int i = 0; i < taskList.length(); i++) {
635 if (taskList[i].szName == nullptr)
636 memreq += strlen(null_name);
637 else
638 memreq += strlen(taskList[i].szName);
639 }
640 memreq += 1;
641 char *jsonstr = (char *)malloc(memreq);
642 if (jsonstr != nullptr) {
643 memset(jsonstr, 0, memreq);
644 sprintf(jsonstr, skeleton_head, tDelta, systemTime, appTime, mainTime, upTime, mem,
645 (long)taskList.length());
646 for (unsigned int i = 0; i < taskList.length(); i++) {
647 char *p = &jsonstr[strlen(jsonstr)];
648 if (taskList[i].szName == nullptr) {
649 sprintf(p, bone, null_name, (long)taskList[i].taskID, taskList[i].minMicros,
650 taskList[i].callCount, taskList[i].cpuTime, taskList[i].lateTime);
651 } else {
652 sprintf(p, bone, taskList[i].szName, (long)taskList[i].taskID,
653 taskList[i].minMicros, taskList[i].callCount, taskList[i].cpuTime,
654 taskList[i].lateTime);
655 }
656 }
657 char *p = &jsonstr[strlen(jsonstr)];
658 if (taskList.length() > 0)
659 --p; // no final ','
660 strcpy(p, skeleton_tail);
661
662 //#ifdef USE_SERIAL_DBG
663 // Serial.print(memreq);
664 // Serial.print(" ");
665 // Serial.println(strlen(jsonstr));
666 // Serial.println(jsonstr);
667 // char brz[50];
668 // sprintf(brz, "%ld,%ld", memreq, strlen(jsonstr));
669 // String msg = "JSONMEM: " + String(brz);
670 // publish("$SYS/stat", msg, "scheduler-dbg");
671 //#endif
672 publish("$SYS/stat", jsonstr, "scheduler");
673 free(jsonstr);
674 }
675 resetStats(false);
676 }
677 }
678#endif
679
680 public:
681 void loop() {
686 unsigned long current = micros();
687 if (timeDiff(upTimeTicker, current) > 1000000L) {
688 upTime += 1;
689 upTimeTicker += 1000000;
690 }
691#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
692 systemTime += timeDiff(systemTimer, current);
693 appTimer = current;
694#endif
695 if (!bSingleTaskMode) {
696#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
697 checkStats();
698#endif
699 checkMsgQueue();
700 }
701 for (unsigned int i = 0; i < taskList.length(); i++) {
702 if (!bSingleTaskMode) {
703 checkMsgQueue();
704 runTask(&taskList[i]);
705 } else {
706 if (taskList[i].taskID == singleTaskID) {
707 runTask(&taskList[i]);
708 }
709 }
710#if defined(__ESP__) && !defined(__ESP32__)
711 appTime += timeDiff(appTimer, micros());
712 systemTimer = micros();
713 yield();
714 systemTime += timeDiff(systemTimer, micros());
715 appTimer = micros();
716#endif
717 }
718#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
719 appTime += timeDiff(appTimer, micros());
720 systemTimer = micros();
721#endif
722#if defined(__ESP__) && !defined(__ESP32__) && !defined(__ESP32_RISC__)
723 ESP.wdtFeed();
724#endif
725 }
726}; // class Scheduler
727} // namespace ustd
muwerk Console Class
Definition console.h:88
muwerk Scheduler Class
Definition scheduler.h:199
Scheduler(int nTaskListSize=2, int queueSize=2, int nSubscriptionListSize=2)
Definition scheduler.h:224
void singleTaskMode(int _singleTaskID)
Definition scheduler.h:562
int add(T_TASK task, String name, unsigned long minMicroSecs=100000L, T_PRIO prio=PRIO_NORMAL)
Definition scheduler.h:475
static bool mqttmatch(const String pubstr, const String substr)
Definition scheduler.h:258
unsigned long getUptime()
Definition scheduler.h:554
bool publish(String topic, String msg="", String originator="")
Definition scheduler.h:367
int subscribe(int taskID, String topic, T_SUBS subs, String originator="")
Definition scheduler.h:395
bool reschedule(int taskID, unsigned long minMicroSecs=100000L, T_PRIO prio=PRIO_NORMAL)
Definition scheduler.h:535
void loop()
Definition scheduler.h:681
bool unsubscribe(int subscriptionHandle)
Definition scheduler.h:428
bool remove(int taskID)
Definition scheduler.h:512
The muwerk namespace.
Definition console.h:15
T_MSGTYPE
Scheduler Message Type.
Definition scheduler.h:35
std::function< void()> T_TASK
Scheduler Task Function.
Definition scheduler.h:46
std::function< void(String topic, String msg, String originator)> T_SUBS
Scheduler Subscription Function.
Definition scheduler.h:61
T_PRIO
Scheduler Task Priority.
Definition scheduler.h:25
@ PRIO_TIMECRITICAL
Time critical priority.
Definition scheduler.h:27
@ PRIO_NORMAL
Standard priority (default for all tasks).
Definition scheduler.h:29
@ PRIO_LOW
Low priority.
Definition scheduler.h:30
@ PRIO_LOWEST
Lowest priority.
Definition scheduler.h:31
@ PRIO_HIGH
High priority.
Definition scheduler.h:28
unsigned long timeDiff(unsigned long first, unsigned long second)
Definition muwerk.h:44