#include "xsched/types.h"
#include "xsched/utils/log.h"
#include "xsched/utils/str.h"
#include "xsched/utils/xassert.h"
#include "xsched/protocol/def.h"
#include "xsched/protocol/names.h"
#include "xsched/preempt/sched/agent.h"
#include "xsched/preempt/sched/executor.h"
#include "xsched/sched/scheduler/local.h"
#include "xsched/xsched.h"
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
#include <atomic>
#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <mutex>
#include <string>
#include <thread>
using namespace xsched::sched;
using namespace xsched::protocol;
namespace xsched::preempt
{
// Path of the control file that the listener thread re-reads after SIGUSR1.
#define XSCHED_CTL_FILE "/tmp/xsched-policy"
// Held by ReplaceScheduler() and by ~SchedAgent() while the scheduler is torn down.
static std::mutex g_sched_mutex;
// Last policy recorded from the environment at start-up or by a successful
// POLICY command; reported by the STATUS command.
static std::atomic<XPolicyType> g_last_policy{kPolicyUnknown};
// Last timeslice accepted by a TIMESLICE command (0 until one succeeds).
static std::atomic<Timeslice> g_last_ts{0};
// Last value accepted by a KDEADLINE command (starts at 1).
static std::atomic<size_t> g_last_kd{1};
// Pipe used to hand work from the SIGUSR1 handler to the listener thread:
// the handler (and the destructor, as a wake-up) writes one byte to the write
// end; the listener polls the read end. Both stay -1 if the pipe was not created.
static int g_sig_pipe_r = -1;
static int g_sig_pipe_w = -1;
// Set by ~SchedAgent() to make the listener loop exit.
static std::atomic<bool> g_shutdown{false};
static std::string xsched_execute_line(const char *key, const char *val)
{
if (std::strcmp(key, "POLICY") == 0) {
if (std::strcmp(val, "?") == 0 || std::strcmp(val, "help") == 0) {
std::string help;
help += "POLICY=HPF|HHPF|UP|PUP|KEDF|LAX|AWF|CHPF\n";
help += "TIMESLICE=100..100000 (us, for UP/PUP)\n";
help += "KDEADLINE=1..N (concurrency, for KEDF)\n";
help += "STATUS show current config\n";
help += "HELP this message\n";
help += "example: POLICY=KEDF KDEADLINE=3";
return help;
}
XPolicyType type = GetPolicyType(val);
if (type == kPolicyUnknown)
return std::string("ERR: unknown policy '") + val + "'";
XResult ret = SchedAgent::SetScheduler(kSchedulerLocal, type);
if (ret == kXSchedSuccess) g_last_policy = type;
XINFO("xsched: policy -> %s", val);
std::string resp;
if (ret == kXSchedSuccess) {
resp = std::string("OK: policy switched to ") + val;
if (type == kPolicyUtilizationPartition || type == kPolicyProcessUtilizationPartition)
resp += "\nhint: use TIMESLICE=<us> to set timeslice (current: " +
(g_last_ts > 0 ? std::to_string(g_last_ts) : "0") + " us)";
else if (type == kPolicyKEarliestDeadlineFirst)
resp += "\nhint: use KDEADLINE=<N> to set concurrency (current: " +
std::to_string(g_last_kd) + ")";
} else {
resp = "ERR: policy switch failed";
}
return resp;
}
if (std::strcmp(key, "TIMESLICE") == 0) {
long ts = std::atol(val);
if (ts < 100 || ts > 100000)
return std::string("ERR: TIMESLICE out of range [100, 100000]: ") + val;
XResult ret = XHintTimeslice((Timeslice)ts);
if (ret == kXSchedSuccess) g_last_ts = (Timeslice)ts;
XINFO("xsched: TIMESLICE -> %ld us", ts);
return (ret == kXSchedSuccess)
? std::string("OK: TIMESLICE set to ") + val + " us"
: "ERR: TIMESLICE set failed";
}
if (std::strcmp(key, "KDEADLINE") == 0) {
long k = std::atol(val);
if (k < 1)
return std::string("ERR: KDEADLINE must be >= 1: ") + val;
XResult ret = XHintKDeadline((size_t)k);
if (ret == kXSchedSuccess) g_last_kd = (size_t)k;
XINFO("xsched: KDEADLINE -> %ld", k);
return (ret == kXSchedSuccess)
? std::string("OK: KDEADLINE set to ") + val
: "ERR: KDEADLINE set failed";
}
if (std::strcmp(key, "STATUS") == 0) {
std::string s;
const char *names[] = {
"HPF", "HHPF", "UP", "PUP", "KEDF", "LAX", "AWF", "CHPF", "WRR", "UNKNOWN"
};
int idx = (g_last_policy >= kPolicyHighestPriorityFirst &&
g_last_policy <= kPolicyWeightedRoundRobin)
? (int)g_last_policy - 1 : 9;
s += "POLICY=" + std::string(names[idx]);
s += " TIMESLICE=" + (g_last_ts > 0 ? std::to_string(g_last_ts) : "0");
s += " KDEADLINE=" + std::to_string(g_last_kd);
return s;
}
return std::string("ERR: unknown command '") + key + "'";
}
/// Reads XSCHED_CTL_FILE and executes every "KEY=VALUE" line in it.
///
/// The file sits in a world-writable directory, so it is only honoured when it
/// is a regular file owned by the current user. Lines that are blank, start
/// with '#', or contain no '=' are skipped. Blanks around key and value are
/// ignored. Commands that fail are reported through XWARN.
static void xsched_process_config_file()
{
    // O_NOFOLLOW: do not follow a symlink planted at the well-known path.
    // O_NONBLOCK: do not hang in open() if the path is a FIFO.
    const int fd = ::open(XSCHED_CTL_FILE, O_RDONLY | O_NOFOLLOW | O_NONBLOCK | O_CLOEXEC);
    if (fd < 0) {
        XWARN("xsched: cannot open %s (%s)", XSCHED_CTL_FILE, ::strerror(errno));
        return;
    }

    struct stat file_stat;
    if (::fstat(fd, &file_stat) != 0) {
        // Fail closed: without stat data the ownership check cannot be done.
        XWARN("xsched: cannot stat %s (%s), ignoring", XSCHED_CTL_FILE, ::strerror(errno));
        ::close(fd);
        return;
    }
    if (file_stat.st_uid != ::getuid()) {
        XWARN("xsched: %s owned by uid %d, ignoring", XSCHED_CTL_FILE, (int)file_stat.st_uid);
        ::close(fd);
        return;
    }
    if (!S_ISREG(file_stat.st_mode)) {
        XWARN("xsched: %s is not a regular file, ignoring", XSCHED_CTL_FILE);
        ::close(fd);
        return;
    }

    char buf[512];
    const ssize_t n = ::read(fd, buf, sizeof(buf) - 1);
    const int read_errno = errno;
    ::close(fd);
    if (n < 0) {
        XWARN("xsched: cannot read %s (%s)", XSCHED_CTL_FILE, ::strerror(read_errno));
        return;
    }
    if (n == 0) {
        XWARN("xsched: empty config file");
        return;
    }
    if (static_cast<size_t>(n) == sizeof(buf) - 1) {
        // Only one buffer is read; a longer file may end in a cut-off line.
        XWARN("xsched: %s is larger than %d bytes, the rest is ignored",
              XSCHED_CTL_FILE, (int)(sizeof(buf) - 1));
    }
    buf[n] = '\0';

    // Strips leading and trailing blanks in place and returns the new start.
    auto trim = [](char *s) -> char * {
        while (*s == ' ' || *s == '\t') ++s;
        char *end = s + std::strlen(s);
        while (end > s && (end[-1] == ' ' || end[-1] == '\t')) --end;
        *end = '\0';
        return s;
    };

    char *saveptr = nullptr;
    for (char *line = strtok_r(buf, "\n\r", &saveptr); line != nullptr;
         line = strtok_r(nullptr, "\n\r", &saveptr)) {
        line = trim(line);
        if (line[0] == '\0' || line[0] == '#') continue;

        char *eq = std::strchr(line, '=');
        if (eq == nullptr) continue;
        *eq = '\0';

        // Trimming both sides makes "KEY = VALUE" behave like "KEY=VALUE".
        const char *key = trim(line);
        const char *val = trim(eq + 1);

        const std::string result = xsched_execute_line(key, val);
        if (result.compare(0, 3, "ERR") == 0) {
            XWARN("xsched: %s: %s", XSCHED_CTL_FILE, result.c_str());
        }
    }
}
/// SIGUSR1 handler: writes one byte to the self-pipe so that the listener
/// thread re-reads the control file outside of signal context.
///
/// Only write() is called here. errno is saved and restored because write()
/// may modify it, and the code that was interrupted may be about to read it.
static void xsched_sigusr1_handler(int sig) {
    (void)sig;
    const int saved_errno = errno;
    const char byte = 1;
    const ssize_t r = ::write(g_sig_pipe_w, &byte, 1);
    (void)r;  // best effort: nothing useful can be done about a failure here
    errno = saved_errno;
}
/// Builds the per-process control socket name: "xsched-" followed by the PID.
static std::string xsched_socket_name()
{
    std::string name("xsched-");
    name += std::to_string(getpid());
    return name;
}
// Serves one accepted control connection: checks the peer's uid, reads a
// single command, executes it and sends the reply. Always closes `cli`.
static void xsched_serve_client(int cli)
{
    const int kClientReadTimeoutMs = 1000;

    // An abstract socket has no file permissions, so the peer uid check is the
    // only access control. Reject the connection if it cannot be performed.
    struct ucred peer;
    socklen_t peer_len = sizeof(peer);
    if (::getsockopt(cli, SOL_SOCKET, SO_PEERCRED, &peer, &peer_len) != 0) {
        XWARN("xsched: cannot verify peer credentials (%s), connection rejected",
              ::strerror(errno));
        ::close(cli);
        return;
    }
    if (peer.uid != ::getuid()) {
        XWARN("xsched: connection rejected from uid %d", (int)peer.uid);
        ::close(cli);
        return;
    }

    // Bound the wait for the command so a silent client cannot stall the
    // listener (and with it SIGUSR1 handling and shutdown) indefinitely.
    struct pollfd pfd;
    pfd.fd = cli;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int ready;
    do {
        ready = ::poll(&pfd, 1, kClientReadTimeoutMs);
    } while (ready < 0 && errno == EINTR);
    if (ready <= 0) {
        XWARN("xsched: no command received from client, closing connection");
        ::close(cli);
        return;
    }

    char buf[4096];
    ssize_t n = ::read(cli, buf, sizeof(buf) - 1);
    if (n < 0) {
        XWARN("xsched: read error (%s)", ::strerror(errno));
        ::close(cli);
        return;
    }
    if (n == 0) {
        ::close(cli);
        return;
    }
    buf[n] = '\0';
    while (n > 0 && (buf[n-1] == '\n' || buf[n-1] == '\r' || buf[n-1] == ' ' || buf[n-1] == '\t'))
        buf[--n] = '\0';

    // Strips leading and trailing blanks in place and returns the new start.
    auto trim = [](char *s) -> char * {
        while (*s == ' ' || *s == '\t') ++s;
        char *end = s + std::strlen(s);
        while (end > s && (end[-1] == ' ' || end[-1] == '\t')) --end;
        *end = '\0';
        return s;
    };

    std::string resp;
    char *eq = std::strchr(buf, '=');
    if (eq != nullptr) {
        *eq = '\0';
        const char *k = trim(buf);
        const char *v = trim(eq + 1);
        resp = xsched_execute_line(k, v);
    } else {
        resp = xsched_execute_line(trim(buf), "");
    }
    resp += '\n';

    const char *wp = resp.c_str();
    size_t remain = resp.size();
    while (remain > 0) {
        // MSG_NOSIGNAL: a client that already disconnected must produce EPIPE
        // here instead of a SIGPIPE that would terminate the whole process.
        const ssize_t sent = ::send(cli, wp, remain, MSG_NOSIGNAL);
        if (sent < 0 && errno == EINTR) continue;
        if (sent <= 0) break;
        wp += sent;
        remain -= static_cast<size_t>(sent);
    }
    ::close(cli);
}

/// Body of the control thread.
///
/// Binds an abstract-namespace UNIX stream socket named by
/// xsched_socket_name() and then polls two descriptors until g_shutdown is set:
///  - the listening socket: each connection carries one command, which is
///    passed to xsched_execute_line() and answered with a single reply;
///  - the read end of the self-pipe: a byte written by the SIGUSR1 handler
///    triggers xsched_process_config_file().
/// Returns early, after logging, if the socket cannot be set up.
static void xsched_socket_listener()
{
    const std::string name = xsched_socket_name();
    const int fd = ::socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
    if (fd < 0) {
        XWARN("xsched: failed to create socket (%s)", ::strerror(errno));
        return;
    }

    // Abstract address: sun_path[0] stays '\0' and the name follows it. The
    // address length covers exactly the bytes copied.
    struct sockaddr_un addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    const size_t max_name = sizeof(addr.sun_path) - 1;
    const size_t name_len = name.size() < max_name ? name.size() : max_name;
    std::memcpy(addr.sun_path + 1, name.data(), name_len);
    const socklen_t addr_len =
        static_cast<socklen_t>(offsetof(struct sockaddr_un, sun_path) + 1 + name_len);

    if (::bind(fd, (struct sockaddr *)&addr, addr_len) < 0) {
        XWARN("xsched: bind(@%s) failed (%s)", name.c_str(), ::strerror(errno));
        ::close(fd);
        return;
    }
    if (::listen(fd, 5) < 0) {
        XWARN("xsched: listen failed (%s)", ::strerror(errno));
        ::close(fd);
        return;
    }
    XINFO("xsched: listening on @%s", name.c_str());

    struct pollfd fds[2];
    fds[0].fd = fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = g_sig_pipe_r;  // may be -1; poll() ignores negative descriptors
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    while (!g_shutdown.load()) {
        // The 500 ms timeout bounds how long a shutdown request can go unnoticed.
        const int ret = ::poll(fds, 2, 500);
        if (ret < 0) {
            if (errno == EINTR) continue;
            XWARN("xsched: poll failed (%s)", ::strerror(errno));
            break;
        }
        if (ret == 0) continue;

        if (fds[0].revents & POLLIN) {
            const int cli = ::accept(fd, nullptr, nullptr);
            if (cli < 0) {
                XWARN("xsched: accept failed (%s)", ::strerror(errno));
            } else {
                xsched_serve_client(cli);
            }
        }

        if (fds[1].revents & POLLIN) {
            // Drain in chunks; a short read means the pipe is empty, so this
            // stops without relying on the descriptor being non-blocking.
            char drain[64];
            while (::read(g_sig_pipe_r, drain, sizeof(drain)) ==
                   static_cast<ssize_t>(sizeof(drain))) {}
            // The destructor also writes to this pipe just to wake the thread
            // up; that must not reload the configuration during teardown.
            if (g_shutdown.load()) break;
            xsched_process_config_file();
        }
    }
    ::close(fd);
}
// Definition of the static agent instance. It is defined after the file-level
// globals above, which its constructor and destructor use.
SchedAgent SchedAgent::g_sched_agent;
/// Starts the executor and the scheduler selected through the
/// XSCHED_SCHEDULER_ENV_NAME / XSCHED_POLICY_ENV_NAME environment variables,
/// then sets up the runtime control channel: a self-pipe plus SIGUSR1 handler
/// (created once per process) and the detached socket listener thread.
SchedAgent::SchedAgent()
{
    if (scheduler_ != nullptr) return;

    XSchedulerType scheduler_type = kSchedulerUnknown;
    XPolicyType policy_type = kPolicyUnknown;
    const char *scheduler_str = std::getenv(XSCHED_SCHEDULER_ENV_NAME);
    const char *policy_str = std::getenv(XSCHED_POLICY_ENV_NAME);
    if (scheduler_str) scheduler_type = GetSchedulerType(scheduler_str);
    if (policy_str) policy_type = GetPolicyType(policy_str);

    SchedExecutor::Start();
    InitScheduler(scheduler_type, policy_type);
    if (policy_type != kPolicyUnknown) g_last_policy = policy_type;

    static bool pipe_initialized = false;
    if (!pipe_initialized) {
        pipe_initialized = true;
        int pipe_fds[2];
        if (::pipe(pipe_fds) == 0) {
            for (const int end : pipe_fds) {
                // Both ends are made non-blocking: the listener drains the read
                // end in a loop, and the signal handler writes to the write end
                // and must never block if the pipe is full.
                const int status = ::fcntl(end, F_GETFL, 0);
                if (status < 0 || ::fcntl(end, F_SETFL, status | O_NONBLOCK) < 0) {
                    XWARN("xsched: failed to make self-pipe non-blocking (%s)", ::strerror(errno));
                }
                // Keep the pipe out of exec'd child processes.
                const int fdflags = ::fcntl(end, F_GETFD, 0);
                if (fdflags >= 0) ::fcntl(end, F_SETFD, fdflags | FD_CLOEXEC);
            }
            g_sig_pipe_r = pipe_fds[0];
            g_sig_pipe_w = pipe_fds[1];
        } else {
            XWARN("xsched: failed to create self-pipe (%s)", ::strerror(errno));
        }
        if (g_sig_pipe_r >= 0) {
            struct sigaction sa;
            // Zero every field so nothing uninitialised is passed to sigaction().
            std::memset(&sa, 0, sizeof(sa));
            sa.sa_handler = xsched_sigusr1_handler;
            sigemptyset(&sa.sa_mask);
            sa.sa_flags = SA_RESTART;
            if (sigaction(SIGUSR1, &sa, nullptr) == 0) {
                XINFO("xsched: policy switch via kill -USR1 %d (%s)", getpid(), XSCHED_CTL_FILE);
            } else {
                XWARN("xsched: failed to register SIGUSR1 handler (%s)", strerror(errno));
            }
        }
    }

    // The listener is optional: if the thread cannot be created the agent
    // keeps working without the control socket.
    try {
        std::thread(xsched_socket_listener).detach();
    } catch (...) {
        XWARN("xsched: failed to start socket listener thread");
    }
}
/// Signals the listener thread to exit, then stops the executor and tears the
/// scheduler down while holding g_sched_mutex. Does nothing when no scheduler
/// is installed.
SchedAgent::~SchedAgent()
{
    if (scheduler_ == nullptr) return;

    g_shutdown.store(true);

    // One byte on the self-pipe wakes the listener out of poll() right away.
    const bool have_pipe = (g_sig_pipe_w != -1);
    if (have_pipe) {
        const char wake = 1;
        const ssize_t rc = ::write(g_sig_pipe_w, &wake, 1);
        (void)rc;
    }

    const std::lock_guard<std::mutex> guard(g_sched_mutex);
    SchedExecutor::Stop();
    FiniScheduler();
}
/// Wraps `hint` in a HintEvent and forwards it to the current scheduler.
/// When no scheduler is installed the hint is dropped with a warning.
void SchedAgent::RelayHint(std::shared_ptr<const sched::Hint> hint)
{
    if (scheduler_ != nullptr) {
        scheduler_->RecvEvent(std::make_shared<HintEvent>(hint));
        return;
    }
    XWARN("scheduler not initialized, hint type(%d) dropped", hint->Type());
}
/// Forwards `event` to the current scheduler.
/// When no scheduler is installed the event is dropped with a warning.
void SchedAgent::RelayEvent(std::shared_ptr<const Event> event)
{
    if (scheduler_ != nullptr) {
        scheduler_->RecvEvent(event);
        return;
    }
    XWARN("scheduler not initialized, event type(%d) dropped", event->Type());
}
/// Creates and starts a scheduler of the requested type and policy, sends it a
/// ProcessCreateEvent carrying this process's command line, and installs it as
/// the current scheduler. Does nothing if a scheduler is already installed.
void SchedAgent::InitScheduler(XSchedulerType scheduler_type, XPolicyType policy_type)
{
    if (scheduler_ != nullptr) return;

    auto fresh = CreateScheduler(scheduler_type, policy_type);
    fresh->SetExecutor(SchedExecutor::Execute);
    fresh->Run();

    // The command line is collected once and reused on later calls.
    static const std::string kCmdline = []() -> std::string {
        std::string joined;
#if defined(_WIN32)
        LPWSTR wide = GetCommandLineW();
        int bytes = WideCharToMultiByte(CP_UTF8, 0, wide, -1, NULL, 0, NULL, NULL);
        if (bytes > 0) {
            std::vector<char> utf8(bytes);
            WideCharToMultiByte(CP_UTF8, 0, wide, -1, utf8.data(), bytes, NULL, NULL);
            joined = utf8.data();
        }
#elif defined(__linux__)
        // Arguments in /proc/self/cmdline are NUL-separated; join them with
        // spaces and stop at the first empty argument.
        std::ifstream proc_cmdline("/proc/self/cmdline");
        if (proc_cmdline.good()) {
            std::string token;
            while (std::getline(proc_cmdline, token, '\0')) {
                if (token.empty()) break;
                joined += token;
                joined += " ";
            }
        }
        proc_cmdline.close();
#endif
        return ShrinkString(joined, ProcessCreateEvent::CmdlineCapacity() - 1);
    }();

    fresh->RecvEvent(std::make_shared<ProcessCreateEvent>(kCmdline));
    // Install the scheduler only after it is running and has been told
    // about this process.
    scheduler_ = fresh;
}
/// Uninstalls the current scheduler, sends it a ProcessDestroyEvent and stops
/// it. Does nothing if no scheduler is installed.
void SchedAgent::FiniScheduler()
{
    if (scheduler_ == nullptr) return;

    // Clear the member first so scheduler_ is already null while the old
    // scheduler is being shut down.
    auto retiring = scheduler_;
    scheduler_ = nullptr;

    retiring->RecvEvent(std::make_shared<ProcessDestroyEvent>());
    retiring->Stop();
}
/// Switches the running scheduler to `scheduler_type` / `policy_type` while
/// holding g_sched_mutex.
///
/// - Invalid scheduler type: returns kXSchedErrorInvalidValue.
/// - No scheduler installed: returns kXSchedErrorUnknown.
/// - Different scheduler type: the current scheduler is finalized and a new
///   one is initialized.
/// - Same type, not local: nothing to do.
/// - Same type, local: only the policy is replaced, after it has been
///   range-checked (kXSchedErrorInvalidValue if out of range).
XResult SchedAgent::ReplaceScheduler(XSchedulerType scheduler_type, XPolicyType policy_type)
{
    std::lock_guard<std::mutex> guard(g_sched_mutex);

    const bool type_in_range =
        scheduler_type > kSchedulerUnknown && scheduler_type < kSchedulerMax;
    if (!type_in_range) {
        XWARN("invalid scheduler type: %d", scheduler_type);
        return kXSchedErrorInvalidValue;
    }

    if (scheduler_ == nullptr) {
        XWARN("scheduler not initialized, replace scheduler failed");
        return kXSchedErrorUnknown;
    }

    if (scheduler_->GetType() != scheduler_type) {
        // Different kind of scheduler: tear down and rebuild.
        FiniScheduler();
        InitScheduler(scheduler_type, policy_type);
        return kXSchedSuccess;
    }

    if (scheduler_type != kSchedulerLocal) return kXSchedSuccess;

    const bool policy_in_range = policy_type > kPolicyUnknown && policy_type < kPolicyMax;
    if (!policy_in_range) {
        XWARN("invalid policy type: %d", policy_type);
        return kXSchedErrorInvalidValue;
    }

    auto local = std::dynamic_pointer_cast<LocalScheduler>(scheduler_);
    XASSERT(local != nullptr, "scheduler is not a local scheduler");
    local->SetPolicy(policy_type);
    return kXSchedSuccess;
}
}