/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <condition_variable>
#include <mutex>
#include <thread>

#if defined(__OHOS_PLATFORM__) && !defined(BUILD_PUBLIC_VERSION)
#include <qos.h>
#include <sys/syscall.h>
#include <unistd.h>

#include "res_sched_client.h"
#endif

#include <base/containers/vector.h>

#include <meta/base/interface_macros.h>
#include <meta/base/namespace.h>
#include <meta/ext/object.h>
#include <meta/interface/intf_task_queue.h>
#include <meta/interface/intf_task_queue_registry.h>

#include "future.h"
#include "task_queue.h"

META_BEGIN_NAMESPACE()

constexpr uint32_t RES_TYPE_EXT_ENGINE_SET_QOS = 10028;

class ThreadedTaskQueue
    : public IntroduceInterfaces<MetaObject, IThreadedTaskQueue, ITaskQueueThreadInfo, TaskQueueImpl> {
    META_OBJECT(ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IntroduceInterfaces)
public:
    using Token = ITaskQueue::Token;

    META_NO_COPY_MOVE(ThreadedTaskQueue)

    ThreadedTaskQueue() = default;
    ~ThreadedTaskQueue() override
    {
        Shutdown();
    }

    uint64_t CurrentThread() const
    {
        return std::hash<std::thread::id>{}(std::this_thread::get_id());
    }
    bool CurrentThreadIsExecutionThread() const override
    {
        return CurrentThread() == threadId_;
    }

    bool SetExecutionThread()
    {
        auto id = CurrentThread();
        if (threadId_ != 0) {
            return (threadId_ == id);
        }
        threadId_ = id;
        return true;
    }

    bool Build(const IMetadata::Ptr& data) override
    {
        bool ret = Super::Build(data);
        if (ret) {
            self_ = GetSelf<ITaskQueue>();
            thread_ = std::thread([this]() {
                SetExecutionThread();
                ProcessTasks();
            });
        }
        return ret;
    }

    bool InvokeTask(const ITaskQueueTask::Ptr& task) override
    {
        if (task) {
            auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
            auto ret = task->Invoke();
            GetTaskQueueRegistry().SetCurrentTaskQueue(q);
            return ret;
        }
        return false;
    }

    void Shutdown() override
    {
        Close();
        addCondition_.notify_one();
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    void CancelTask(Token token) override
    {
        TaskQueueImpl::CancelTask(token);
    }

    Token AddTask(ITaskQueueTask::Ptr p) override
    {
        return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
    }

    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
    {
        auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
        if (t) {
            addCondition_.notify_one();
        }
        return t;
    }

    IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
    {
        IPromise::Ptr promise(new Promise);
        BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
        auto f = task->GetFuture();
        AddTask(BASE_NS::move(task));
        return f;
    }

    void ProcessTasks()
    {
        std::unique_lock lock{mutex_};
        execThread_ = std::this_thread::get_id();

#if defined(__OHOS_PLATFORM__) && !defined(BUILD_PUBLIC_VERSION)
        int ret = OHOS::QOS::SetThreadQos(OHOS::QOS::QosLevel::QOS_USER_INTERACTIVE);
        CORE_LOG_I("set engine child thread qos %s", ret == 0 ? "success" : "failed");
        auto tid = syscall(SYS_gettid);
        if (tid > 0) {
            std::unordered_map<std::string, std::string> mapPayload{
                {"pid", std::to_string(getpid())}, {"tid", std::to_string(tid)}};
            CORE_LOG_I("ReportEngineResType %s %s", mapPayload["pid"].c_str(), mapPayload["tid"].c_str());
            OHOS::ResourceSchedule::ResSchedClient::GetInstance().ReportData(
                RES_TYPE_EXT_ENGINE_SET_QOS, 1, mapPayload);
        }
#endif

        while (!terminate_) {
            if (!tasks_.empty()) {
                TimeSpan delta = tasks_.back().executeTime - Time();
                // wait for next execute time (or trigger which ever is first). and see if we can now process things..
                // technically we will always be a bit late here. "it's a best effort"
                if (delta > TimeSpan::Microseconds(0)) {
                    addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
                }
            } else {
                // infinite wait, since the queue is empty..
                addCondition_.wait(lock);
            }
            auto curTime = Time();
            TaskQueueImpl::ProcessTasks(lock, curTime);
        }

#if defined(__OHOS_PLATFORM__) && !defined(BUILD_PUBLIC_VERSION)
        std::unordered_map<std::string, std::string> mapPayload{
            {"pid", std::to_string(getpid())}, {"tid", std::to_string(tid)}};
        CORE_LOG_I("ReportEngineResType %s %s", mapPayload["pid"].c_str(), mapPayload["tid"].c_str());
        OHOS::ResourceSchedule::ResSchedClient::GetInstance().ReportData(RES_TYPE_EXT_ENGINE_SET_QOS, 0, mapPayload);
#endif
    }

private:
    uint64_t threadId_{0};
    std::condition_variable addCondition_;
    std::thread thread_;
};

namespace Internal {

IObjectFactory::Ptr GetThreadedTaskQueueFactory()
{
    return ThreadedTaskQueue::GetFactory();
}

}  // namespace Internal

META_END_NAMESPACE()