* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "KafkaPartitionSplitSerializer.h"
#include <sstream>
#include "common.h"
/**
 * Reports the serialization format version of this serializer.
 *
 * @return CURRENT_VERSION, the same value deserialize() requires of its `version` argument.
 */
int KafkaPartitionSplitSerializer::getVersion() const
{
    return CURRENT_VERSION;
}
namespace {
/**
 * Verifies that `bytes` bytes can be read from `serialized` starting at `offset`.
 *
 * The comparison is written as a subtraction from the buffer size (after confirming that
 * `offset` is inside the buffer) so that `offset + bytes` is never computed and cannot wrap.
 *
 * @param serialized buffer being decoded.
 * @param offset     position of the first byte that is about to be read.
 * @param bytes      number of bytes that are about to be read.
 * @param field      name of the field being decoded; only used in the error message.
 * @throws std::runtime_error when the requested range does not fit inside the buffer.
 */
void requireAvailable(const std::vector<uint8_t>& serialized, size_t offset, size_t bytes, const std::string& field)
{
    const size_t totalBytes = serialized.size();
    const bool offsetInside = offset <= totalBytes;
    if (offsetInside && bytes <= totalBytes - offset) {
        return;
    }

    std::ostringstream message;
    message << "Corrupt KafkaPartitionSplit bytes: need " << bytes << " byte(s) for " << field;
    message << " at offset " << offset << ", totalBytes=" << totalBytes;
    throw std::runtime_error(message.str());
}
/**
 * Decodes sizeof(int64_t) consecutive bytes, most significant byte first, into a signed
 * 64-bit value.
 *
 * No bounds checking is performed here; the range [offset, offset + sizeof(int64_t)) must
 * already be known to lie inside `serialized`.
 *
 * @param serialized buffer being decoded.
 * @param offset     index of the most significant byte.
 * @return the decoded value, reinterpreted from its unsigned accumulation.
 */
int64_t readBigEndianLong(const std::vector<uint8_t>& serialized, size_t offset)
{
    // Accumulate as unsigned so the left shifts are well defined for every bit pattern.
    uint64_t accumulated = 0;
    size_t index = offset;
    for (size_t remaining = sizeof(int64_t); remaining > 0; --remaining, ++index) {
        accumulated = (accumulated << KafkaPartitionSplitSerializer::ONE_BYTE_LENGTH) | serialized[index];
    }
    return static_cast<int64_t>(accumulated);
}
}
/**
 * Encodes a KafkaPartitionSplit as a big-endian byte sequence.
 *
 * Layout, in order:
 *   - topic length   : unsigned 16-bit
 *   - topic          : the raw bytes of the topic name
 *   - partition      : 32-bit
 *   - startingOffset : 64-bit
 *   - stoppingOffset : 64-bit
 *
 * @param split the split to encode.
 * @return the encoded bytes.
 * @throws std::runtime_error if the split exposes no topic partition, or if the topic name is
 *         longer than the 16-bit length prefix can express (it would otherwise be truncated
 *         and yield bytes that cannot be decoded).
 */
std::vector<uint8_t> KafkaPartitionSplitSerializer::serialize(const KafkaPartitionSplit& split)
{
    // Largest topic length representable by the two-byte length prefix.
    constexpr size_t maxTopicBytes = 0xFFFF;
    // Length prefix + partition + the two offsets.
    constexpr size_t fixedBytes = sizeof(uint16_t) + sizeof(uint32_t) + 2 * sizeof(uint64_t);
    constexpr int topLongByte = static_cast<int>(sizeof(uint64_t)) - 1;

    const auto& topicPartition = split.getTopicPartition();
    if (!topicPartition) {
        throw std::runtime_error("Cannot serialize KafkaPartitionSplit: topic partition is null");
    }

    const std::string& topic = topicPartition->topic();
    if (topic.size() > maxTopicBytes) {
        throw std::runtime_error("Cannot serialize KafkaPartitionSplit: topic length " +
                                 std::to_string(topic.size()) + " exceeds " + std::to_string(maxTopicBytes) +
                                 " bytes");
    }

    // Work on unsigned copies so the shifts below are well defined for negative inputs.
    const uint16_t topicSize = static_cast<uint16_t>(topic.size());
    const uint32_t partition = static_cast<uint32_t>(topicPartition->partition());
    const uint64_t startingOffset = static_cast<uint64_t>(split.getStartingOffset());
    const uint64_t stoppingOffset = static_cast<uint64_t>(split.getStoppingOffset());

    std::vector<uint8_t> serialized;
    serialized.reserve(fixedBytes + topic.size());

    // Topic length followed by the topic bytes.
    serialized.push_back(static_cast<uint8_t>((topicSize >> ONE_BYTE_LENGTH) & 0xFF));
    serialized.push_back(static_cast<uint8_t>(topicSize & 0xFF));
    for (char c : topic) {
        serialized.push_back(static_cast<uint8_t>(c));
    }

    // Partition, most significant byte first.
    serialized.push_back(static_cast<uint8_t>((partition >> THREE_BYTE_LENGTH) & 0xFF));
    serialized.push_back(static_cast<uint8_t>((partition >> TWO_BYTE_LENGTH) & 0xFF));
    serialized.push_back(static_cast<uint8_t>((partition >> ONE_BYTE_LENGTH) & 0xFF));
    serialized.push_back(static_cast<uint8_t>(partition & 0xFF));

    // Both offsets, most significant byte first.
    for (int i = topLongByte; i >= 0; --i) {
        serialized.push_back(static_cast<uint8_t>((startingOffset >> (ONE_BYTE_LENGTH * i)) & 0xFF));
    }
    for (int i = topLongByte; i >= 0; --i) {
        serialized.push_back(static_cast<uint8_t>((stoppingOffset >> (ONE_BYTE_LENGTH * i)) & 0xFF));
    }
    return serialized;
}
/**
 * Decodes a KafkaPartitionSplit from the big-endian layout written by serialize():
 * 16-bit topic length, topic bytes, 32-bit partition, 64-bit startingOffset,
 * 64-bit stoppingOffset.
 *
 * Every field is bounds-checked before it is read. Bytes after the stoppingOffset field,
 * if any, are ignored.
 *
 * @param version    format version of `serialized`; must equal CURRENT_VERSION.
 * @param serialized the encoded bytes.
 * @return a KafkaPartitionSplit allocated with `new`.
 *         NOTE(review): presumably the caller takes ownership of the pointer - confirm.
 * @throws std::runtime_error on an unsupported version, on truncated input, or when the
 *         Kafka TopicPartition cannot be created. Exceptions derived from std::exception that
 *         are raised while constructing the split are logged and rethrown.
 */
KafkaPartitionSplit* KafkaPartitionSplitSerializer::deserialize(int version, std::vector<uint8_t>& serialized)
{
    if (version != CURRENT_VERSION) {
        throw std::runtime_error("Unsupported version");
    }

    size_t cursor = 0;

    // Topic: two-byte length prefix, then that many raw bytes.
    requireAvailable(serialized, cursor, sizeof(int16_t), "topic length");
    const uint16_t topicLength = (serialized[cursor] << ONE_BYTE_LENGTH) | serialized[cursor + 1];
    cursor += sizeof(int16_t);

    requireAvailable(serialized, cursor, topicLength, "topic");
    const auto topicBegin = serialized.begin() + cursor;
    const std::string topic(topicBegin, topicBegin + topicLength);
    cursor += topicLength;

    // Partition: four bytes, most significant first.
    requireAvailable(serialized, cursor, sizeof(int32_t), "partition");
    const uint32_t partitionByte0 = static_cast<uint32_t>(serialized[cursor]);
    const uint32_t partitionByte1 = static_cast<uint32_t>(serialized[cursor + 1]);
    const uint32_t partitionByte2 = static_cast<uint32_t>(serialized[cursor + 2]);
    const uint32_t partitionByte3 = static_cast<uint32_t>(serialized[cursor + 3]);
    const int32_t partition = static_cast<int32_t>((partitionByte0 << THREE_BYTE_LENGTH) |
                                                   (partitionByte1 << TWO_BYTE_LENGTH) |
                                                   (partitionByte2 << ONE_BYTE_LENGTH) | partitionByte3);
    cursor += sizeof(int32_t);

    // The two offsets: eight bytes each.
    requireAvailable(serialized, cursor, sizeof(int64_t), "startingOffset");
    const int64_t startingOffset = readBigEndianLong(serialized, cursor);
    cursor += sizeof(int64_t);

    requireAvailable(serialized, cursor, sizeof(int64_t), "stoppingOffset");
    const int64_t stoppingOffset = readBigEndianLong(serialized, cursor);

    // A negative starting offset is not handed to librdkafka; the TopicPartition is then
    // created without an explicit offset (presumably negative values are sentinels - confirm).
    RdKafka::TopicPartition* createdPartition = (startingOffset < 0)
        ? RdKafka::TopicPartition::create(topic, partition)
        : RdKafka::TopicPartition::create(topic, partition, startingOffset);
    std::shared_ptr<RdKafka::TopicPartition> topicPartition(createdPartition);
    if (!topicPartition) {
        throw std::runtime_error(
            "Failed to create Kafka TopicPartition for topic=" + topic + ", partition=" + std::to_string(partition));
    }

    try {
        return new KafkaPartitionSplit(topicPartition, startingOffset, stoppingOffset);
    } catch (const std::exception& e) {
        // Record the decoded fields for diagnosis, then let the original exception propagate.
        LOG("Error: KafkaPartitionSplit invalid, topic=" << topic << ", partition=" << partition << ", startingOffset="
            << startingOffset << ", stoppingOffset=" << stoppingOffset
            << ", bytes=" << serialized.size() << ", error=" << e.what());
        throw;
    }
}