* Copyright (C) 2026 Xiaomi Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "node/node_manager.h"
#include "core/message_bus.h"
#include "cJSON.h"
#include "tools/tool_registry.h"
#include "agent_compat.h"
#include "agent_config.h"
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
static const char* TAG = "node_mgr";
typedef struct {
bool active;
int ws_fd;
char node_id[NODE_ID_LEN];
char display_name[64];
char platform[16];
char device_family[16];
char commands[NODE_MAX_COMMANDS][NODE_CMD_LEN];
int cmd_count;
} node_entry_t;
static node_entry_t s_nodes[NODE_MAX_NODES];
static pthread_mutex_t s_mtx = PTHREAD_MUTEX_INITIALIZER;
#define MAX_PENDING 4
#define INVOKE_TIMEOUT_S 30
typedef struct {
bool active;
char invoke_id[48];
bool done;
bool ok;
char* result_json;
pthread_mutex_t mtx;
pthread_cond_t cond;
} pending_invoke_t;
static pending_invoke_t s_pending[MAX_PENDING];
* Without this, parallel tool threads invoking the same node
* interleave header + payload bytes on the fd, corrupting the
* WebSocket frame stream and causing the remote side to see
* garbled data (observed as "JSON parse failed" + disconnect). */
static pthread_mutex_t s_ws_write_mtx = PTHREAD_MUTEX_INITIALIZER;
static int ws_send_text_to_fd(int fd, const char* data, size_t len)
{
unsigned char hdr[10];
int hdr_len = 0;
hdr[0] = 0x81;
if (len < 126) {
hdr[1] = (unsigned char)len;
hdr_len = 2;
} else if (len < 65536) {
hdr[1] = 126;
hdr[2] = (unsigned char)(len >> 8);
hdr[3] = (unsigned char)(len & 0xFF);
hdr_len = 4;
} else {
return -1;
}
pthread_mutex_lock(&s_ws_write_mtx);
int ret = 0;
if (send(fd, hdr, hdr_len, 0) != hdr_len) {
ret = -1;
} else if (send(fd, data, len, 0) != (int)len) {
ret = -1;
}
pthread_mutex_unlock(&s_ws_write_mtx);
return ret;
}
static void gen_id(char* buf, size_t cap)
{
unsigned char rnd[4];
if (agent_secure_random(rnd, sizeof(rnd)) == 0) {
snprintf(buf, cap, "%02x%02x%02x%02x-%04x",
rnd[0], rnd[1], rnd[2], rnd[3],
(unsigned)((rnd[0] << 8) | rnd[1]) & 0xFFFF);
} else {
snprintf(buf, cap, "%08lx-%04x", (unsigned long)time(NULL),
(unsigned)((uintptr_t)buf & 0xFFFF));
}
}
static int send_event(int fd, const char* event, cJSON* payload)
{
cJSON* frame = cJSON_CreateObject();
cJSON_AddStringToObject(frame, "type", "evt");
cJSON_AddStringToObject(frame, "event", event);
if (payload)
cJSON_AddItemToObject(frame, "payload", payload);
char* json = cJSON_PrintUnformatted(frame);
int ret = ws_send_text_to_fd(fd, json, strlen(json));
free(json);
cJSON_Delete(frame);
return ret;
}
static int send_response(int fd, const char* req_id, bool ok, cJSON* payload,
cJSON* error)
{
cJSON* frame = cJSON_CreateObject();
cJSON_AddStringToObject(frame, "type", "res");
cJSON_AddStringToObject(frame, "id", req_id);
cJSON_AddBoolToObject(frame, "ok", ok);
if (ok && payload)
cJSON_AddItemToObject(frame, "payload", payload);
else if (!ok && error)
cJSON_AddItemToObject(frame, "error", error);
char* json = cJSON_PrintUnformatted(frame);
int ret = ws_send_text_to_fd(fd, json, strlen(json));
free(json);
cJSON_Delete(frame);
return ret;
}
static void sanitize_tool_name_part(char* s)
{
bool changed = false;
for (char* p = s; *p; p++) {
if (!((*p >= 'a' && *p <= 'z') || (*p >= 'A' && *p <= 'Z')
|| (*p >= '0' && *p <= '9') || *p == '_' || *p == '-')) {
if (!changed)
syslog(LOG_WARNING, "[%s] Sanitizing tool name part: '%s' "
"(illegal char '%c' at pos %d)\n",
TAG, s, *p, (int)(p - s));
*p = '_';
changed = true;
}
}
if (changed)
syslog(LOG_INFO, "[%s] Sanitized result: '%s'\n", TAG, s);
}
static node_entry_t* find_node_by_fd(int fd)
{
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (s_nodes[i].active && s_nodes[i].ws_fd == fd)
return &s_nodes[i];
}
return NULL;
}
static node_entry_t* find_node_by_id(const char* id)
{
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (s_nodes[i].active && strcmp(s_nodes[i].node_id, id) == 0)
return &s_nodes[i];
}
return NULL;
}
static pending_invoke_t* alloc_pending(const char* invoke_id)
{
for (int i = 0; i < MAX_PENDING; i++) {
if (!s_pending[i].active) {
pending_invoke_t* p = &s_pending[i];
memset(p, 0, sizeof(*p));
p->active = true;
strncpy(p->invoke_id, invoke_id, sizeof(p->invoke_id) - 1);
pthread_mutex_init(&p->mtx, NULL);
pthread_cond_init(&p->cond, NULL);
return p;
}
}
return NULL;
}
static void free_pending(pending_invoke_t* p)
{
p->active = false;
pthread_mutex_destroy(&p->mtx);
pthread_cond_destroy(&p->cond);
}
static pending_invoke_t* find_pending(const char* invoke_id)
{
for (int i = 0; i < MAX_PENDING; i++) {
if (s_pending[i].active && strcmp(s_pending[i].invoke_id, invoke_id) == 0)
return &s_pending[i];
}
return NULL;
}
static void handle_connect(int ws_fd, const char* req_id, cJSON* params)
{
cJSON* client = cJSON_GetObjectItem(params, "client");
const char* node_id = cJSON_GetStringValue(cJSON_GetObjectItem(client, "id"));
const char* display = cJSON_GetStringValue(cJSON_GetObjectItem(client, "displayName"));
const char* platform = cJSON_GetStringValue(cJSON_GetObjectItem(client, "platform"));
const char* family = cJSON_GetStringValue(cJSON_GetObjectItem(client, "deviceFamily"));
if (!node_id || !node_id[0]) {
cJSON* err = cJSON_CreateObject();
cJSON_AddStringToObject(err, "code", "INVALID_CLIENT");
cJSON_AddStringToObject(err, "message", "missing client.id");
send_response(ws_fd, req_id, false, NULL, err);
return;
}
char safe_id[NODE_ID_LEN];
strncpy(safe_id, node_id, NODE_ID_LEN - 1);
safe_id[NODE_ID_LEN - 1] = '\0';
sanitize_tool_name_part(safe_id);
pthread_mutex_lock(&s_mtx);
node_entry_t* existing = find_node_by_id(safe_id);
if (existing) {
syslog(LOG_INFO, "[%s] Node %s reconnected (old fd=%d, new fd=%d)\n", TAG,
node_id, existing->ws_fd, ws_fd);
existing->ws_fd = ws_fd;
}
node_entry_t* node = existing;
if (!node) {
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (!s_nodes[i].active) {
node = &s_nodes[i];
break;
}
}
}
if (!node) {
pthread_mutex_unlock(&s_mtx);
cJSON* err = cJSON_CreateObject();
cJSON_AddStringToObject(err, "code", "MAX_NODES");
cJSON_AddStringToObject(err, "message", "node limit reached");
send_response(ws_fd, req_id, false, NULL, err);
return;
}
memset(node, 0, sizeof(*node));
node->active = true;
node->ws_fd = ws_fd;
strncpy(node->node_id, safe_id, NODE_ID_LEN - 1);
if (display)
strncpy(node->display_name, display, sizeof(node->display_name) - 1);
if (platform)
strncpy(node->platform, platform, sizeof(node->platform) - 1);
if (family)
strncpy(node->device_family, family, sizeof(node->device_family) - 1);
cJSON* cmds = cJSON_GetObjectItem(params, "commands");
if (cmds && cJSON_IsArray(cmds)) {
cJSON* c = NULL;
cJSON_ArrayForEach(c, cmds)
{
if (node->cmd_count >= NODE_MAX_COMMANDS)
break;
if (cJSON_IsString(c)) {
strncpy(node->commands[node->cmd_count], c->valuestring,
NODE_CMD_LEN - 1);
sanitize_tool_name_part(node->commands[node->cmd_count]);
node->cmd_count++;
}
}
}
pthread_mutex_unlock(&s_mtx);
syslog(LOG_INFO, "[%s] Node connected: %s (%s/%s) with %d commands\n", TAG,
node_id, platform ? platform : "?", family ? family : "?",
node->cmd_count);
tool_registry_invalidate();
cJSON* payload = cJSON_CreateObject();
cJSON_AddStringToObject(payload, "type", "hello-ok");
cJSON* server = cJSON_CreateObject();
char conn_id[32];
gen_id(conn_id, sizeof(conn_id));
cJSON_AddStringToObject(server, "connId", conn_id);
cJSON_AddItemToObject(payload, "server", server);
send_response(ws_fd, req_id, true, payload, NULL);
tool_registry_rebuild_json();
}
static void handle_invoke_result(cJSON* params)
{
const char* invoke_id = cJSON_GetStringValue(cJSON_GetObjectItem(params, "id"));
if (!invoke_id)
return;
pending_invoke_t* p = find_pending(invoke_id);
if (!p) {
syslog(LOG_WARNING, "[%s] No pending invoke for id=%s\n", TAG, invoke_id);
return;
}
pthread_mutex_lock(&p->mtx);
p->ok = cJSON_IsTrue(cJSON_GetObjectItem(params, "ok"));
if (p->ok) {
const char* pj = cJSON_GetStringValue(cJSON_GetObjectItem(params, "payloadJSON"));
if (pj)
p->result_json = strdup(pj);
} else {
cJSON* err = cJSON_GetObjectItem(params, "error");
const char* msg = cJSON_GetStringValue(cJSON_GetObjectItem(err, "message"));
p->result_json = strdup(msg ? msg : "node error");
}
p->done = true;
pthread_cond_signal(&p->cond);
pthread_mutex_unlock(&p->mtx);
}
int node_manager_init(void)
{
memset(s_nodes, 0, sizeof(s_nodes));
memset(s_pending, 0, sizeof(s_pending));
tool_registry_register_provider("node",
node_manager_get_tools_json,
node_manager_execute);
syslog(LOG_INFO, "[%s] Node manager initialized (max %d nodes)\n", TAG,
NODE_MAX_NODES);
return OK;
}
bool node_manager_handle_message(int ws_fd, const char* data, int len)
{
cJSON* root = cJSON_ParseWithLength(data, len);
if (!root) {
syslog(LOG_DEBUG, "[%s] handle_message: JSON parse failed fd=%d len=%d\n",
TAG, ws_fd, len);
return false;
}
const char* type = cJSON_GetStringValue(cJSON_GetObjectItem(root, "type"));
if (!type) {
cJSON_Delete(root);
return false;
}
if (strcmp(type, "req") == 0) {
const char* method = cJSON_GetStringValue(cJSON_GetObjectItem(root, "method"));
const char* req_id = cJSON_GetStringValue(cJSON_GetObjectItem(root, "id"));
cJSON* params = cJSON_GetObjectItem(root, "params");
if (!method || !req_id) {
cJSON_Delete(root);
return false;
}
if (strcmp(method, "connect") == 0) {
const char* role = cJSON_GetStringValue(cJSON_GetObjectItem(params, "role"));
if (!role || strcmp(role, "node") != 0) {
cJSON_Delete(root);
return false;
}
handle_connect(ws_fd, req_id, params);
cJSON_Delete(root);
return true;
}
if (strcmp(method, "node.invoke.result") == 0) {
if (params)
handle_invoke_result(params);
cJSON_Delete(root);
return true;
}
* fired on a node that has no local feishu credentials). */
if (strcmp(method, "chat.forward") == 0) {
cJSON* pl = params ? cJSON_GetObjectItem(params, "payload") : NULL;
if (!pl)
pl = params;
if (pl) {
const char* ch = cJSON_GetStringValue(cJSON_GetObjectItem(pl, "channel"));
const char* cid = cJSON_GetStringValue(cJSON_GetObjectItem(pl, "chat_id"));
const char* ct = cJSON_GetStringValue(cJSON_GetObjectItem(pl, "content"));
if (ch && cid && ct) {
syslog(LOG_INFO, "[%s] chat.forward from node: %s:%s\n", TAG, ch, cid);
agent_msg_t msg;
memset(&msg, 0, sizeof(msg));
strncpy(msg.channel, ch, sizeof(msg.channel) - 1);
strncpy(msg.chat_id, cid, sizeof(msg.chat_id) - 1);
msg.content = strdup(ct);
if (msg.content) {
if (message_bus_push_outbound(&msg) != OK)
free(msg.content);
}
}
}
cJSON* ack = cJSON_CreateObject();
cJSON_AddStringToObject(ack, "type", "ok");
send_response(ws_fd, req_id, true, ack, NULL);
cJSON_Delete(root);
return true;
}
cJSON_Delete(root);
return false;
}
if (strcmp(type, "res") == 0) {
cJSON* payload = cJSON_GetObjectItem(root, "payload");
if (payload) {
const char* id = cJSON_GetStringValue(cJSON_GetObjectItem(payload, "id"));
if (id && find_pending(id)) {
handle_invoke_result(payload);
cJSON_Delete(root);
return true;
}
}
cJSON_Delete(root);
return false;
}
cJSON_Delete(root);
return false;
}
void node_manager_on_disconnect(int ws_fd)
{
pthread_mutex_lock(&s_mtx);
node_entry_t* node = find_node_by_fd(ws_fd);
if (node) {
syslog(LOG_INFO, "[%s] Node disconnected: %s (%d commands removed)\n", TAG,
node->node_id, node->cmd_count);
node->active = false;
}
pthread_mutex_unlock(&s_mtx);
if (node) {
tool_registry_invalidate();
}
* instead of waiting for the full 30s timeout. */
for (int i = 0; i < MAX_PENDING; i++) {
if (!s_pending[i].active) {
continue;
}
pthread_mutex_lock(&s_pending[i].mtx);
if (!s_pending[i].done) {
s_pending[i].done = true;
s_pending[i].ok = false;
s_pending[i].result_json = strdup("node disconnected");
pthread_cond_signal(&s_pending[i].cond);
}
pthread_mutex_unlock(&s_pending[i].mtx);
}
}
int node_manager_execute(const char* tool_name, const char* input_json,
char* output, size_t output_size)
{
if (strncmp(tool_name, "node.", 5) != 0)
return ERROR;
const char* rest = tool_name + 5;
const char* sep = strrchr(rest, '.');
if (!sep) {
snprintf(output, output_size, "Invalid node tool format: %s", tool_name);
return ERROR;
}
char node_id[NODE_ID_LEN];
size_t id_len = (size_t)(sep - rest);
if (id_len >= NODE_ID_LEN)
id_len = NODE_ID_LEN - 1;
memcpy(node_id, rest, id_len);
node_id[id_len] = '\0';
const char* command = sep + 1;
pthread_mutex_lock(&s_mtx);
node_entry_t* node = find_node_by_id(node_id);
if (!node) {
pthread_mutex_unlock(&s_mtx);
snprintf(output, output_size, "Node '%s' not connected", node_id);
return ERROR;
}
int ws_fd = node->ws_fd;
pthread_mutex_unlock(&s_mtx);
char invoke_id[48];
gen_id(invoke_id, sizeof(invoke_id));
pending_invoke_t* p = alloc_pending(invoke_id);
if (!p) {
snprintf(output, output_size, "Too many pending invocations");
return ERROR;
}
cJSON* payload = cJSON_CreateObject();
cJSON_AddStringToObject(payload, "id", invoke_id);
cJSON_AddStringToObject(payload, "nodeId", node_id);
cJSON_AddStringToObject(payload, "command", command);
* (e.g. "at_epoch": "1775654400" instead of "at_epoch": 1775654400).
* Scan all string values and convert those that look like numbers. */
const char* params_str = input_json ? input_json : "{}";
cJSON* params_obj = cJSON_Parse(params_str);
if (params_obj && cJSON_IsObject(params_obj)) {
int count = cJSON_GetArraySize(params_obj);
char** keys = NULL;
int nkeys = 0;
if (count > 0) {
keys = malloc(sizeof(char*) * count);
cJSON* item = NULL;
cJSON_ArrayForEach(item, params_obj)
{
if (cJSON_IsString(item) && item->valuestring && item->valuestring[0]) {
char* endp = NULL;
double dval = strtod(item->valuestring, &endp);
if (endp && *endp == '\0' && endp != item->valuestring) {
keys[nkeys++] = strdup(item->string);
(void)dval;
}
}
}
for (int i = 0; i < nkeys; i++) {
cJSON* old = cJSON_GetObjectItem(params_obj, keys[i]);
if (old && cJSON_IsString(old)) {
double dv = strtod(old->valuestring, NULL);
cJSON_ReplaceItemInObject(params_obj, keys[i],
cJSON_CreateNumber(dv));
}
free(keys[i]);
}
free(keys);
}
char* normalized = cJSON_PrintUnformatted(params_obj);
cJSON_AddStringToObject(payload, "paramsJSON",
normalized ? normalized : params_str);
free(normalized);
cJSON_Delete(params_obj);
} else {
if (params_obj)
cJSON_Delete(params_obj);
cJSON_AddStringToObject(payload, "paramsJSON", params_str);
}
int send_ret = send_event(ws_fd, "node.invoke.request", payload);
syslog(LOG_INFO, "[%s] Invoke %s on node %s (id=%.8s) fd=%d send=%d\n",
TAG, command, node_id, invoke_id, ws_fd, send_ret);
if (send_ret != 0) {
syslog(LOG_ERR, "[%s] Failed to send invoke request to node %s\n",
TAG, node_id);
free_pending(p);
snprintf(output, output_size, "Failed to send invoke to node '%s'", node_id);
return ERROR;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += INVOKE_TIMEOUT_S;
pthread_mutex_lock(&p->mtx);
while (!p->done) {
int rc = pthread_cond_timedwait(&p->cond, &p->mtx, &ts);
if (rc == ETIMEDOUT)
break;
}
pthread_mutex_unlock(&p->mtx);
int ret;
if (!p->done) {
syslog(LOG_WARNING, "[%s] Invoke %.8s TIMEOUT (fd=%d)\n", TAG, invoke_id,
ws_fd);
snprintf(output, output_size, "Node invoke timeout after %ds",
INVOKE_TIMEOUT_S);
ret = ERROR;
} else if (p->ok) {
syslog(LOG_INFO, "[%s] Invoke %.8s OK: %.200s\n", TAG, invoke_id,
p->result_json ? p->result_json : "{}");
strncpy(output, p->result_json ? p->result_json : "{}", output_size - 1);
output[output_size - 1] = '\0';
ret = OK;
} else {
syslog(LOG_WARNING, "[%s] Invoke %.8s FAILED: %s\n", TAG, invoke_id,
p->result_json ? p->result_json : "unknown");
snprintf(output, output_size, "Node error: %s",
p->result_json ? p->result_json : "unknown");
ret = ERROR;
}
free(p->result_json);
free_pending(p);
return ret;
}
char* node_manager_get_tools_json(void)
{
cJSON* arr = cJSON_CreateArray();
pthread_mutex_lock(&s_mtx);
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (!s_nodes[i].active)
continue;
node_entry_t* n = &s_nodes[i];
for (int j = 0; j < n->cmd_count; j++) {
char full_name[128];
snprintf(full_name, sizeof(full_name), "node.%s.%s", n->node_id,
n->commands[j]);
char desc[256];
snprintf(desc, sizeof(desc), "[Remote %s/%s] %s on %s", n->platform,
n->device_family, n->commands[j],
n->display_name[0] ? n->display_name : n->node_id);
cJSON* tool = cJSON_CreateObject();
cJSON_AddStringToObject(tool, "name", full_name);
cJSON_AddStringToObject(tool, "description", desc);
cJSON* schema = cJSON_CreateObject();
cJSON_AddStringToObject(schema, "type", "object");
cJSON_AddItemToObject(schema, "properties", cJSON_CreateObject());
cJSON_AddItemToObject(tool, "input_schema", schema);
cJSON_AddItemToArray(arr, tool);
}
}
pthread_mutex_unlock(&s_mtx);
if (cJSON_GetArraySize(arr) == 0) {
cJSON_Delete(arr);
return NULL;
}
char* json = cJSON_PrintUnformatted(arr);
cJSON_Delete(arr);
return json;
}
int node_manager_list(char* buf, size_t size)
{
int off = 0;
int count = 0;
pthread_mutex_lock(&s_mtx);
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (!s_nodes[i].active)
continue;
node_entry_t* n = &s_nodes[i];
count++;
off += snprintf(buf + off, size - off, " [%d] %s (%s/%s) fd=%d cmds=%d\n",
i, n->node_id, n->platform, n->device_family, n->ws_fd,
n->cmd_count);
for (int j = 0; j < n->cmd_count && off < (int)size - 32; j++) {
off += snprintf(buf + off, size - off, " - %s\n", n->commands[j]);
}
}
pthread_mutex_unlock(&s_mtx);
if (count == 0) {
snprintf(buf, size, " (no nodes connected)\n");
}
return count;
}
* Send connect.challenge to a newly connected WebSocket client.
* Called from ws_server when a new client connects, so Nodes
* can identify themselves.
*/
void node_manager_send_challenge(int ws_fd)
{
cJSON* payload = cJSON_CreateObject();
char nonce[32];
gen_id(nonce, sizeof(nonce));
cJSON_AddStringToObject(payload, "nonce", nonce);
send_event(ws_fd, "connect.challenge", payload);
}
int node_manager_active_count(void)
{
int count = 0;
pthread_mutex_lock(&s_mtx);
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (s_nodes[i].active)
count++;
}
pthread_mutex_unlock(&s_mtx);
return count;
}
int node_manager_broadcast_chat(const char* channel, const char* chat_id,
const char* content)
{
if (!channel || !chat_id || !content)
return 0;
cJSON* payload = cJSON_CreateObject();
cJSON_AddStringToObject(payload, "channel", channel);
cJSON_AddStringToObject(payload, "chat_id", chat_id);
cJSON_AddStringToObject(payload, "content", content);
int sent = 0;
pthread_mutex_lock(&s_mtx);
for (int i = 0; i < NODE_MAX_NODES; i++) {
if (s_nodes[i].active) {
cJSON* dup = cJSON_Duplicate(payload, true);
if (dup && send_event(s_nodes[i].ws_fd, "chat.forward", dup) == 0) {
syslog(LOG_INFO, "[%s] chat.forward → node %s\n", TAG,
s_nodes[i].node_id);
sent++;
} else {
cJSON_Delete(dup);
}
}
}
pthread_mutex_unlock(&s_mtx);
cJSON_Delete(payload);
syslog(LOG_INFO, "[%s] Broadcast chat.forward to %d node(s)\n", TAG, sent);
return sent;
}