#include <gtest/gtest.h>
#include "runtime/state/TaskStateManager.cpp"
#include "runtime/state/LocalRecoveryDirectoryProviderImpl.h"
using namespace omnistream;
TEST(TaskStateManagerTest, ReportAndComplete)
{
    JobIDPOD jobId;
    ExecutionAttemptIDPOD attemptId;
    omnistream::JobVertexID vertexId(0, 0);
    const std::vector<std::filesystem::path> path = {"/tmp"};
    auto dirProvider = std::make_shared<LocalRecoveryDirectoryProviderImpl>(path, jobId, vertexId, 0);
    auto localRecoveryConfig = std::make_shared<LocalRecoveryConfig>(dirProvider);
    auto stateStore = new TaskLocalStateStore(jobId, vertexId, 0, localRecoveryConfig);
    auto responder = new NoOpCheckpoingResponder();
    auto manager = new TaskStateManager(jobId, attemptId, stateStore, responder);
    auto metrics = new CheckpointMetrics(1, 1, 1, 1, 1, 1, false, 1, 1);
    auto snapshot = std::make_shared<TaskStateSnapshot>(4, true);
    auto metadata = new CheckpointMetaData(0, 100);

    manager->ReportTaskStateSnapshots(metadata, metrics, nullptr, snapshot);

    EXPECT_EQ(stateStore->retrieveLocalState(0)->ToString(), snapshot->ToString());

    manager->NotifyCheckpointComplete(1);

    EXPECT_EQ(stateStore->retrieveLocalState(0), nullptr);

    delete manager;
    delete metadata;
    delete metrics;
}

TEST(TaskStateManagerTest, ReportAndAbort)
{
    JobIDPOD jobId;
    ExecutionAttemptIDPOD attemptId;
    omnistream::JobVertexID vertexId(0, 0);
    const std::vector<std::filesystem::path> path = {"/tmp"};
    auto dirProvider = std::make_shared<LocalRecoveryDirectoryProviderImpl>(path, jobId, vertexId, 0);
    auto localRecoveryConfig = std::make_shared<LocalRecoveryConfig>(dirProvider);
    auto stateStore = new TaskLocalStateStore(jobId, vertexId, 0, localRecoveryConfig);
    auto responder = new NoOpCheckpoingResponder();
    auto manager = new TaskStateManager(jobId, attemptId, stateStore, responder);
    auto metrics = new CheckpointMetrics(1, 1, 1, 1, 1, 1, false, 1, 1);
    auto snapshot = std::make_shared<TaskStateSnapshot>(4, true);
    auto metadata = new CheckpointMetaData(0, 100);

    manager->ReportTaskStateSnapshots(metadata, metrics, nullptr, snapshot);

    EXPECT_EQ(*stateStore->retrieveLocalState(0), *snapshot);

    manager->NotifyCheckpointAborted(1);

    EXPECT_EQ(*stateStore->retrieveLocalState(0), *snapshot);

    manager->NotifyCheckpointAborted(0);

    EXPECT_EQ(stateStore->retrieveLocalState(0), nullptr);

    delete manager;
    delete metadata;
    delete metrics;
}