/*
 * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <csignal>
#include <mutex>
#include <thread>

#include "MxBase/ConfigUtil/ConfigUtil.h"
#include "MxBase/Log/Log.h"
#include "MxStream/DataType/DataHelper.h"
#include "MxStream/Stream/SequentialStream.h"

using namespace MxBase;
using namespace MxStream;

namespace
{
const std::string CONFIG_PATH = "./config/setup.config";
const int SIGNAL_CHECK_TIMESTEP = 10000;
bool g_signalQuit = false;
std::mutex g_mutex;
std::vector<std::thread> g_threads;
std::map<std::string, std::shared_ptr<SequentialStream>> g_sequentialStreams;

static void SigHandler(int signal)
{
    if (signal == SIGINT)
    {
        g_signalQuit = true;
    }
}

struct StreamConfig
{
    size_t channelCount;
    std::string deviceId;
    std::vector<std::string> rtspUrls;
    std::string inputVideoFormat;
    std::string outputImageFormat;
    std::string inputFormat;
    std::string outputFormat;
    std::string videoEncodeWidth;
    std::string videoEncodeHeight;
    std::string videoEncodeFpsMode;
    std::string iFrameInterval;
    std::string resizeWidth;
    std::string resizeHeight;
    std::string fpsMode;

    const std::map<std::string, std::string> QueueProperties = {{"max-size-buffers", "50"}};

    std::map<std::string, std::string> GetRtspProperties(size_t chlIdx) const
    {
        std::map<std::string, std::string> properties = {
            {"rtspUrl", rtspUrls[chlIdx]},
            {"channelId", std::to_string(chlIdx)},
            {"fpsMode", fpsMode},
        };
        return properties;
    }

    std::map<std::string, std::string> GetVideoDecoderProperties(size_t chlIdx) const
    {
        std::map<std::string, std::string> properties = {{"inputVideoFormat", inputVideoFormat},
                                                         {"outputImageFormat", outputImageFormat}};
        return properties;
    }

    std::map<std::string, std::string> GetImageResizeProperties() const
    {
        std::map<std::string, std::string> properties = {{"resizeWidth", resizeWidth}, {"resizeHeight", resizeHeight}};
        return properties;
    }

    std::map<std::string, std::string> GetVideoEncodeProperties() const
    {
        std::map<std::string, std::string> properties = {
            {"imageHeight", videoEncodeHeight}, {"imageWidth", videoEncodeWidth}, {"inputFormat", inputFormat},
            {"outputFormat", outputFormat},     {"fps", videoEncodeFpsMode},      {"iFrameInterval", iFrameInterval}};
        return properties;
    }
};

APP_ERROR GetValueFromFile(const ConfigData &configData, StreamConfig &config)
{
    if (configData.GetFileValue("stream.channelCount", config.channelCount) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("stream.deviceId", config.deviceId) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("stream.fpsMode", config.fpsMode) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("stream.resizeWidth", config.resizeWidth) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("stream.resizeHeight", config.resizeHeight) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoDecoder.inputVideoFormat", config.inputVideoFormat) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoDecoder.outputImageFormat", config.outputImageFormat) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoEncoder.inputFormat", config.inputFormat) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoEncoder.outputFormat", config.outputFormat) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoEncoder.imageWidth", config.videoEncodeWidth) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoEncoder.imageHeight", config.videoEncodeHeight) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoEncoder.fpsMode", config.videoEncodeFpsMode) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    if (configData.GetFileValue("VideoEncoder.iFrameInterval", config.iFrameInterval) != APP_ERR_OK)
    {
        return APP_ERR_COMM_INVALID_PARAM;
    }
    return APP_ERR_OK;
}

APP_ERROR ParseFromConfig(const std::string &path, StreamConfig &config)
{
    MxBase::ConfigUtil util;
    MxBase::ConfigData configData;
    util.LoadConfiguration(path, configData, MxBase::CONFIGFILE);

    if (GetValueFromFile(configData, config) != APP_ERR_OK)
    {
        LogError << "Some fields are empty or invalid in the config file.";
        return APP_ERR_COMM_INVALID_PARAM;
    }
    for (size_t i = 0; i < config.channelCount; ++i)
    {
        auto name = "stream.ch" + std::to_string(i);
        std::string value;
        auto ret = configData.GetFileValue(name, value);
        if (ret != APP_ERR_OK)
        {
            LogError << "Please check rtsp param.";
            return APP_ERR_COMM_INVALID_PARAM;
        }
        config.rtspUrls.push_back(value);
    }

    if (config.channelCount > config.rtspUrls.size())
    {
        LogError << "Please check param.";
        return APP_ERR_COMM_INVALID_PARAM;
    }

    return APP_ERR_OK;
}

APP_ERROR CreateSingleStream(const StreamConfig &config, size_t chlIdx)
{
    auto streamName = "stream" + std::to_string(chlIdx);
    std::shared_ptr<SequentialStream> sequentialStream(new SequentialStream(streamName));
    if (sequentialStream == nullptr)
    {
        return APP_ERR_COMM_FAILURE;
    }
    sequentialStream->SetDeviceId(config.deviceId);
    sequentialStream->Add(
        PluginNode("mxpi_rtspsrc", config.GetRtspProperties(chlIdx), "mxpi_rtspsrc" + std::to_string(chlIdx)));
    sequentialStream->Add(PluginNode("queue", config.QueueProperties));
    sequentialStream->Add(PluginNode("mxpi_videodecoder", config.GetVideoDecoderProperties(chlIdx)));
    sequentialStream->Add(PluginNode("queue", config.QueueProperties));
    sequentialStream->Add(PluginNode("mxpi_imageresize", config.GetImageResizeProperties()));
    sequentialStream->Add(PluginNode("queue", config.QueueProperties));
    sequentialStream->Add(PluginNode("mxpi_videoencoder", config.GetVideoEncodeProperties()));
    sequentialStream->Add(PluginNode("queue", config.QueueProperties));
    sequentialStream->Add(PluginNode("fakesink"));

    auto ret = sequentialStream->Build();
    if (ret != APP_ERR_OK)
    {
        g_signalQuit = true;
        LogError << GetErrorInfo(ret) << "Failed to build stream.";
        return ret;
    }

    std::lock_guard<std::mutex> lock(g_mutex);
    g_sequentialStreams[streamName] = sequentialStream;
    return APP_ERR_OK;
}

APP_ERROR CreateMultiStreams(const StreamConfig &config)
{
    g_threads.resize(config.channelCount);
    for (auto i = 0; i < config.channelCount; ++i)
    {
        g_threads[i] = std::thread(CreateSingleStream, config, i);
        if (g_signalQuit)
        {
            break;
        }
    }
    LogInfo << "Totally " << config.channelCount << " streams were created.";

    return APP_ERR_OK;
}

APP_ERROR StopMultiStreams()
{
    std::lock_guard<std::mutex> lock(g_mutex);
    std::vector<std::thread> threads(g_sequentialStreams.size());
    int i = 0;
    for (auto iter = g_sequentialStreams.begin(); iter != g_sequentialStreams.end(); ++iter)
    {
        threads[i] = std::thread([](std::shared_ptr<SequentialStream> stream) { stream->Stop(); }, iter->second);
        ++i;
    }
    for (auto &th : threads)
    {
        th.join();
    }
    g_sequentialStreams.clear();
    LogInfo << "All streams were stoppped successfully.";
    return APP_ERR_OK;
}
}  // namespace

int main(int argc, char *argv[])
{
    StreamConfig config;
    APP_ERROR ret = ParseFromConfig(CONFIG_PATH, config);
    if (ret != APP_ERR_OK)
    {
        LogError << "Parse config file failed.";
        return APP_ERR_COMM_FAILURE;
    }

    ret = CreateMultiStreams(config);
    if (ret != APP_ERR_OK)
    {
        LogError << "Create Streams failed.";
        return APP_ERR_COMM_FAILURE;
    }

    signal(SIGINT, SigHandler);

    while (!g_signalQuit)
    {
        usleep(SIGNAL_CHECK_TIMESTEP);
    }

    ret = StopMultiStreams();
    if (ret != APP_ERR_OK)
    {
        LogError << "Stop Streams failed.";
        return APP_ERR_COMM_FAILURE;
    }

    for (auto &th : g_threads)
    {
        th.join();
    }

    return 0;
}