* Copyright (C) 2026 Xiaomi Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* This file contains code derived from MimiClaw (https://github.com/memovai/mimiclaw)
* Copyright (c) 2026 Ziboyan Wang, licensed under the MIT License.
* See NOTICE file for the original MIT License terms.
*/
#include "infra/cron_service.h"
#include "core/message_bus.h"
#include "tools/tool_registry.h"
#include "agent_config.h"
#include "cJSON.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
static const char* TAG = "cron";
#define MAX_CRON_JOBS AGENT_CRON_MAX_JOBS
static bool cron_sanitize_destination(cron_job_t* job);
static void cron_generate_id(char* id_buf);
static int cron_load_jobs(void);
static int cron_save_jobs(void);
static int cron_parse_job_item(cJSON* item, cron_job_t* job);
static void cron_fire_job(cron_job_t* job, time_t now);
static void cron_process_due_jobs(void);
static void compute_initial_next_run(cron_job_t* job);
static void* cron_task_main(void* arg);
static cron_job_t s_jobs[MAX_CRON_JOBS];
static int s_job_count;
static volatile bool s_cron_running;
static pthread_mutex_t s_cron_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t s_cron_wake = PTHREAD_COND_INITIALIZER;
static bool cron_sanitize_destination(cron_job_t* job)
{
bool changed = false;
if (!job) {
return false;
}
if (job->channel[0] == '\0') {
strncpy(job->channel, AGENT_CHAN_SYSTEM, sizeof(job->channel) - 1);
changed = true;
}
if (strcmp(job->channel, AGENT_CHAN_FEISHU) == 0) {
if (job->chat_id[0] == '\0' || strcmp(job->chat_id, "cron") == 0) {
syslog(LOG_WARNING,
"[%s] Job %s: bad feishu chat_id, "
"fallback to system:cron\n",
TAG, job->id[0] ? job->id : "<new>");
strncpy(job->channel, AGENT_CHAN_SYSTEM, sizeof(job->channel) - 1);
strncpy(job->chat_id, "cron", sizeof(job->chat_id) - 1);
changed = true;
}
} else if (job->chat_id[0] == '\0') {
strncpy(job->chat_id, "cron", sizeof(job->chat_id) - 1);
changed = true;
}
return changed;
}
static void cron_generate_id(char* id_buf)
{
uint32_t r = ((uint32_t)rand());
snprintf(id_buf, AGENT_CRON_ID_LEN, "%08x", (unsigned int)r);
}
* Returns OK on success, ERROR if the item should be skipped. */
static int cron_parse_job_item(cJSON* item, cron_job_t* job)
{
const char* id = cJSON_GetStringValue(cJSON_GetObjectItem(item, "id"));
const char* name = cJSON_GetStringValue(cJSON_GetObjectItem(item, "name"));
const char* kind_str = cJSON_GetStringValue(cJSON_GetObjectItem(item, "kind"));
const char* message = cJSON_GetStringValue(cJSON_GetObjectItem(item, "message"));
const char* channel = cJSON_GetStringValue(cJSON_GetObjectItem(item, "channel"));
const char* chat_id = cJSON_GetStringValue(cJSON_GetObjectItem(item, "chat_id"));
if (!id || !name || !kind_str || !message) {
return ERROR;
}
memset(job, 0, sizeof(cron_job_t));
strncpy(job->id, id, sizeof(job->id) - 1);
strncpy(job->name, name, sizeof(job->name) - 1);
strncpy(job->message, message, sizeof(job->message) - 1);
strncpy(job->channel, channel ? channel : AGENT_CHAN_SYSTEM,
sizeof(job->channel) - 1);
strncpy(job->chat_id, chat_id ? chat_id : "cron", sizeof(job->chat_id) - 1);
cJSON* enabled_j = cJSON_GetObjectItem(item, "enabled");
job->enabled = enabled_j ? cJSON_IsTrue(enabled_j) : true;
cJSON* delete_j = cJSON_GetObjectItem(item, "delete_after_run");
job->delete_after_run = delete_j ? cJSON_IsTrue(delete_j) : false;
if (strcmp(kind_str, "every") == 0) {
job->kind = CRON_KIND_EVERY;
cJSON* interval = cJSON_GetObjectItem(item, "interval_s");
job->interval_s = (interval && cJSON_IsNumber(interval))
? (uint32_t)interval->valuedouble
: 0;
} else if (strcmp(kind_str, "at") == 0) {
job->kind = CRON_KIND_AT;
cJSON* at_epoch = cJSON_GetObjectItem(item, "at_epoch");
job->at_epoch = (at_epoch && cJSON_IsNumber(at_epoch))
? (int64_t)at_epoch->valuedouble
: 0;
} else {
return ERROR;
}
cJSON* last_run = cJSON_GetObjectItem(item, "last_run");
job->last_run = (last_run && cJSON_IsNumber(last_run))
? (int64_t)last_run->valuedouble
: 0;
cJSON* next_run = cJSON_GetObjectItem(item, "next_run");
job->next_run = (next_run && cJSON_IsNumber(next_run))
? (int64_t)next_run->valuedouble
: 0;
const char* action = cJSON_GetStringValue(cJSON_GetObjectItem(item, "action"));
if (action) {
strncpy(job->action, action, sizeof(job->action) - 1);
}
const char* action_args = cJSON_GetStringValue(cJSON_GetObjectItem(item, "action_args"));
if (action_args) {
strncpy(job->action_args, action_args, sizeof(job->action_args) - 1);
}
return OK;
}
static int cron_load_jobs(void)
{
FILE* f = fopen(AGENT_CRON_FILE, "r");
if (!f) {
syslog(LOG_INFO, "[%s] No cron file, starting fresh\n", TAG);
s_job_count = 0;
return OK;
}
fseek(f, 0, SEEK_END);
long fsize = ftell(f);
fseek(f, 0, SEEK_SET);
if (fsize <= 0 || fsize > AGENT_CRON_FILE_MAX_SIZE) {
syslog(LOG_WARNING, "[%s] Cron file invalid size: %ld\n", TAG, fsize);
fclose(f);
s_job_count = 0;
return OK;
}
char* buf = malloc(fsize + 1);
if (!buf) {
fclose(f);
return ERROR;
}
size_t n = fread(buf, 1, fsize, f);
buf[n] = '\0';
fclose(f);
cJSON* root = cJSON_Parse(buf);
free(buf);
if (!root) {
syslog(LOG_WARNING, "[%s] Failed to parse cron JSON\n", TAG);
s_job_count = 0;
return OK;
}
cJSON* jobs_arr = cJSON_GetObjectItem(root, "jobs");
if (!jobs_arr || !cJSON_IsArray(jobs_arr)) {
cJSON_Delete(root);
s_job_count = 0;
return OK;
}
s_job_count = 0;
bool repaired = false;
cJSON* item;
cJSON_ArrayForEach(item, jobs_arr)
{
if (s_job_count >= MAX_CRON_JOBS) {
break;
}
cron_job_t* job = &s_jobs[s_job_count];
if (cron_parse_job_item(item, job) != OK) {
continue;
}
if (cron_sanitize_destination(job)) {
repaired = true;
}
s_job_count++;
}
cJSON_Delete(root);
if (repaired) {
cron_save_jobs();
}
syslog(LOG_INFO, "[%s] Loaded %d cron jobs\n", TAG, s_job_count);
return OK;
}
static int cron_save_jobs(void)
{
cJSON* root = cJSON_CreateObject();
cJSON* jobs_arr = cJSON_CreateArray();
for (int i = 0; i < s_job_count; i++) {
cron_job_t* job = &s_jobs[i];
cJSON* item = cJSON_CreateObject();
cJSON_AddStringToObject(item, "id", job->id);
cJSON_AddStringToObject(item, "name", job->name);
cJSON_AddBoolToObject(item, "enabled", job->enabled);
cJSON_AddStringToObject(item, "kind",
job->kind == CRON_KIND_EVERY ? "every" : "at");
if (job->kind == CRON_KIND_EVERY) {
cJSON_AddNumberToObject(item, "interval_s", job->interval_s);
} else {
cJSON_AddNumberToObject(item, "at_epoch", (double)job->at_epoch);
}
cJSON_AddStringToObject(item, "message", job->message);
cJSON_AddStringToObject(item, "channel", job->channel);
cJSON_AddStringToObject(item, "chat_id", job->chat_id);
cJSON_AddNumberToObject(item, "last_run", (double)job->last_run);
cJSON_AddNumberToObject(item, "next_run", (double)job->next_run);
cJSON_AddBoolToObject(item, "delete_after_run", job->delete_after_run);
if (job->action[0] != '\0') {
cJSON_AddStringToObject(item, "action", job->action);
cJSON_AddStringToObject(item, "action_args", job->action_args);
}
cJSON_AddItemToArray(jobs_arr, item);
}
cJSON_AddItemToObject(root, "jobs", jobs_arr);
char* json_str = cJSON_Print(root);
cJSON_Delete(root);
if (!json_str) {
syslog(LOG_ERR, "[%s] Failed to serialize cron jobs\n", TAG);
return ERROR;
}
FILE* f = fopen(AGENT_CRON_FILE, "w");
if (!f) {
syslog(LOG_ERR, "[%s] Cannot open %s for write\n", TAG, AGENT_CRON_FILE);
free(json_str);
return ERROR;
}
size_t len = strlen(json_str);
size_t written = fwrite(json_str, 1, len, f);
fclose(f);
free(json_str);
if (written != len) {
syslog(LOG_ERR, "[%s] Cron save incomplete: %d/%d bytes\n", TAG,
(int)written, (int)len);
return ERROR;
}
syslog(LOG_INFO, "[%s] Saved %d cron jobs\n", TAG, s_job_count);
return OK;
}
static void cron_fire_job(cron_job_t* job, time_t now)
{
syslog(LOG_INFO, "[%s] Cron job firing: %s (%s)\n", TAG, job->name, job->id);
if (job->action[0] != '\0') {
char tool_output[512];
tool_output[0] = '\0';
syslog(LOG_INFO, "[%s] Executing action: %s args=%.128s\n",
TAG, job->action, job->action_args);
int err = tool_registry_execute(job->action, job->action_args,
tool_output, sizeof(tool_output));
if (err != OK) {
syslog(LOG_WARNING, "[%s] Action %s failed: %s\n",
TAG, job->action, tool_output);
} else {
syslog(LOG_INFO, "[%s] Action %s OK: %.128s\n",
TAG, job->action, tool_output);
}
* Skip voice notification when the action itself failed —
* speaking the failure message through voice channel can
* trigger another round of audio close cascade errors. */
if (err != OK
&& strcmp(job->channel, AGENT_CHAN_VOICE) == 0) {
syslog(LOG_WARNING,
"[%s] Skipping voice notification for failed action %s\n",
TAG, job->action);
} else if (job->channel[0] != '\0') {
agent_msg_t notify;
memset(¬ify, 0, sizeof(notify));
strncpy(notify.channel, job->channel, sizeof(notify.channel) - 1);
strncpy(notify.chat_id, job->chat_id, sizeof(notify.chat_id) - 1);
notify.content = strdup(job->message);
if (notify.content) {
if (message_bus_push_outbound(¬ify) != OK) {
free(notify.content);
}
}
}
} else {
agent_msg_t msg;
memset(&msg, 0, sizeof(msg));
strncpy(msg.channel, job->channel, sizeof(msg.channel) - 1);
strncpy(msg.chat_id, job->chat_id, sizeof(msg.chat_id) - 1);
msg.content = strdup(job->message);
if (msg.content) {
int err = message_bus_push_outbound(&msg);
if (err != OK) {
syslog(LOG_WARNING, "[%s] Failed to push cron message\n", TAG);
free(msg.content);
}
}
}
job->last_run = now;
}
static void cron_process_due_jobs(void)
{
time_t now = time(NULL);
bool changed = false;
pthread_mutex_lock(&s_cron_lock);
for (int i = 0; i < s_job_count; i++) {
cron_job_t* job = &s_jobs[i];
syslog(LOG_DEBUG,
"[%s] Check job[%d] '%s': enabled=%d next_run=%lld now=%lld\n", TAG,
i, job->name, job->enabled, (long long)job->next_run,
(long long)now);
if (!job->enabled || job->next_run <= 0 || job->next_run > now) {
continue;
}
cron_fire_job(job, now);
if (job->kind == CRON_KIND_AT) {
if (job->delete_after_run) {
syslog(LOG_INFO, "[%s] Deleting one-shot: %s\n", TAG, job->name);
for (int j = i; j < s_job_count - 1; j++) {
s_jobs[j] = s_jobs[j + 1];
}
s_job_count--;
i--;
} else {
job->enabled = false;
job->next_run = 0;
}
} else {
job->next_run = now + job->interval_s;
}
changed = true;
}
if (changed) {
cron_save_jobs();
}
pthread_mutex_unlock(&s_cron_lock);
}
static void* cron_task_main(void* arg)
{
(void)arg;
while (s_cron_running) {
* wake us immediately when a new job is added. This also
* survives NuttX deep-sleep better than plain usleep. */
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += AGENT_CRON_CHECK_INTERVAL_MS / 1000;
pthread_mutex_lock(&s_cron_lock);
pthread_cond_timedwait(&s_cron_wake, &s_cron_lock, &ts);
pthread_mutex_unlock(&s_cron_lock);
cron_process_due_jobs();
}
return NULL;
}
static void compute_initial_next_run(cron_job_t* job)
{
time_t now = time(NULL);
if (job->kind == CRON_KIND_EVERY) {
job->next_run = now + job->interval_s;
} else if (job->kind == CRON_KIND_AT) {
if (job->at_epoch > now) {
job->next_run = job->at_epoch;
} else {
job->next_run = 0;
job->enabled = false;
}
}
}
int cron_service_init(void) { return cron_load_jobs(); }
int cron_service_start(void)
{
pthread_mutex_lock(&s_cron_lock);
if (s_cron_running) {
pthread_mutex_unlock(&s_cron_lock);
syslog(LOG_WARNING, "[%s] Cron task already running\n", TAG);
return OK;
}
time_t now = time(NULL);
bool changed = false;
for (int i = 0; i < s_job_count; i++) {
cron_job_t* job = &s_jobs[i];
if (!job->enabled) {
continue;
}
* interval_s is relative to the previous action completion
* (e.g. music playback end), so rescheduling to now+interval
* would produce wrong timing. Disable instead. */
if (job->next_run > 0 && job->next_run <= now) {
syslog(LOG_WARNING,
"[%s] disabling stale job: %s (next_run %lld <= now %lld)\n",
TAG, job->name, (long long)job->next_run, (long long)now);
job->enabled = false;
job->next_run = 0;
changed = true;
continue;
}
if (job->next_run <= 0) {
if (job->kind == CRON_KIND_EVERY) {
job->next_run = now + job->interval_s;
} else if (job->kind == CRON_KIND_AT && job->at_epoch > now) {
job->next_run = job->at_epoch;
}
}
}
if (changed) {
cron_save_jobs();
}
s_cron_running = true;
pthread_mutex_unlock(&s_cron_lock);
int err = agent_task_create(cron_task_main, "cron", AGENT_CRON_STACK,
NULL, AGENT_CRON_PRIO);
if (err != OK) {
s_cron_running = false;
syslog(LOG_ERR, "[%s] Failed to create cron task\n", TAG);
return ERROR;
}
syslog(LOG_INFO, "[%s] Cron started (%d jobs, interval %ds)\n", TAG,
s_job_count, AGENT_CRON_CHECK_INTERVAL_MS / 1000);
return OK;
}
void cron_service_stop(void)
{
if (s_cron_running) {
s_cron_running = false;
syslog(LOG_INFO, "[%s] Cron service stopping\n", TAG);
}
}
int cron_add_job(cron_job_t* job)
{
pthread_mutex_lock(&s_cron_lock);
if (s_job_count >= MAX_CRON_JOBS) {
syslog(LOG_WARNING, "[%s] Max cron jobs reached (%d)\n", TAG,
MAX_CRON_JOBS);
pthread_mutex_unlock(&s_cron_lock);
return ERROR;
}
cron_generate_id(job->id);
cron_sanitize_destination(job);
job->enabled = true;
job->last_run = 0;
compute_initial_next_run(job);
s_jobs[s_job_count] = *job;
s_job_count++;
cron_save_jobs();
pthread_cond_signal(&s_cron_wake);
pthread_mutex_unlock(&s_cron_lock);
syslog(LOG_INFO,
"[%s] Added job: %s (%s) kind=%s "
"next=%lld ch=%s:%s\n",
TAG, job->name, job->id, job->kind == CRON_KIND_EVERY ? "every" : "at",
(long long)job->next_run, job->channel, job->chat_id);
return OK;
}
int cron_remove_job(const char* job_id)
{
pthread_mutex_lock(&s_cron_lock);
for (int i = 0; i < s_job_count; i++) {
if (strcmp(s_jobs[i].id, job_id) == 0) {
syslog(LOG_INFO, "[%s] Removing job: %s (%s)\n", TAG, s_jobs[i].name,
job_id);
for (int j = i; j < s_job_count - 1; j++) {
s_jobs[j] = s_jobs[j + 1];
}
s_job_count--;
cron_save_jobs();
pthread_mutex_unlock(&s_cron_lock);
return OK;
}
}
pthread_mutex_unlock(&s_cron_lock);
syslog(LOG_WARNING, "[%s] Cron job not found: %s\n", TAG, job_id);
return ERROR;
}
int cron_list_jobs(cron_job_t* out, int max_count)
{
int count;
pthread_mutex_lock(&s_cron_lock);
count = s_job_count < max_count ? s_job_count : max_count;
memcpy(out, s_jobs, count * sizeof(cron_job_t));
pthread_mutex_unlock(&s_cron_lock);
return count;
}