/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once
#include <Interpreters/Context_fwd.h>
#include <base/types.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool_fwd.h>

namespace local_engine
{

using JobId = String;
using Task = std::function<void()>;

class Job
{
    friend class JobScheduler;
public:
    explicit Job(const JobId& id)
        : id(id)
    {
    }

    void addTask(Task&& task)
    {
        tasks.emplace_back(task);
    }

private:
    JobId id;
    std::vector<Task> tasks;
};



struct JobSatus
{
    enum Status
    {
        RUNNING,
        FINISHED,
        FAILED
    };
    Status status;
    std::vector<String> messages;

    static JobSatus success()
    {
        return JobSatus{FINISHED};
    }

    static JobSatus running()
    {
        return JobSatus{RUNNING};
    }

    static JobSatus failed(const std::vector<std::string> & messages)
    {
        return JobSatus{FAILED, messages};
    }
};

struct TaskResult
{
    enum Status
    {
        SUCCESS,
        FAILED,
        RUNNING
    };
    Status status = RUNNING;
    String message;
};

class JobContext
{
public:
    Job job;
    std::unique_ptr<std::atomic_uint32_t> remain_tasks = std::make_unique<std::atomic_uint32_t>();
    std::vector<TaskResult> task_results;

    bool isFinished()
    {
        return remain_tasks->load(std::memory_order::relaxed) == 0;
    }
};

class JobScheduler
{
public:
    static JobScheduler & instance()
    {
        static JobScheduler global_job_scheduler;
        return global_job_scheduler;
    }

    static void initialize(const DB::ContextPtr & context);

    JobId scheduleJob(Job&& job);

    std::optional<JobSatus> getJobSatus(const JobId& job_id);

    void cleanupJob(const JobId& job_id);

    void addFinishedJob(const JobId& job_id);

    void cleanFinishedJobs();
    ~JobScheduler();
private:
    JobScheduler();
    std::unique_ptr<ThreadPool> thread_pool;
    std::unordered_map<JobId, JobContext> job_details;
    std::mutex job_details_mutex;

    std::vector<std::pair<JobId, Stopwatch>> finished_job;
    std::mutex finished_job_mutex;
    LoggerPtr logger = getLogger("JobScheduler");
};
}