* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "IncrementalRemoteKeyedStateHandle.h"
IncrementalRemoteKeyedStateHandle::IncrementalRemoteKeyedStateHandle(
UUID backendIdentifier,
KeyGroupRange keyGroupRange,
int64_t checkpointId,
std::vector<HandleAndLocalPath> sharedState,
std::vector<HandleAndLocalPath> privateState,
std::shared_ptr<StreamStateHandle> metaStateHandle,
long persistedSizeOfThisCheckpoint,
StateHandleID stateHandleId)
: backendIdentifier_(backendIdentifier),
keyGroupRange_(keyGroupRange),
checkpointId_(checkpointId),
sharedState_(sharedState),
privateState_(privateState),
metaStateHandle_(metaStateHandle),
persistedSizeOfThisCheckpoint_(persistedSizeOfThisCheckpoint),
stateHandleId_(stateHandleId)
{
this->persistedSizeOfThisCheckpoint_ =
persistedSizeOfThisCheckpoint == unkownCheckpointSize
? GetStateSize()
: persistedSizeOfThisCheckpoint;
}
IncrementalRemoteKeyedStateHandle *IncrementalRemoteKeyedStateHandle::Restore(
UUID backendIdentifier,
KeyGroupRange keyGroupRange,
int64_t checkpointId,
std::vector<HandleAndLocalPath> sharedState,
std::vector<HandleAndLocalPath> privateState,
std::shared_ptr<StreamStateHandle> metaStateHandle,
long persistedSizeOfThisCheckpoint,
StateHandleID stateHandleId)
{
return new IncrementalRemoteKeyedStateHandle(
backendIdentifier,
keyGroupRange,
checkpointId,
sharedState,
privateState,
metaStateHandle,
persistedSizeOfThisCheckpoint,
stateHandleId);
}
std::shared_ptr<CheckpointBoundKeyedStateHandle> IncrementalRemoteKeyedStateHandle::rebound(int64_t newCheckpointId) const
{
return std::make_shared<IncrementalRemoteKeyedStateHandle>(
backendIdentifier_,
keyGroupRange_,
newCheckpointId,
sharedState_,
privateState_,
metaStateHandle_,
persistedSizeOfThisCheckpoint_,
stateHandleId_);
}
std::string IncrementalRemoteKeyedStateHandle::ToString() const
{
std::string sharedStateStr = "[";
for (size_t i = 0; i < sharedState_.size(); ++i) {
sharedStateStr += sharedState_[i].ToString();
if (i != sharedState_.size() - 1) {
sharedStateStr += ", ";
}
}
sharedStateStr += "]";
std::string privateStateStr = "[";
for (size_t i = 0; i < privateState_.size(); ++i) {
privateStateStr += privateState_[i].ToString();
if (i != privateState_.size() - 1) {
privateStateStr += ", ";
}
}
privateStateStr += "]";
std::ostringstream oss;
oss << "{"
<< "\"backendIdentifier\": \""
<< backendIdentifier_.ToString()
<< "\", \"stateHandleId\": "
<< stateHandleId_.ToString()
<< ", \"keyGroupRange\": "
<< keyGroupRange_.ToString()
<< ", \"checkpointId\": "
<< checkpointId_
<< ", \"sharedState\": "
<< sharedStateStr
<< ", \"privateState\": "
<< privateStateStr
<< ", \"metaStateHandle\": "
<< (metaStateHandle_ ? metaStateHandle_->ToString() : "null")
<< ", \"persistedSizeOfThisCheckpoint\": "
<< persistedSizeOfThisCheckpoint_
<< ", \"checkpointedSize\": "
<< persistedSizeOfThisCheckpoint_
<< ", \"stateHandleName\": \"IncrementalRemoteKeyedStateHandle\"}";
return oss.str();
}
void IncrementalRemoteKeyedStateHandle::DiscardState()
{
try {
metaStateHandle_->DiscardState();
} catch (...) {
}
try {
for (auto path : privateState_) {
path.getHandle()->DiscardState();
}
} catch (...) {
}
}
void IncrementalRemoteKeyedStateHandle::RegisterSharedStates(SharedStateRegistry &stateRegistry, int64_t checkpointId)
{
}
long IncrementalRemoteKeyedStateHandle::GetCheckpointedSize()
{
return persistedSizeOfThisCheckpoint_;
}
StateHandleID IncrementalRemoteKeyedStateHandle::GetStateHandleId() const
{
return stateHandleId_;
}
std::shared_ptr<KeyedStateHandle> IncrementalRemoteKeyedStateHandle::GetIntersection(const KeyGroupRange &keyGroupRange) const
{
auto range = keyGroupRange_.getIntersection(keyGroupRange);
if (range == nullptr || range->getNumberOfKeyGroups() == 0) {
delete range;
return nullptr;
}
auto shared = std::make_shared<IncrementalRemoteKeyedStateHandle>(
backendIdentifier_,
*range,
checkpointId_,
sharedState_,
privateState_,
metaStateHandle_,
persistedSizeOfThisCheckpoint_,
stateHandleId_);
return shared;
}
const UUID &IncrementalRemoteKeyedStateHandle::GetBackendIdentifier() const
{
return backendIdentifier_;
}
std::shared_ptr<StreamStateHandle> IncrementalRemoteKeyedStateHandle::GetMetaDataStateHandle() const
{
return metaStateHandle_;
}
const std::vector<IncrementalKeyedStateHandle::HandleAndLocalPath>& IncrementalRemoteKeyedStateHandle::GetSharedStateHandles() const
{
return GetSharedState();
}
bool IncrementalRemoteKeyedStateHandle::operator==(const IncrementalRemoteKeyedStateHandle &other) const
{
if (this == &other) {
return true;
}
if (GetStateHandleId() != other.GetStateHandleId()) {
return false;
}
if (GetCheckpointId() != other.GetCheckpointId()) {
return false;
}
if (GetBackendIdentifier() != other.GetBackendIdentifier()) {
return false;
}
if (!(GetKeyGroupRange() == other.GetKeyGroupRange())) {
return false;
}
if (!(GetSharedState() == other.GetSharedState())) {
return false;
}
if (!(GetPrivateState() == other.GetPrivateState())) {
return false;
}
auto meta1 = GetMetaDataStateHandle();
auto meta2 = other.GetMetaDataStateHandle();
if (meta1 == nullptr && meta2 == nullptr) {
return true;
}
if ((meta1 == nullptr) != (meta2 == nullptr)) {
return false;
}
return *meta1 == *meta2;
}
long IncrementalRemoteKeyedStateHandle::GetStateSize() const
{
long size = metaStateHandle_ ? metaStateHandle_->GetStateSize() : 0;
for (HandleAndLocalPath path : sharedState_) {
size += path.GetStateSize();
}
for (HandleAndLocalPath path : privateState_) {
size += path.GetStateSize();
}
return size;
}
KeyGroupRange IncrementalRemoteKeyedStateHandle::GetKeyGroupRange() const
{
return keyGroupRange_;
}
int64_t IncrementalRemoteKeyedStateHandle::GetCheckpointId() const
{
return checkpointId_;
}
const std::vector<IncrementalKeyedStateHandle::HandleAndLocalPath>& IncrementalRemoteKeyedStateHandle::GetSharedState() const
{
return sharedState_;
}
std::vector<IncrementalKeyedStateHandle::HandleAndLocalPath> IncrementalRemoteKeyedStateHandle::GetPrivateState() const
{
return privateState_;
}