/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

# pragma once

#include <mutex>
#include <condition_variable>
#include <atomic>
#include "DelayedWorkQueue.h"
#include "common.h"

class ScheduledTaskExecutor {
public:
    explicit ScheduledTaskExecutor(size_t numThreads) {
        workQueue = new DelayedWorkQueue();
        initWorks(numThreads);
    }

    void WorkerThreadProc() {
        ScheduledFutureTask* task;
        while (!stop.load()) {
            task = workQueue->Take();
            if (task == nullptr) {
                continue;
            }
            task->Run();
            if (!stop.load() && task->IsPeriodic() && !task->IsCancelled()) {
                task->SetNextRuntime();
                workQueue->Offer(task);
            }
        }
    }

    void initWorks(size_t numThreads) {
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back([this] {WorkerThreadProc();});
        }
    }

    void Shutdown() {
        if (stop.exchange(true)) {
            return;
        }
        workQueue->Shutdown();
        workQueue->NotifyAll();
        for (std::thread& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    ScheduledFutureTask* Schedule(omnistream::Runnable* task, long initialDelay) {
        auto* futureTask = new ScheduledFutureTask(initialDelay, task);
        workQueue->Offer(futureTask);
        return futureTask;
    }

    ScheduledFutureTask* ScheduleWithFixedDelay(omnistream::Runnable* task, long initialDelay, long period) {
        ScheduledFutureTask* futureTask = new ScheduledFutureTask(initialDelay, period, task);
        workQueue->Offer(futureTask);
        return futureTask;
    }

    ScheduledFutureTask* ScheduleAtFixedRate(omnistream::Runnable* task, long initialDelay, long period) {
        NOT_IMPL_EXCEPTION;
    }

    ~ScheduledTaskExecutor() {
        Shutdown();
        delete workQueue;
    }
private:
    std::vector<std::thread> workers;
    std::atomic<bool> stop = false;
    DelayedWorkQueue* workQueue = nullptr;
};