* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_SNAPSHOTRESULT_H
#define FLINK_TNEL_SNAPSHOTRESULT_H
#include <iostream>
#include <memory>
#include "StateObject.h"
* This class contains the combined results from the snapshot of a state backend:
*
* - A state object representing the state that will be reported to the Job Manager
* to acknowledge the checkpoint.
* - A state object that represents the state for the TaskLocalStateStoreImpl.
*
* Both state objects are optional and can be nullptr, for example if there was no state
* to snapshot in the backend. If the local state object is not nullptr, the job manager
* state must also not be nullptr, because the Job Manager always owns the ground truth
* about the checkpointed state.
*/
template <typename T>
class SnapshotResult : public StateObject {
static_assert(std::is_base_of<StateObject, T>::value, "T must inherit from StateObject");
public:
const std::shared_ptr<T>& GetJobManagerOwnedSnapshot() const
{
return jobManagerOwnedSnapshot;
}
const std::shared_ptr<T>& GetTaskLocalSnapshot() const
{
return taskLocalSnapshot;
}
void DiscardState() override
{
std::exception_ptr aggregatedExceptions = nullptr;
if (jobManagerOwnedSnapshot != nullptr) {
try {
jobManagerOwnedSnapshot->DiscardState();
} catch (...) {
aggregatedExceptions = std::current_exception();
}
}
if (taskLocalSnapshot != nullptr) {
try {
taskLocalSnapshot->DiscardState();
} catch (...) {
if (aggregatedExceptions) {
std::cerr << "Suppressed exception during DiscardState()" << std::endl;
} else {
aggregatedExceptions = std::current_exception();
}
}
}
if (aggregatedExceptions) {
std::rethrow_exception(aggregatedExceptions);
}
}
long GetStateSize() const override
{
return jobManagerOwnedSnapshot != nullptr ? jobManagerOwnedSnapshot->GetStateSize() : 0L;
}
static std::shared_ptr<SnapshotResult<T>> Empty()
{
return EMPTY;
}
static std::shared_ptr<SnapshotResult<T>> Of(std::shared_ptr<T> jobManagerState)
{
return (jobManagerState != nullptr) ? std::make_shared<SnapshotResult<T>>(jobManagerState, nullptr)
: Empty();
}
static std::shared_ptr<SnapshotResult<T>>
WithLocalState(std::shared_ptr<T> jobManagerState, std::shared_ptr<T> localState)
{
return std::make_shared<SnapshotResult<T>>(jobManagerState, localState);
}
* Creates a SnapshotResult for the given jobManagerOwnedSnapshot and taskLocalSnapshot.
* If jobManagerOwnedSnapshot is null, then taskLocalSnapshot must also be null.
* @param jobManagerOwnedSnapshot Snapshot to report to the job manager. Can be nullptr.
* @param taskLocalSnapshot Optional snapshot to report to the local state manager.
* If provided, jobManagerOwnedSnapshot must not be nullptr.
*/
SnapshotResult(std::shared_ptr<T> jobManagerOwnedSnapshot, std::shared_ptr<T> taskLocalSnapshot)
: jobManagerOwnedSnapshot(jobManagerOwnedSnapshot), taskLocalSnapshot(taskLocalSnapshot)
{
if (jobManagerOwnedSnapshot == nullptr && taskLocalSnapshot != nullptr) {
throw std::logic_error("Cannot report local state snapshot without corresponding remote state!");
}
}
std::string ToString() const override
{
return "ToString returns empty in SnapshotResult";
}
static const std::shared_ptr<SnapshotResult<T>> EMPTY;
private:
const std::shared_ptr<T> jobManagerOwnedSnapshot;
const std::shared_ptr<T> taskLocalSnapshot;
};
template <typename T>
const std::shared_ptr<SnapshotResult<T>> SnapshotResult<T>::EMPTY =
std::make_shared<SnapshotResult<T>>(nullptr, nullptr);
#endif