* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_ABSTRACTFSCHECKPOINTSTORAGEACCESS_H
#define FLINK_TNEL_ABSTRACTFSCHECKPOINTSTORAGEACCESS_H
#include "runtime/state/StreamStateHandle.h"
#include "runtime/state/CheckpointStorageAccess.h"
#include "core/fs/Path.h"
#include "runtime/state/CheckpointStorageLocationReference.h"
#include "core/include/common.h"
#include "runtime/executiongraph/JobIDPOD.h"
#include "runtime/state/CheckpointStorageAccess.h"
namespace omnistream {
class AbstractFsCheckpointStorageAccess : public CheckpointStorageAccess {
public:
inline static const std::string CHECKPOINT_DIR_PREFIX{"chk-"};
inline static const std::string CHECKPOINT_SHARED_STATE_DIR{"shared"};
inline static const std::string CHECKPOINT_TASK_OWNED_STATE_DIR{"taskowned"};
inline static const std::string METADATA_FILE_NAME{"_metadata"};
static constexpr unsigned char REFERENCE_MAGIC_NUMBER[4] = {0x05, 0x5F, 0x3F, 0x18};
protected:
static Path *createCheckpointDirectory(const Path &baseDirectory, int64_t checkpointId)
{
return new Path(baseDirectory, std::string(CHECKPOINT_DIR_PREFIX) + std::to_string(checkpointId));
};
static Path *decodePathFromReference(std::shared_ptr<CheckpointStorageLocationReference> reference)
{
if (reference->IsDefaultReference()) {
THROW_LOGIC_EXCEPTION("Cannot decode default reference")
}
auto bytes = reference->GetReferenceBytes();
size_t headerLen = 4;
if (bytes->size() > headerLen) {
for (size_t i = 0; i < headerLen; i++) {
if (static_cast<unsigned char>(bytes->at(i)) != REFERENCE_MAGIC_NUMBER[i]) {
THROW_LOGIC_EXCEPTION("Reference starts with the wrong magic number")
}
}
try {
std::string pathStr(bytes->begin() + headerLen, bytes->end());
return new Path(pathStr);
}
catch (...) {
THROW_LOGIC_EXCEPTION("Reference cannot be decoded to a path")
}
} else {
THROW_LOGIC_EXCEPTION("Reference too short.")
}
};
Path *getCheckpointDirectoryForJob(const Path &baseDirectory, const omnistream::JobIDPOD &jobId)
{
return new Path(baseDirectory, jobId.AbstractIDPOD::toString());
}
};
}
#endif