/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#ifndef OMNISTREAM_SNAPSHOTSTRATEGYRUNNER
#define OMNISTREAM_SNAPSHOTSTRATEGYRUNNER

#include "SnapshotExecutionType.h"
#include "SnapshotResult.h"
#include "CheckpointStreamFactory.h"
#include "AsyncSnapshotCallable.h"
#include "SnapshotStrategy.h"
#include "SnapshotResources.h"
#include "runtime/checkpoint/CheckpointOptions.h"
#include "KeyedStateHandle.h"
#include "runtime/state/bridge/OmniTaskBridge.h"
#include <string>
#include <future>
#include <chrono>
#include <memory>

/**
 * Drives a SnapshotStrategy through its two phases: the synchronous resource
 * preparation and the (possibly deferred) snapshot execution.
 *
 * @tparam T   payload type of the SnapshotResult produced by the strategy.
 * @tparam SR  second template argument forwarded to SnapshotStrategy<T, SR>.
 */
template <typename T, typename SR>
class SnapshotStrategyRunner {
public:
    /// Creates a runner with an empty description and a null strategy; such a
    /// runner must be assigned a fully constructed one before snapshot() is called.
    SnapshotStrategyRunner() = default;

    /**
     * @param description       label used in the error log lines emitted by snapshot().
     * @param snapshotStrategy  strategy to drive; stored as a raw pointer and never
     *                          deleted by this class.
     * @param executionType     when equal to SnapshotExecutionType::SYNCHRONOUS,
     *                          snapshot() executes the snapshot before returning.
     */
    SnapshotStrategyRunner(std::string description, SnapshotStrategy<T, SR>* snapshotStrategy, SnapshotExecutionType executionType)
        : description_(description), snapshotStrategy_(snapshotStrategy), executionType_(executionType)
    {}

    ~SnapshotStrategyRunner() = default;

    /**
     * Prepares the snapshot resources, asks the strategy for its snapshot
     * supplier, and wraps "run supplier, then clean up resources" in a packaged task.
     *
     * If the execution type is SYNCHRONOUS, the supplier is additionally run
     * right here on the calling thread (followed by resource cleanup) before
     * the task is returned.
     *
     * NOTE(review): in SYNCHRONOUS mode the returned task itself has not been
     * invoked, so a caller that runs it will call the supplier's get() a second
     * time -- confirm this is what callers expect.
     *
     * @param checkpointId       forwarded to syncPrepareResources() and asyncSnapshot().
     * @param timestamp          forwarded to asyncSnapshot().
     * @param streamFactory      forwarded to asyncSnapshot().
     * @param checkpointOptions  forwarded to asyncSnapshot().
     * @param bridge             passed to the supplier's get().
     * @param keySerializer      forwarded to asyncSnapshot().
     * @return a packaged task that, when invoked, runs the supplier and cleans
     *         up the snapshot resources whether or not the supplier throws.
     * @throws whatever the strategy or, in SYNCHRONOUS mode, the supplier throws;
     *         the failure is logged and rethrown with its original type intact.
     */
    std::shared_ptr<std::packaged_task<std::shared_ptr<SnapshotResult<T>>()>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory* streamFactory,
        CheckpointOptions* checkpointOptions,
        std::shared_ptr<omnistream::OmniTaskBridge> bridge,
        std::string keySerializer)
    {
        using SnapshotTask = std::packaged_task<std::shared_ptr<SnapshotResult<T>>()>;

        try {
            auto snapshotResources = snapshotStrategy_->syncPrepareResources(checkpointId);
            auto asyncSnapshot = snapshotStrategy_->asyncSnapshot(snapshotResources, checkpointId, timestamp, streamFactory,
                                                                  checkpointOptions, keySerializer);

            // Single definition of "execute the snapshot and always release the
            // resources", shared by the deferred task and the synchronous path.
            // Everything is captured by value because the task outlives this call.
            auto runAndCleanup = [asyncSnapshot, snapshotResources, bridge]() {
                try {
                    auto res = asyncSnapshot->get(bridge);
                    snapshotResources->cleanup();
                    return res;
                } catch (...) {
                    // catch (...) so cleanup also happens for non-std exceptions;
                    // bare `throw;` rethrows the original object instead of a
                    // sliced std::exception copy (which would lose type and what()).
                    snapshotResources->cleanup();
                    throw;
                }
            };

            auto task = std::make_shared<SnapshotTask>(runAndCleanup);

            if (executionType_ == SnapshotExecutionType::SYNCHRONOUS) {
                auto res = runAndCleanup();
                if (res) {
                    LOG("native rocksdb checkpoint has been finished.");
                }
            }
            return task;
        } catch (const std::exception &e) {
            INFO_RELEASE("Error:SnapshotStrategyRunner[" << description_
                << "]: snapshot pipeline failed during preparation, checkpointId=" << checkpointId
                << ", exception=" << e.what());
            throw;
        } catch (...) {
            INFO_RELEASE("Error:SnapshotStrategyRunner[" << description_
                << "]: snapshot pipeline failed during preparation, checkpointId=" << checkpointId
                << ", exception=unknown");
            throw;
        }
    }

private:
    std::string description_;
    // Non-owning; initialised so a default-constructed runner holds a
    // well-defined null instead of an indeterminate pointer.
    SnapshotStrategy<T, SR>* snapshotStrategy_ = nullptr;
    // Value-initialised so a default-constructed runner has a determinate value.
    SnapshotExecutionType executionType_{};
};

#endif // OMNISTREAM_SNAPSHOTSTRATEGYRUNNER