#include <gtest/gtest.h>
#include <string>
#include <memory>

#include "runtime/checkpoint/TaskStateSnapshotSerializer.h"

TEST(TaskStateSnapshotSerializerTest, SerializerToJson) {
    std::unordered_map<OperatorID, std::shared_ptr<OperatorSubtaskState>> subtaskStatesByOperatorID;

    auto managedOperatorState = StateObjectCollection<OperatorStateHandle>::Empty();
    auto rawOperatorState = StateObjectCollection<OperatorStateHandle>::Empty();
    auto rawKeyedState = StateObjectCollection<KeyedStateHandle>::Empty();
    auto managedKeyedState = StateObjectCollection<KeyedStateHandle>::Empty();

    auto inputChannelState = StateObjectCollection<InputChannelStateHandle>::Empty();
    auto resultSubpartitionState = StateObjectCollection<ResultSubpartitionStateHandle>::Empty();
    auto inputRescalingDescriptor = InflightDataRescalingDescriptor::noRescale;
    auto outputRescalingDescriptor = InflightDataRescalingDescriptor::noRescale;

    std::string dirName = (std::filesystem::temp_directory_path() / "IncLocalKeyedHandleTest").string();
    std::string fileName = "metaState.bin";
    std::string content = "TestFileForIncLocalKeyedHandle";
    std::filesystem::remove_all(dirName);
    std::filesystem::create_directories(dirName);
    std::ofstream ofs(dirName + "/" + fileName, std::ios::binary);
    ofs << content;
    ofs.close();

    Path path(dirName);
    DirectoryStateHandle originalDirHandle = DirectoryStateHandle::forPathWithSize(path);
    long expectedDirSize = originalDirHandle.GetStateSize();

    std::string metaFilePath = dirName + "/" + fileName;
    Path metaPath(metaFilePath);
    long metaStateSize = std::filesystem::file_size(metaFilePath);

    nlohmann::json metaDataState = {
            {"@class", "org.apache.flink.runtime.state.filesystem.FileStateHandle"},
            {"stateHandleName", "FileStateHandle"},
            {"streamStateHandleID", {
                               {"@class", "org.apache.flink.runtime.state.PhysicalStateHandleID"},
                               {"keyString", metaPath.toString()}
                       }},
            {"stateSize", metaStateSize}
    };

    nlohmann::json jsonObj;
    jsonObj["checkpointId"] = 1234;
    jsonObj["backendIdentifier"] = "01234567-89ab-cdef-0123-456789abcdef";
    jsonObj["metaDataState"] = metaDataState;
    jsonObj["directoryStateHandle"] = {
            {"@class", "org.apache.flink.runtime.state.DirectoryStateHandle"},
            {"directoryString", path.toString()},
            {"stateSize", expectedDirSize},
            {"directory", "file://" + path.toString() + "/"},
    };
    jsonObj["keyGroupRange"] = {
            {"startKeyGroup", 0},
            {"endKeyGroup", 9}
    };

    nlohmann::json sharedHandle1 = {
            {"handle", {
                           {"@class", "org.apache.flink.runtime.state.filesystem.FileStateHandle"},
                           {"stateHandleName", "FileStateHandle"},
                           {"filePath", metaPath.toString()},
                           {"stateSize", metaStateSize}
                       }},
            {"localPath", "/local/path/1"}
    };
    nlohmann::json sharedHandle2 = {
            {"handle", nullptr},
            {"localPath", "/local/path/2"}
    };
    jsonObj["sharedState"] = nlohmann::json::array({sharedHandle1, sharedHandle2});

    auto handle = std::make_shared<IncrementalLocalKeyedStateHandle>(jsonObj);
    managedKeyedState->Add(handle);

    auto operatorSubtaskState = std::make_shared<OperatorSubtaskState>(*managedOperatorState,
       *rawOperatorState,
       *managedKeyedState,
       *rawKeyedState,
       *inputChannelState,
       *resultSubpartitionState,
       inputRescalingDescriptor,
       outputRescalingDescriptor);

    subtaskStatesByOperatorID[OperatorID()] = operatorSubtaskState;

    auto taskStateSnapshot = std::make_shared<TaskStateSnapshot>(subtaskStatesByOperatorID, false, false);

    auto res = TaskStateSnapshotSerializer::Serialize(taskStateSnapshot);

    std::cout << to_string(res) << std::endl;
}