* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "KafkaPartitionSplitReader.h"
#include <thread>
#include <utility>
#include <unordered_set>
namespace {
/**
 * Builds the "<topic>-<partition>" key used in this file to compare topic partitions by value.
 *
 * @param tp partition to describe; may be null.
 * @return the literal "null" for a null pointer, otherwise the topic name, a dash and the
 *         partition number.
 */
std::string topicPartitionKey(const RdKafka::TopicPartition* tp)
{
    if (!tp) {
        return "null";
    }
    std::string key = tp->topic();
    key += "-";
    key += std::to_string(tp->partition());
    return key;
}
}  // namespace
/**
 * Creates a split reader together with the Kafka consumer it reads from.
 *
 * The given properties are copied and the copy's "client.id" entry is overwritten with the
 * value produced by createConsumerClientId(); the copy is then handed to the consumer.
 *
 * @param props   consumer configuration; the caller's map is not modified.
 * @param context source reader context; must not be null, it is dereferenced to obtain the
 *                subtask index.
 */
KafkaPartitionSplitReader::KafkaPartitionSplitReader(
    const std::unordered_map<std::string, std::string>& props, SourceReaderContext* context)
    : subtaskId(context->getSubTaskIndex())
{
    std::unordered_map<std::string, std::string> consumerProps = props;
    // createConsumerClientId() reads subtaskId, which the initializer list has already set.
    consumerProps["client.id"] = createConsumerClientId(props);
    // A plain new-expression reports failure by throwing, never by returning nullptr, so the
    // former null check (and the never-written error string it printed) was unreachable.
    consumer = new RdKafkaConsumer(consumerProps);
}
/**
 * Closes the consumer through close() and then frees the consumer object that the
 * constructor allocated.
 */
KafkaPartitionSplitReader::~KafkaPartitionSplitReader()
{
close();
delete consumer;
}
/**
 * Polls the consumer once and wraps the outcome in a KafkaPartitionSplitRecords.
 *
 * A partition whose last polled record has an offset of at least (stopping offset - 1) gets
 * its stopping offset recorded on the result, is reported as a finished split and is
 * unassigned from the consumer. Splits collected in emptySplits are reported as finished too.
 *
 * If poll() throws a std::exception, the exception is swallowed and a result built on
 * ConsumerRecords::EMPTY is returned instead.
 *
 * @return a newly allocated result object; presumably owned by the caller - TODO confirm.
 */
RecordsWithSplitIds<RdKafka::Message>* KafkaPartitionSplitReader::fetch()
{
    ConsumerRecords* polled = nullptr;
    try {
        polled = consumer->poll(POLL_TIMEOUT);
    } catch (const std::exception&) {
        ConsumerRecords* noRecords = ConsumerRecords::EMPTY;
        auto* emptyResult = new KafkaPartitionSplitRecords(noRecords);
        markEmptySplitsAsFinished(emptyResult);
        return emptyResult;
    }

    auto* result = new KafkaPartitionSplitRecords(polled);
    std::vector<RdKafka::TopicPartition*> reachedEnd;
    std::vector<RdKafka::TopicPartition*> partitionsWithRecords;
    polled->partitions(partitionsWithRecords);

    for (RdKafka::TopicPartition* partition : partitionsWithRecords) {
        long stopAt = getStoppingOffset(partition);
        auto batch = polled->records(partition);
        if (batch.empty()) {
            continue;
        }
        auto newest = batch.back();
        if (newest->offset() < stopAt - 1) {
            // The record just before the stopping offset has not been seen yet.
            continue;
        }
        result->setPartitionStoppingOffset(partition, stopAt);
        reachedEnd.push_back(partition);
        result->addFinishedSplit(KafkaPartitionSplit::toSplitId(partition));
    }

    markEmptySplitsAsFinished(result);
    if (!reachedEnd.empty()) {
        unassignPartitions(reachedEnd);
    }
    return result;
}
/**
 * Adds newly assigned splits to the consumer's assignment.
 *
 * The new assignment is the consumer's current assignment (null entries and duplicates
 * dropped) followed by every split in splitsChange that is non-null, has a topic partition
 * and whose splitId() is not already present. For each accepted split the starting offset is
 * written onto its topic partition and its stopping offset is registered. When no split is
 * accepted the consumer is left untouched.
 *
 * After re-assigning, stopping offsets that must be looked up are resolved, splits that are
 * already at their stopping offset are removed, and the logging hook is invoked.
 *
 * NOTE(review): partitions that were already assigned are passed back to assign() with
 * whatever offsets assignment() reported - confirm that RdKafkaConsumer::assign keeps their
 * current fetch positions.
 *
 * @param splitsChange splits to add; null entries are ignored.
 */
void KafkaPartitionSplitReader::handleSplitsChanges(const std::vector<KafkaPartitionSplit*>& splitsChange)
{
    // Owns the TopicPartition objects filled in by consumer->assignment() and frees them on
    // every exit path, including when one of the consumer calls below throws.
    struct AssignmentOwner {
        std::vector<RdKafka::TopicPartition*> partitions;
        AssignmentOwner() = default;
        AssignmentOwner(const AssignmentOwner&) = delete;
        AssignmentOwner& operator=(const AssignmentOwner&) = delete;
        ~AssignmentOwner()
        {
            for (auto* tp : partitions) {
                delete tp;
            }
        }
    };

    std::vector<RdKafka::TopicPartition*> newPartitionAssignments;
    std::vector<std::shared_ptr<RdKafka::TopicPartition>> partitionsStoppingAtLatest;
    std::vector<std::shared_ptr<RdKafka::TopicPartition>> partitionsStoppingAtCommitted;

    AssignmentOwner current;
    consumer->assignment(current.partitions);

    std::unordered_set<std::string> assignmentKeys;
    newPartitionAssignments.reserve(current.partitions.size() + splitsChange.size());

    // Keep what is already assigned, once per distinct topic partition.
    for (auto* tp : current.partitions) {
        if (tp == nullptr) {
            continue;
        }
        if (assignmentKeys.insert(topicPartitionKey(tp)).second) {
            newPartitionAssignments.push_back(tp);
        }
    }

    size_t acceptedNewSplits = 0;
    for (const auto& s : splitsChange) {
        if (s == nullptr) {
            continue;
        }
        auto tp = s->getTopicPartition();
        if (tp == nullptr) {
            continue;
        }
        // Skip splits whose id is already part of the assignment.
        if (!assignmentKeys.insert(s->splitId()).second) {
            continue;
        }
        acceptedNewSplits++;
        setStartingOffsetForAssignment(s);
        newPartitionAssignments.push_back(tp.get());
        parseStoppingOffsets(s, partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
    }

    if (acceptedNewSplits == 0) {
        return;
    }

    consumer->assign(newPartitionAssignments);
    acquireAndSetStoppingOffsets(partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
    removeEmptySplits();
    maybeLogSplitChangesHandlingResult(splitsChange);
}
/**
 * Currently does nothing: the body is empty, so calling it has no effect on the consumer.
 */
void KafkaPartitionSplitReader::wakeUp()
{
}
/**
 * Closes the underlying consumer by forwarding to its close().
 *
 * The destructor calls this as well, so an explicit call is followed by a second one at
 * destruction. NOTE(review): confirm RdKafkaConsumer::close tolerates repeated calls.
 */
void KafkaPartitionSplitReader::close()
{
consumer->close();
}
/**
 * Moves every split id collected in emptySplits into the finished-split set of the given
 * result and then empties emptySplits, so each id is reported only once.
 *
 * @param recordsBySplits result object that receives the finished split ids.
 */
void KafkaPartitionSplitReader::markEmptySplitsAsFinished(KafkaPartitionSplitRecords* recordsBySplits)
{
    if (emptySplits.empty()) {
        return;
    }
    for (const auto& finishedId : emptySplits) {
        recordsBySplits->finishedSplits().insert(finishedId);
    }
    emptySplits.clear();
}
/**
 * Builds the consumer client id as "<prefix>-<subtaskId>".
 *
 * @param props properties searched for the "client.id.prefix" entry; when it is absent the
 *              prefix is empty and the id starts with the dash.
 * @return the client id string.
 */
std::string KafkaPartitionSplitReader::createConsumerClientId(const std::unordered_map<std::string, std::string>& props)
{
    std::string clientId;
    const auto prefixEntry = props.find("client.id.prefix");
    if (prefixEntry != props.end()) {
        clientId = prefixEntry->second;
    }
    clientId += "-";
    clientId += std::to_string(subtaskId);
    return clientId;
}
/**
 * Writes the split's starting offset onto its topic partition.
 *
 * The split's EARLIEST_OFFSET, LATEST_OFFSET and COMMITTED_OFFSET markers are translated to
 * RdKafka::Topic::OFFSET_BEGINNING, OFFSET_END and OFFSET_STORED respectively; any other
 * value is used unchanged. Nothing happens for a null split or a split without a topic
 * partition.
 *
 * @param split split whose topic partition offset is set; may be null.
 */
void KafkaPartitionSplitReader::setStartingOffsetForAssignment(KafkaPartitionSplit* split)
{
    if (!split) {
        return;
    }
    const std::shared_ptr<RdKafka::TopicPartition> partition = split->getTopicPartition();
    if (partition == nullptr) {
        return;
    }

    const int64_t requested = split->getStartingOffset();
    const int64_t resolved =
        (requested == KafkaPartitionSplit::EARLIEST_OFFSET)    ? RdKafka::Topic::OFFSET_BEGINNING
        : (requested == KafkaPartitionSplit::LATEST_OFFSET)    ? RdKafka::Topic::OFFSET_END
        : (requested == KafkaPartitionSplit::COMMITTED_OFFSET) ? RdKafka::Topic::OFFSET_STORED
                                                               : requested;
    partition->set_offset(resolved);
}
/**
 * Registers the stopping offset of a split.
 *
 * A non-negative stopping offset is stored in stoppingOffsets right away. A split that stops
 * at LATEST_OFFSET or COMMITTED_OFFSET has its topic partition appended to the matching
 * output list so the concrete offset can be looked up later. Any other negative value is
 * ignored.
 *
 * @param split                          split to inspect; must not be null (it is dereferenced).
 * @param partitionsStoppingAtLatest     receives partitions stopping at the latest offset.
 * @param partitionsStoppingAtCommitted  receives partitions stopping at the committed offset.
 */
void KafkaPartitionSplitReader::parseStoppingOffsets(
    KafkaPartitionSplit* split,
    std::vector<std::shared_ptr<RdKafka::TopicPartition>>& partitionsStoppingAtLatest,
    std::vector<std::shared_ptr<RdKafka::TopicPartition>>& partitionsStoppingAtCommitted)
{
    const std::shared_ptr<RdKafka::TopicPartition> partition = split->getTopicPartition();
    const auto requested = split->getStoppingOffset();

    if (requested >= 0) {
        stoppingOffsets[partition.get()] = requested;
        return;
    }
    if (requested == KafkaPartitionSplit::LATEST_OFFSET) {
        partitionsStoppingAtLatest.push_back(partition);
        return;
    }
    if (requested == KafkaPartitionSplit::COMMITTED_OFFSET) {
        partitionsStoppingAtCommitted.push_back(partition);
    }
}
/**
 * Looks up the stopping offsets that were deferred by parseStoppingOffsets().
 *
 * consumer->endOffsets() is always called for the first list, consumer->committed() only when
 * the second list is non-empty. After each call the offset carried by every partition in
 * that list is stored in stoppingOffsets.
 *
 * @param partitionsStoppingAtLatest     partitions that stop at the end offset.
 * @param partitionsStoppingAtCommitted  partitions that stop at the committed offset.
 */
void KafkaPartitionSplitReader::acquireAndSetStoppingOffsets(
    std::vector<std::shared_ptr<RdKafka::TopicPartition>>& partitionsStoppingAtLatest,
    std::vector<std::shared_ptr<RdKafka::TopicPartition>>& partitionsStoppingAtCommitted)
{
    // Copies the offset each partition currently carries into the stopping-offset table.
    const auto storeOffsets = [this](const std::vector<std::shared_ptr<RdKafka::TopicPartition>>& partitions) {
        for (const auto& partition : partitions) {
            stoppingOffsets[partition.get()] = partition->offset();
        }
    };

    consumer->endOffsets(partitionsStoppingAtLatest);
    storeOffsets(partitionsStoppingAtLatest);

    if (partitionsStoppingAtCommitted.empty()) {
        return;
    }
    consumer->committed(partitionsStoppingAtCommitted);
    storeOffsets(partitionsStoppingAtCommitted);
}
/**
 * Unassigns every partition whose position is already at or past its stopping offset.
 *
 * The current assignment is fetched, consumer->position() is applied to it, and each
 * partition whose offset is >= getStoppingOffset() is collected. The split ids of the
 * collected partitions are added to emptySplits (to be reported as finished by a later
 * fetch()) and the partitions are removed from the consumer's assignment.
 */
void KafkaPartitionSplitReader::removeEmptySplits()
{
    // Owns the TopicPartition objects filled in by consumer->assignment() and frees them on
    // every exit path, including when position() or unassignPartitions() throws.
    struct AssignmentOwner {
        std::vector<RdKafka::TopicPartition*> partitions;
        AssignmentOwner() = default;
        AssignmentOwner(const AssignmentOwner&) = delete;
        AssignmentOwner& operator=(const AssignmentOwner&) = delete;
        ~AssignmentOwner()
        {
            for (auto* tp : partitions) {
                delete tp;
            }
        }
    };

    AssignmentOwner current;
    consumer->assignment(current.partitions);
    consumer->position(current.partitions);

    std::vector<RdKafka::TopicPartition*> emptyPartitions;
    for (auto* tp : current.partitions) {
        // Null entries are skipped, as the other assignment loops in this file do.
        if (tp != nullptr && tp->offset() >= getStoppingOffset(tp)) {
            emptyPartitions.push_back(tp);
        }
    }
    if (emptyPartitions.empty()) {
        return;
    }

    for (auto* tp : emptyPartitions) {
        emptySplits.insert(KafkaPartitionSplit::toSplitId(tp));
    }
    // emptyPartitions only borrows the pointers; they stay valid until `current` goes out of scope.
    unassignPartitions(emptyPartitions);
}
/**
 * Hook called at the end of handleSplitsChanges(). The body is currently empty, so nothing
 * is logged and splitsChange is not used.
 */
void KafkaPartitionSplitReader::maybeLogSplitChangesHandlingResult(
const std::vector<KafkaPartitionSplit*>& splitsChange)
{
}
/**
 * Removes the given partitions from the consumer's assignment.
 *
 * Partitions are matched by their "<topic>-<partition>" key rather than by pointer. The
 * consumer is re-assigned with every currently assigned partition whose key is not in the
 * removal set; this happens even when nothing matches.
 *
 * NOTE(review): the retained partitions are passed back to assign() with whatever offsets
 * assignment() reported - confirm that RdKafkaConsumer::assign keeps their current fetch
 * positions.
 *
 * @param partitionsToUnassign partitions to drop; null entries are ignored and the pointers
 *                             are only read, not freed.
 */
void KafkaPartitionSplitReader::unassignPartitions(const std::vector<RdKafka::TopicPartition*>& partitionsToUnassign)
{
    // Owns the TopicPartition objects filled in by consumer->assignment() and frees them on
    // every exit path, including when consumer->assign() throws.
    struct AssignmentOwner {
        std::vector<RdKafka::TopicPartition*> partitions;
        AssignmentOwner() = default;
        AssignmentOwner(const AssignmentOwner&) = delete;
        AssignmentOwner& operator=(const AssignmentOwner&) = delete;
        ~AssignmentOwner()
        {
            for (auto* tp : partitions) {
                delete tp;
            }
        }
    };

    AssignmentOwner current;
    consumer->assignment(current.partitions);

    std::unordered_set<std::string> unassignSet;
    for (auto* tp : partitionsToUnassign) {
        if (tp != nullptr) {
            unassignSet.insert(topicPartitionKey(tp));
        }
    }

    std::vector<RdKafka::TopicPartition*> newAssignment;
    newAssignment.reserve(current.partitions.size());
    for (auto* tp : current.partitions) {
        if (tp != nullptr && unassignSet.find(topicPartitionKey(tp)) == unassignSet.end()) {
            newAssignment.push_back(tp);
        }
    }
    consumer->assign(newAssignment);
}
/**
 * Returns the stopping offset registered for a partition.
 *
 * @param tp key looked up in stoppingOffsets.
 * @return the stored offset, or the largest value of `long` when nothing is registered,
 *         which makes the partition effectively unbounded.
 */
long KafkaPartitionSplitReader::getStoppingOffset(RdKafka::TopicPartition* tp)
{
    const auto entry = stoppingOffsets.find(tp);
    if (entry == stoppingOffsets.end()) {
        return std::numeric_limits<long>::max();
    }
    return entry->second;
}
/**
 * Forwards the given per-partition offsets, unchanged, to the consumer's commitOffsets().
 *
 * @param offsets offsets to commit, keyed by topic partition.
 */
void KafkaPartitionSplitReader::commitOffsets(
const std::map<std::shared_ptr<RdKafka::TopicPartition>, int64_t>& offsets)
{
consumer->commitOffsets(offsets);
}