* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_SNAPSHOTSTRATEGYRUNNER
#define OMNISTREAM_SNAPSHOTSTRATEGYRUNNER
#include "SnapshotExecutionType.h"
#include "SnapshotResult.h"
#include "CheckpointStreamFactory.h"
#include "AsyncSnapshotCallable.h"
#include "SnapshotStrategy.h"
#include "SnapshotResources.h"
#include "runtime/checkpoint/CheckpointOptions.h"
#include "KeyedStateHandle.h"
#include "runtime/state/bridge/OmniTaskBridge.h"
#include <string>
#include <future>
#include <chrono>
#include <memory>
/**
 * Runs one checkpoint through a SnapshotStrategy.
 *
 * The runner asks the strategy to prepare its resources, obtains the strategy's
 * snapshot callable, and wraps "run the callable, then clean the resources up"
 * in a std::packaged_task that is handed back to the caller.
 *
 * The runner stores the strategy as a raw pointer and never deletes it.
 *
 * @tparam T  payload type of the SnapshotResult produced by the snapshot.
 * @tparam SR second template argument of the SnapshotStrategy being driven.
 */
template <typename T, typename SR>
class SnapshotStrategyRunner {
public:
    /**
     * Creates a runner with an empty description, a null strategy and a
     * value-initialized execution type. snapshot() must not be called on such
     * an instance, because it dereferences the strategy pointer unconditionally.
     */
    SnapshotStrategyRunner() = default;

    /**
     * @param description      text used to tag this runner in error logs.
     * @param snapshotStrategy strategy that prepares resources and builds the snapshot callable.
     * @param executionType    when SYNCHRONOUS, snapshot() also executes the snapshot inline.
     */
    SnapshotStrategyRunner(std::string description, SnapshotStrategy<T, SR>* snapshotStrategy,
                           SnapshotExecutionType executionType)
        : description_(description), snapshotStrategy_(snapshotStrategy), executionType_(executionType)
    {}

    ~SnapshotStrategyRunner() = default;

    /**
     * Prepares the snapshot for the given checkpoint and returns it as a task.
     *
     * @param checkpointId      passed to syncPrepareResources() and asyncSnapshot(); also logged on failure.
     * @param timestamp         forwarded to the strategy's asyncSnapshot().
     * @param streamFactory     forwarded to the strategy's asyncSnapshot().
     * @param checkpointOptions forwarded to the strategy's asyncSnapshot().
     * @param bridge            handed to the snapshot callable's get() when the snapshot executes.
     * @param keySerializer     forwarded to the strategy's asyncSnapshot().
     * @return a packaged_task that has NOT been invoked by this function. Invoking it runs
     *         the snapshot callable, calls cleanup() on the prepared resources (on success
     *         and on failure alike) and yields the snapshot result.
     * @throws whatever the strategy, the snapshot callable or cleanup() throws. The
     *         exception is logged through INFO_RELEASE and rethrown unchanged.
     */
    std::shared_ptr<std::packaged_task<std::shared_ptr<SnapshotResult<T>>()>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory* streamFactory,
        CheckpointOptions* checkpointOptions,
        std::shared_ptr<omnistream::OmniTaskBridge> bridge,
        std::string keySerializer)
    {
        try {
            auto snapshotResources = snapshotStrategy_->syncPrepareResources(checkpointId);
            auto asyncSnapshot = snapshotStrategy_->asyncSnapshot(snapshotResources, checkpointId, timestamp,
                                                                  streamFactory, checkpointOptions, keySerializer);

            // Single definition of "execute the snapshot, then release the resources",
            // shared by the returned task and the synchronous path below.
            auto runAndCleanup = [asyncSnapshot, snapshotResources, bridge]() {
                try {
                    auto res = asyncSnapshot->get(bridge);
                    snapshotResources->cleanup();
                    return res;
                } catch (...) {
                    // catch (...) so resources are released for every exception type, and a
                    // bare `throw;` so the original exception object is rethrown instead of
                    // a copy sliced down to std::exception.
                    snapshotResources->cleanup();
                    throw;
                }
            };

            auto task = std::make_shared<std::packaged_task<std::shared_ptr<SnapshotResult<T>>()>>(runAndCleanup);

            if (executionType_ == SnapshotExecutionType::SYNCHRONOUS) {
                // NOTE(review): the inline result is only used for logging and then dropped;
                // the task returned below is still un-invoked, so invoking it executes the
                // snapshot callable and cleanup() a second time. Confirm callers expect this.
                auto res = runAndCleanup();
                if (res) {
                    LOG("native rocksdb checkpoint has been finished.");
                }
            }
            return task;
        } catch (const std::exception &e) {
            INFO_RELEASE("Error:SnapshotStrategyRunner[" << description_
                << "]: snapshot pipeline failed during preparation, checkpointId=" << checkpointId
                << ", exception=" << e.what());
            throw;
        } catch (...) {
            INFO_RELEASE("Error:SnapshotStrategyRunner[" << description_
                << "]: snapshot pipeline failed during preparation, checkpointId=" << checkpointId
                << ", exception=unknown");
            throw;
        }
    }

private:
    // Tag included in the error messages emitted by snapshot().
    std::string description_;
    // Strategy driven by snapshot(); not deleted by this class.
    SnapshotStrategy<T, SR>* snapshotStrategy_ = nullptr;
    // Compared against SnapshotExecutionType::SYNCHRONOUS in snapshot().
    SnapshotExecutionType executionType_{};
};
#endif