/*
* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*
* Extracted from FullSnapshotAsyncWriter.cpp for testability.
*/
#ifndef OMNISTREAM_SAVEPOINTKVSTREAMWRITER_H
#define OMNISTREAM_SAVEPOINTKVSTREAMWRITER_H
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>
#include "core/utils/ByteView.h"
#include "KeyGroupRangeOffsets.h"
#include "CheckpointStateOutputStreamProxy.h"
/**
* Writes key-group-prefixed key/value entries into a savepoint stream.
*
* <p>Supports two write paths:
* <ul>
* <li>Patchable: writes directly into the stream buffer with a BytePatch
* handle. The first byte of the key can later be patched with
* {@code FIRST_BIT_IN_BYTE_MASK} to mark end-of-key-group/state.</li>
* <li>Buffered: when the entry is too large for the stream chunk, the
* key/value are copied into pending vectors and written later via
* {@link CheckpointStateOutputStreamProxy#writeKeyValuePair}.</li>
* </ul>
*
* <p>Usage pattern: call {@code writeFirst} for the first entry, then
* {@code writeNext} for subsequent entries, and finally {@code finish}.
* Between calls the writer maintains a pending entry that must be finalized
* before any operation that may flush or reallocate the stream buffer.
*/
class SavepointKvStreamWriter {
public:
    /** Short written to the stream after the last entry of a key group (see writeNext / finish). */
    static constexpr int END_OF_KEY_GROUP_MASK = 0xffff;
    /** Bit applied to the first key byte of an entry to mark it as the last one of its key group or state. */
    static constexpr int FIRST_BIT_IN_BYTE_MASK = 0x80;

    /**
     * Binds the writer to an output stream and a key-group offset table.
     *
     * Both arguments are stored as references, so they must outlive this writer.
     *
     * @param stream               stream that receives headers and key/value entries
     * @param keyGroupRangeOffsets table that receives the stream position at which each key group starts
     */
    SavepointKvStreamWriter(
        CheckpointStateOutputStreamProxy& stream,
        KeyGroupRangeOffsets& keyGroupRangeOffsets)
        : stream_(stream),
          keyGroupRangeOffsets_(keyGroupRangeOffsets)
    {}

    /**
     * Writes the first entry of the snapshot.
     *
     * Records the current stream position as the offset of {@code keyGroup}, writes
     * {@code kvStateId} as a short, and then stores the entry as the pending entry.
     *
     * @param key       key bytes; must not be empty
     * @param value     value bytes
     * @param kvStateId id of the state the entry belongs to
     * @param keyGroup  key group the entry belongs to
     * @throws std::runtime_error if {@code key} is empty; nothing has been written
     *         or recorded when this is thrown
     */
    void writeFirst(ByteView key, ByteView value, int kvStateId, int keyGroup)
    {
        // Validate before touching the stream or the offset table so that a
        // rejected call leaves no partial header behind.
        requireNonEmptyKey(key);

        keyGroupRangeOffsets_.setKeyGroupOffset(keyGroup, stream_.getPos());
        stream_.writeShort(kvStateId);
        stream_.prepareForPatchableKeyValuePair(encodedLength(key, value));
        writePending(key, value);
    }

    /**
     * Writes a subsequent entry.
     *
     * The previously pending entry is finalized first; it is marked as an end
     * entry when {@code newKeyGroup} or {@code newKvState} is set. On a new key
     * group the end-of-key-group marker, the new key-group offset and the state
     * id are emitted; on a new state within the same key group only the state
     * id is emitted. The given entry then becomes the pending entry.
     *
     * @param key         key bytes; must not be empty
     * @param value       value bytes
     * @param kvStateId   id of the state the entry belongs to
     * @param keyGroup    key group the entry belongs to
     * @param newKeyGroup true if this entry starts a new key group
     * @param newKvState  true if this entry starts a new state
     * @throws std::runtime_error if {@code key} is empty; the previously pending
     *         entry is left untouched when this is thrown
     */
    void writeNext(ByteView key, ByteView value, int kvStateId, int keyGroup, bool newKeyGroup, bool newKvState)
    {
        // Validate first: finalizing the previous entry (possibly with an end
        // mark) and writing headers must not happen for an entry that is
        // going to be rejected.
        requireNonEmptyKey(key);

        finalizePending(newKeyGroup || newKvState);
        if (newKeyGroup) {
            stream_.writeShort(END_OF_KEY_GROUP_MASK);
            keyGroupRangeOffsets_.setKeyGroupOffset(keyGroup, stream_.getPos());
            stream_.writeShort(kvStateId);
        } else if (newKvState) {
            stream_.writeShort(kvStateId);
        }
        stream_.prepareForPatchableKeyValuePair(encodedLength(key, value));
        writePending(key, value);
    }

    /**
     * Completes the stream: finalizes the pending entry with the end mark and
     * writes the end-of-key-group marker. Does nothing when no entry is pending.
     */
    void finish()
    {
        if (!pending_.hasPending) {
            return;
        }
        finalizePending(true);
        stream_.writeShort(END_OF_KEY_GROUP_MASK);
    }

private:
    /** The most recently accepted entry, which has not been finalized yet. */
    struct PendingEntry {
        /** True between writePending and the matching finalizePending. */
        bool hasPending = false;
        /** True when the entry was copied into key/value instead of being written patchably. */
        bool buffered = false;
        /** Handle filled in by tryWritePatchableKeyValuePair; only used when !buffered. */
        CheckpointStateOutputStreamProxy::BytePatch patch;
        /** Copy of the key bytes; only used when buffered. */
        std::vector<int8_t> key;
        /** Copy of the value bytes; only used when buffered. */
        std::vector<int8_t> value;
    };

    CheckpointStateOutputStreamProxy& stream_;
    KeyGroupRangeOffsets& keyGroupRangeOffsets_;
    PendingEntry pending_;

    /**
     * Rejects empty keys. The end mark is applied to the first key byte, so an
     * entry without a key byte cannot be marked.
     *
     * @throws std::runtime_error if {@code key} is empty
     */
    static void requireNonEmptyKey(ByteView key)
    {
        if (key.empty()) {
            throw std::runtime_error("Savepoint key/value entry key must not be empty");
        }
    }

    /** Replaces the contents of {@code target} with a copy of the bytes in {@code source}. */
    static void copyViewToVector(std::vector<int8_t>& target, ByteView source)
    {
        target.resize(source.size());
        // memcpy is skipped for an empty view so its data pointer is never used.
        if (!source.empty()) {
            std::memcpy(target.data(), source.data(), source.size());
        }
    }

    /** Sets FIRST_BIT_IN_BYTE_MASK in the first byte of {@code key}; the key must not be empty. */
    static void markEndOfKeyGroupOrState(std::vector<int8_t>& key)
    {
        key[0] = static_cast<int8_t>(static_cast<uint8_t>(key[0]) | FIRST_BIT_IN_BYTE_MASK);
    }

    /**
     * Size passed to prepareForPatchableKeyValuePair: the key and value sizes
     * plus two int32-sized fields (presumably the two length prefixes of the
     * encoded entry - confirm against the stream proxy).
     */
    static size_t encodedLength(ByteView key, ByteView value)
    {
        return sizeof(int32_t) + key.size() + sizeof(int32_t) + value.size();
    }

    /**
     * Stores the entry as the pending entry.
     *
     * Tries the patchable write first; if the stream declines, the key and
     * value are copied so they can be written later by finalizePending.
     * Callers must have validated the key with requireNonEmptyKey.
     */
    void writePending(ByteView key, ByteView value)
    {
        CheckpointStateOutputStreamProxy::BytePatch patch;
        const bool wrotePatchable = stream_.tryWritePatchableKeyValuePair(key, value, patch);
        if (wrotePatchable) {
            pending_.patch = patch;
            pending_.buffered = false;
        } else {
            copyViewToVector(pending_.key, key);
            copyViewToVector(pending_.value, value);
            pending_.buffered = true;
            pending_.patch = {};
        }
        pending_.hasPending = true;
    }

    /**
     * Completes the pending entry, if any.
     *
     * A buffered entry is written now (with the end bit set in its first key
     * byte when {@code markEnd} is true). A patchable entry is already in the
     * stream, so it is only patched when {@code markEnd} is true and its patch
     * handle is then released.
     *
     * @param markEnd true if the pending entry is the last of its key group or state
     */
    void finalizePending(bool markEnd)
    {
        if (!pending_.hasPending) {
            return;
        }
        if (pending_.buffered) {
            if (markEnd) {
                markEndOfKeyGroupOrState(pending_.key);
            }
            stream_.writeKeyValuePair(pending_.key, pending_.value);
            // clear() keeps the capacity for the next buffered entry.
            pending_.key.clear();
            pending_.value.clear();
        } else {
            if (markEnd) {
                stream_.patchByte(pending_.patch, FIRST_BIT_IN_BYTE_MASK);
            }
            stream_.releasePatch(pending_.patch);
        }
        pending_.hasPending = false;
        pending_.patch = {};
    }
};
#endif