* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_CHECKPOINTCONFIGPOD_H
#define OMNISTREAM_CHECKPOINTCONFIGPOD_H
#include <string>
#include <sstream>
#include <cstdint>
#include <nlohmann/json.hpp>
namespace omnistream {
class CheckpointConfigPOD {
public:
CheckpointConfigPOD() = default;
CheckpointConfigPOD(
std::string checkpointsDirectory, std::string stateBackend, std::string checkpointStorage,
int maxRetainedCheckpoint, bool asyncSnapshots, bool incrementalCheckpoints, bool localRecovery,
std::string localRecoveryTaskManagerStateRootDirs, std::string savepointDirectory,
std::int64_t fsSmallFileThreshold, int fsWriteBufferSize)
: checkpointsDirectory(std::move(checkpointsDirectory)), stateBackend(std::move(stateBackend)),
checkpointStorage(std::move(checkpointStorage)), maxRetainedCheckpoint(maxRetainedCheckpoint),
asyncSnapshots(asyncSnapshots), incrementalCheckpoints(incrementalCheckpoints), localRecovery(localRecovery),
localRecoveryTaskManagerStateRootDirs(std::move(localRecoveryTaskManagerStateRootDirs)),
savepointDirectory(std::move(savepointDirectory)), fsSmallFileThreshold(fsSmallFileThreshold),
fsWriteBufferSize(fsWriteBufferSize)
{}
std::string getCheckpointsDirectory() const
{
return checkpointsDirectory;
}
void setCheckpointsDirectory(const std::string& input)
{
checkpointsDirectory = input;
}
std::string getStateBackend() const
{
return stateBackend;
}
void setStateBackend(const std::string& input)
{
stateBackend = input;
}
std::string getCheckpointStorage() const
{
return checkpointStorage;
}
void setCheckpointStorage(const std::string& input)
{
checkpointStorage = input;
}
int getMaxRetainedCheckpoint() const
{
return maxRetainedCheckpoint;
}
void setMaxRetainedCheckpoint(int input)
{
maxRetainedCheckpoint = input;
}
bool getAsyncSnapshots() const
{
return asyncSnapshots;
}
void setAsyncSnapshots(bool input)
{
asyncSnapshots = input;
}
bool getIncrementalCheckpoints() const
{
return incrementalCheckpoints;
}
void setIncrementalCheckpoints(bool input)
{
incrementalCheckpoints = input;
}
bool getLocalRecovery() const
{
return localRecovery;
}
void setLocalRecovery(bool input)
{
localRecovery = input;
}
std::string getLocalRecoveryTaskManagerStateRootDirs() const
{
return localRecoveryTaskManagerStateRootDirs;
}
void setLocalRecoveryTaskManagerStateRootDirs(const std::string& input)
{
localRecoveryTaskManagerStateRootDirs = input;
}
std::string getSavepointDirectory() const
{
return savepointDirectory;
}
void setSavepointDirectory(const std::string& input)
{
savepointDirectory = input;
}
std::int64_t getFsSmallFileThreshold() const
{
return fsSmallFileThreshold;
}
void setFsSmallFileThreshold(std::int64_t input)
{
fsSmallFileThreshold = input;
}
int getFsWriteBufferSize() const
{
return fsWriteBufferSize;
}
void setFsWriteBufferSize(int input)
{
fsWriteBufferSize = input;
}
int getRocksdbCheckpointTransferThreadNum() const
{
return 4;
}
std::string toString() const
{
std::ostringstream oss;
oss << "CheckpointConfigPOD{"
<< "checkpointsDirectory=" << checkpointsDirectory
<< ", stateBackend=" << stateBackend
<< ", checkpointStorage=" << checkpointStorage
<< ", maxRetainedCheckpoint=" << maxRetainedCheckpoint
<< ", asyncSnapshots=" << asyncSnapshots
<< ", incrementalCheckpoints=" << incrementalCheckpoints
<< ", localRecovery=" << localRecovery
<< ", localRecoveryTaskManagerStateRootDirs=" << localRecoveryTaskManagerStateRootDirs
<< ", savepointDirectory=" << savepointDirectory
<< ", fsSmallFileThreshold=" << fsSmallFileThreshold
<< ", fsWriteBufferSize=" << fsWriteBufferSize
<< '}';
return oss.str();
}
bool operator==(const CheckpointConfigPOD& other) const
{
return checkpointsDirectory == other.checkpointsDirectory &&
stateBackend == other.stateBackend &&
checkpointStorage == other.checkpointStorage &&
maxRetainedCheckpoint == other.maxRetainedCheckpoint &&
asyncSnapshots == other.asyncSnapshots &&
incrementalCheckpoints == other.incrementalCheckpoints &&
localRecovery == other.localRecovery &&
localRecoveryTaskManagerStateRootDirs == other.localRecoveryTaskManagerStateRootDirs &&
savepointDirectory == other.savepointDirectory &&
fsSmallFileThreshold == other.fsSmallFileThreshold &&
fsWriteBufferSize == other.fsWriteBufferSize;
}
NLOHMANN_DEFINE_TYPE_INTRUSIVE(CheckpointConfigPOD, checkpointsDirectory, stateBackend, checkpointStorage,
maxRetainedCheckpoint, asyncSnapshots, incrementalCheckpoints, localRecovery,
localRecoveryTaskManagerStateRootDirs, savepointDirectory, fsSmallFileThreshold, fsWriteBufferSize)
private:
std::string checkpointsDirectory;
std::string stateBackend;
std::string checkpointStorage;
int maxRetainedCheckpoint = 0;
bool asyncSnapshots = false;
bool incrementalCheckpoints = false;
bool localRecovery = false;
std::string localRecoveryTaskManagerStateRootDirs;
std::string savepointDirectory;
std::int64_t fsSmallFileThreshold = 0;
int fsWriteBufferSize = 0;
};
}
namespace std {
template<>
struct hash<omnistream::CheckpointConfigPOD> {
size_t operator()(const omnistream::CheckpointConfigPOD& obj) const
{
size_t h1 = std::hash<std::string>()(obj.getCheckpointsDirectory());
size_t h2 = std::hash<std::string>()(obj.getStateBackend());
size_t h3 = std::hash<std::string>()(obj.getCheckpointStorage());
size_t h4 = std::hash<int>() (obj.getMaxRetainedCheckpoint());
size_t h5 = std::hash<bool>() (obj.getAsyncSnapshots());
size_t h6 = std::hash<bool>() (obj.getIncrementalCheckpoints());
size_t h7 = std::hash<bool>() (obj.getLocalRecovery());
size_t h8 = std::hash<std::string>()(obj.getLocalRecoveryTaskManagerStateRootDirs());
size_t h9 = std::hash<std::string>()(obj.getSavepointDirectory());
size_t h10 = std::hash<std::int64_t>()(obj.getFsSmallFileThreshold());
size_t h11 = std::hash<int>() (obj.getFsWriteBufferSize());
size_t seed = 0;
const size_t C = 0x85ebca6b;
seed ^= (h1 + C + (seed << 6) + (seed >> 2));
seed ^= (h2 + C + (seed << 6) + (seed >> 2));
seed ^= (h3 + C + (seed << 6) + (seed >> 2));
seed ^= (h4 + C + (seed << 6) + (seed >> 2));
seed ^= (h5 + C + (seed << 6) + (seed >> 2));
seed ^= (h6 + C + (seed << 6) + (seed >> 2));
seed ^= (h7 + C + (seed << 6) + (seed >> 2));
seed ^= (h8 + C + (seed << 6) + (seed >> 2));
seed ^= (h9 + C + (seed << 6) + (seed >> 2));
seed ^= (h10 + C + (seed << 6) + (seed >> 2));
seed ^= (h11 + C + (seed << 6) + (seed >> 2));
return seed;
}
};
}
#endif