5#include "ustd_platform.h"
8#include "ustd_functional.h"
13#if defined(__ESP__) || defined(__ESP32__) || defined(__UNIXOID__) || defined(__RP_PICO__)
19#define SCHEDULER_MAIN 0
26 PRIO_SYSTEMCRITICAL = 0,
45#if defined(__ESP__) || defined(__ESP32__) || defined(__UNIXOID__) || defined(__RP_PICO__)
47#elif defined(__ATTINY__)
50typedef ustd::function<void()>
T_TASK;
60#if defined(__ESP__) || defined(__ESP32__) || defined(__UNIXOID__) || defined(__RP_PICO__)
61typedef std::function<void(String topic, String msg, String originator)>
T_SUBS;
62#elif defined(__ATTINY__)
63typedef void (*
T_SUBS)(String topic, String msg, String originator);
65typedef ustd::function<void(String topic, String msg, String originator)>
T_SUBS;
69 int subscriptionHandle;
81 unsigned long minMicros;
82 unsigned long lastCall;
83#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
84 unsigned long lateTime;
85 unsigned long cpuTime;
86 unsigned long callCount;
202 ustd::array<T_TASKENTRY> taskList;
203 ustd::queue<T_MSG *> msgqueue;
204 ustd::array<T_SUBSCRIPTION> subscriptionList;
205 int subscriptionHandle;
207 bool bSingleTaskMode =
false;
208 int singleTaskID = -1;
209#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
210 bool bGenStats =
false;
211 unsigned long statIntervallMs = 0;
212 unsigned long statTimer;
213 unsigned long systemTimer;
214 unsigned long systemTime = 0;
215 unsigned long appTimer;
216 unsigned long appTime = 0;
217 unsigned long mainTime = 0;
219 unsigned long upTime = 0;
220 unsigned long upTimeTicker = 0;
221 int currentTaskID = -2;
224 Scheduler(
int nTaskListSize = 2,
int queueSize = 2,
int nSubscriptionListSize = 2)
225 : taskList(nTaskListSize), msgqueue(queueSize), subscriptionList(nSubscriptionListSize) {
231 subscriptionHandle = 0;
234 upTimeTicker = micros();
235#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
239#if defined(__ESP__) && !defined(__ESP32__) && !defined(__ESP32_RISC__)
241 ESP.wdtEnable(WDTO_8S);
247 for (
unsigned int i = 0; i < taskList.length(); i++) {
248 if (taskList[i].szName !=
nullptr)
249 free(taskList[i].szName);
251 int l = msgqueue.length();
252 for (
int i = 0; i < l; i++) {
258 static bool mqttmatch(
const String pubstr,
const String substr) {
267 if (pubstr == substr)
272 const char *pub = (
const char *)pubstr.c_str();
273 const char *sub = (
const char *)substr.c_str();
275 int lp = strlen(pub);
276 int ls = strlen(sub);
280 for (
int pp = 0; pp < lp; pp++) {
281 if (pub[pp] ==
'+' || pub[pp] ==
'#') {
286 if (sub[ps] ==
'#') {
294 if (sub[ps] ==
'+') {
295 while (pp < lp && pub[pp] !=
'/')
301 }
else if (!strcmp(&sub[ps],
"/#")) {
307 if (sub[ps] ==
'+' || sub[ps] ==
'#') {
311 if (pub[pp] != sub[ps] && strcmp(&sub[ps],
"/#")) {
320 if (!strcmp(&sub[ps + 1],
"/#") || !strcmp(&sub[ps + 1],
"#") ||
321 !strcmp(&sub[ps + 1],
"+")) {
336 int getIndexFromTaskID(
int taskID) {
337 for (
unsigned int i = 0; i < taskList.length(); i++) {
338 if (taskList[i].taskID == taskID) {
345#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
346 bool schedReceive(
const char *topic,
const char *msg) {
348 p0 = topic ? topic :
"";
349 p1 = strchr(p0,
'/');
352 if (!strcmp(p1,
"stat/get")) {
353 statIntervallMs = msg ? atoi(msg) : 0;
354 if (statIntervallMs) {
367 bool publish(String topic, String msg =
"", String originator =
"") {
375#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
376 if (!strncmp(topic.c_str(),
"$SYS", 4))
377 if (schedReceive(topic.c_str(), msg.c_str()))
380 T_MSG *pMsg = (T_MSG *)malloc(
sizeof(T_MSG) +
381 (3 + originator.length() + topic.length() + msg.length()) *
384 pMsg->originator = (
char *)(&pMsg[1]);
385 pMsg->topic = pMsg->originator + ((originator.length() + 1) *
sizeof(
char));
386 pMsg->msg = pMsg->topic + ((topic.length() + 1) *
sizeof(
char));
387 strcpy(pMsg->originator, originator.c_str());
388 strcpy(pMsg->topic, topic.c_str());
389 strcpy(pMsg->msg, msg.c_str());
390 return msgqueue.push(pMsg);
409 T_SUBSCRIPTION sub = {};
412 sub.subscriptionHandle = subscriptionHandle + 1;
413 sub.topic = (
char *)malloc((topic.length() + originator.length() + 2) *
sizeof(char));
415 sub.originator = sub.topic + ((topic.length() + 1) *
sizeof(
char));
416 strcpy(sub.topic, topic.c_str());
417 strcpy(sub.originator, originator.c_str());
418 if (subscriptionList.add(sub) != -1) {
419 ++subscriptionHandle;
420 return subscriptionHandle;
436 for (
unsigned int i = 0; i < subscriptionList.length(); i++) {
437 if (subscriptionList[i].subscriptionHandle == subscriptionHandle) {
438 free(subscriptionList[i].topic);
439 subscriptionList.erase(i);
447 void checkMsgQueue() {
449 while ((pMsg = msgqueue.pop()) !=
nullptr) {
450 for (
unsigned int i = 0; i < subscriptionList.length(); i++) {
451 if (
mqttmatch(pMsg->topic, subscriptionList[i].topic)) {
452 if (*(pMsg->originator) != 0)
453 if (strcmp(subscriptionList[i].originator, pMsg->originator) == 0) {
456 subscriptionList[i].subs(pMsg->topic, pMsg->msg, pMsg->originator);
458#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
459 unsigned long startTime = micros();
460 if (subscriptionList[i].taskID != SCHEDULER_MAIN) {
461 int ind = getIndexFromTaskID(subscriptionList[i].taskID);
463 taskList[ind].cpuTime +=
timeDiff(startTime, micros());
465 mainTime +=
timeDiff(startTime, micros());
475 int add(
T_TASK task, String name,
unsigned long minMicroSecs = 100000L,
488 T_TASKENTRY taskEnt = {};
489 taskEnt.taskID = taskID + 1;
491 taskEnt.minMicros = minMicroSecs;
494 taskEnt.szName = (
char *)malloc(name.length() + 1);
495 if (!taskEnt.szName) {
498 strcpy(taskEnt.szName, name.c_str());
500 taskEnt.szName =
nullptr;
502 if (taskList.add(taskEnt) >= 0) {
506 if (taskEnt.szName) {
507 free(taskEnt.szName);
521 if (currentTaskID == taskID) {
524 for (
unsigned int i = 0; i < taskList.length(); i++) {
525 if (taskList[i].taskID == taskID) {
526 if (taskList[i].szName !=
nullptr)
527 free(taskList[i].szName);
545 for (
unsigned int i = 0; i < taskList.length(); i++) {
546 if (taskList[i].taskID == taskID) {
547 taskList[i].minMicros = minMicroSecs;
570 singleTaskID = _singleTaskID;
571 if (_singleTaskID == -1) {
572 bSingleTaskMode =
false;
574 bSingleTaskMode =
true;
579 void runTask(T_TASKENTRY *pTaskEnt) {
580 unsigned long startTime = micros();
581 unsigned long tDelta =
timeDiff(pTaskEnt->lastCall, startTime);
582 if (tDelta >= pTaskEnt->minMicros && pTaskEnt->minMicros) {
583 currentTaskID = pTaskEnt->taskID;
586 pTaskEnt->lastCall = startTime;
587#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
588 pTaskEnt->lateTime += tDelta - pTaskEnt->minMicros;
589 pTaskEnt->cpuTime +=
timeDiff(startTime, micros());
590 ++pTaskEnt->callCount;
595#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
596 void resetStats(
bool bHard =
false) {
597 for (
unsigned int i = 0; i < taskList.length(); i++) {
598 taskList[i].cpuTime = 0;
599 taskList[i].lateTime = 0;
600 taskList[i].callCount = 0;
602 statTimer = micros();
604 systemTimer = micros();
611 if (!bGenStats || !statIntervallMs)
613 unsigned long now = micros();
614 unsigned long tDelta =
timeDiff(statTimer, now);
615#ifdef USTD_FEATURE_FREE_MEMORY
616 unsigned long mem = (
unsigned long)freeMemory();
618 unsigned long mem = 0;
619#pragma message("freeMemory() is not implemented for this platform.")
621 if (tDelta > statIntervallMs * 1000) {
623 for (
unsigned int i = 0; i < taskList.length(); i++) {
626 const char *null_name =
"<null>";
627 const char *skeleton_head =
628 "{\"dt\":%ld,\"syt\":%ld,\"apt\":%ld,"
629 "\"mat\":%ld,\"upt\":%ld,\"mem\":%ld,\"tsks\":%ld,\"tdt\":[";
630 const char *skeleton_tail =
"]}";
631 const char *bone =
"[\"%s\",%ld,%ld,%ld,%ld,%ld],";
632 unsigned long memreq =
633 strlen(skeleton_head) + 7 * 7 + (strlen(bone) + 7 * 5) * taskList.length();
634 for (
unsigned int i = 0; i < taskList.length(); i++) {
635 if (taskList[i].szName ==
nullptr)
636 memreq += strlen(null_name);
638 memreq += strlen(taskList[i].szName);
641 char *jsonstr = (
char *)malloc(memreq);
642 if (jsonstr !=
nullptr) {
643 memset(jsonstr, 0, memreq);
644 sprintf(jsonstr, skeleton_head, tDelta, systemTime, appTime, mainTime, upTime, mem,
645 (
long)taskList.length());
646 for (
unsigned int i = 0; i < taskList.length(); i++) {
647 char *p = &jsonstr[strlen(jsonstr)];
648 if (taskList[i].szName ==
nullptr) {
649 sprintf(p, bone, null_name, (
long)taskList[i].taskID, taskList[i].minMicros,
650 taskList[i].callCount, taskList[i].cpuTime, taskList[i].lateTime);
652 sprintf(p, bone, taskList[i].szName, (
long)taskList[i].taskID,
653 taskList[i].minMicros, taskList[i].callCount, taskList[i].cpuTime,
654 taskList[i].lateTime);
657 char *p = &jsonstr[strlen(jsonstr)];
658 if (taskList.length() > 0)
660 strcpy(p, skeleton_tail);
672 publish(
"$SYS/stat", jsonstr,
"scheduler");
686 unsigned long current = micros();
687 if (
timeDiff(upTimeTicker, current) > 1000000L) {
689 upTimeTicker += 1000000;
691#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
692 systemTime +=
timeDiff(systemTimer, current);
695 if (!bSingleTaskMode) {
696#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
701 for (
unsigned int i = 0; i < taskList.length(); i++) {
702 if (!bSingleTaskMode) {
704 runTask(&taskList[i]);
706 if (taskList[i].taskID == singleTaskID) {
707 runTask(&taskList[i]);
710#if defined(__ESP__) && !defined(__ESP32__)
711 appTime +=
timeDiff(appTimer, micros());
712 systemTimer = micros();
714 systemTime +=
timeDiff(systemTimer, micros());
718#if USTD_FEATURE_MEMORY > USTD_FEATURE_MEM_512B
719 appTime +=
timeDiff(appTimer, micros());
720 systemTimer = micros();
722#if defined(__ESP__) && !defined(__ESP32__) && !defined(__ESP32_RISC__)
muwerk Console Class
Definition console.h:88
muwerk Scheduler Class
Definition scheduler.h:199
Scheduler(int nTaskListSize=2, int queueSize=2, int nSubscriptionListSize=2)
Definition scheduler.h:224
void singleTaskMode(int _singleTaskID)
Definition scheduler.h:562
int add(T_TASK task, String name, unsigned long minMicroSecs=100000L, T_PRIO prio=PRIO_NORMAL)
Definition scheduler.h:475
static bool mqttmatch(const String pubstr, const String substr)
Definition scheduler.h:258
unsigned long getUptime()
Definition scheduler.h:554
bool publish(String topic, String msg="", String originator="")
Definition scheduler.h:367
int subscribe(int taskID, String topic, T_SUBS subs, String originator="")
Definition scheduler.h:395
bool reschedule(int taskID, unsigned long minMicroSecs=100000L, T_PRIO prio=PRIO_NORMAL)
Definition scheduler.h:535
void loop()
Definition scheduler.h:681
bool unsubscribe(int subscriptionHandle)
Definition scheduler.h:428
bool remove(int taskID)
Definition scheduler.h:512
The muwerk namespace.
Definition console.h:15
T_MSGTYPE
Scheduler Message Type.
Definition scheduler.h:35
std::function< void()> T_TASK
Scheduler Task Function.
Definition scheduler.h:46
std::function< void(String topic, String msg, String originator)> T_SUBS
Scheduler Subscription Function.
Definition scheduler.h:61
T_PRIO
Scheduler Task Priority.
Definition scheduler.h:25
@ PRIO_TIMECRITICAL
System critical priority.
Definition scheduler.h:27
@ PRIO_NORMAL
High Priority.
Definition scheduler.h:29
@ PRIO_LOW
Standard Priority (default for all tasks)
Definition scheduler.h:30
@ PRIO_LOWEST
Low priority.
Definition scheduler.h:31
@ PRIO_HIGH
Time critical priority.
Definition scheduler.h:28
unsigned long timeDiff(unsigned long first, unsigned long second)
Definition muwerk.h:44