* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_CHECKPOINTMETRICSBUILDER_H
#define OMNISTREAM_CHECKPOINTMETRICSBUILDER_H
#include <memory>
#include "CheckpointMetrics.h"
#include "core/utils/threads/CompletableFutureV2.h"
class CheckpointMetricsBuilder {
public:
CheckpointMetricsBuilder()
{
bytesProcessedDuringAlignment_ = std::make_shared<CompletableFutureV2<long>>();
alignmentDurationNanos_ = std::make_shared<CompletableFutureV2<long>>();
};
CheckpointMetricsBuilder *SetBytesProcessedDuringAlignment(long bytesProcessedDuringAlignment)
{
bytesProcessedDuringAlignment_->Complete(bytesProcessedDuringAlignment);
return this;
}
CheckpointMetricsBuilder *SetBytesProcessedDuringAlignment(
std::shared_ptr<CompletableFutureV2<long>> bytesProcessedDuringAlignment)
{
bytesProcessedDuringAlignment_ = bytesProcessedDuringAlignment;
return this;
}
CheckpointMetricsBuilder *SetAlignmentDurationNanos(long alignmentDurationNanos)
{
alignmentDurationNanos_->Complete(alignmentDurationNanos);
return this;
}
CheckpointMetricsBuilder *SetAlignmentDurationNanos(
std::shared_ptr<CompletableFutureV2<long>> alignmentDurationNanos)
{
if (!alignmentDurationNanos_->IsDone()) {
alignmentDurationNanos_ = alignmentDurationNanos;
} else {
THROW_LOGIC_EXCEPTION("alignmentDurationNanos has already been completed by someone else");
}
return this;
}
CheckpointMetricsBuilder *SetBytesPersistedDuringAlignment(long bytesPersistedDuringAlignment)
{
bytesPersistedDuringAlignment_ = bytesPersistedDuringAlignment;
return this;
}
CheckpointMetricsBuilder *SetAsyncDurationMillis(long asyncDurationMillis)
{
asyncDurationMillis_ = asyncDurationMillis;
return this;
}
CheckpointMetricsBuilder *SetSyncDurationMillis(long syncDurationMillis)
{
syncDurationMillis_ = syncDurationMillis;
return this;
}
CheckpointMetricsBuilder *SetTotalBytesPersisted(long totalBytesPersisted)
{
totalBytesPersisted_ = totalBytesPersisted;
return this;
}
CheckpointMetricsBuilder *SetBytesPersistedOfThisCheckpoint(long bytesPersistedOfThisCheckpoint)
{
bytesPersistedOfThisCheckpoint_ = bytesPersistedOfThisCheckpoint;
return this;
}
CheckpointMetricsBuilder *SetCheckpointStartDelayNanos(long checkpointStartDelayNanos)
{
checkpointStartDelayNanos_ = checkpointStartDelayNanos;
return this;
}
CheckpointMetrics *BuildIncomplete()
{
return new CheckpointMetrics(
bytesProcessedDuringAlignment_->GetNow(-1),
bytesPersistedDuringAlignment_,
alignmentDurationNanos_->GetNow(-1),
syncDurationMillis_,
asyncDurationMillis_,
checkpointStartDelayNanos_,
bytesPersistedDuringAlignment_ > 0,
bytesPersistedOfThisCheckpoint_,
totalBytesPersisted_);
}
CheckpointMetrics *Build()
{
auto bpda = bytesProcessedDuringAlignment_->Get();
auto adn = alignmentDurationNanos_->Get();
return new CheckpointMetrics(
bpda,
bytesPersistedDuringAlignment_,
adn,
syncDurationMillis_,
asyncDurationMillis_,
checkpointStartDelayNanos_,
bytesPersistedDuringAlignment_ > 0,
bytesPersistedOfThisCheckpoint_,
totalBytesPersisted_);
}
private:
long bytesPersistedDuringAlignment_ = -1;
long asyncDurationMillis_ = -1;
long syncDurationMillis_ = -1;
long totalBytesPersisted_ = -1;
long bytesPersistedOfThisCheckpoint_ = -1;
long checkpointStartDelayNanos_ = -1;
std::shared_ptr<CompletableFutureV2<long>> bytesProcessedDuringAlignment_;
std::shared_ptr<CompletableFutureV2<long>> alignmentDurationNanos_;
};
#endif