* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "KafkaSource.h"
#include "core/api/common/serialization/DeserializationFactory.h"
KafkaSource::KafkaSource(nlohmann::json& opDescriptionJSON, bool isBatch) : isBatch(isBatch)
{
nlohmann::json properties = opDescriptionJSON["properties"];
for (auto& [key, value] : properties.items()) {
auto iter = ConsumerConfigUtil::GetConfig().find(key);
if (iter != ConsumerConfigUtil::GetConfig().end() && iter->second != "") {
props.emplace(iter->second, value);
}
}
std::unordered_map<std::string, std::string> optConsumerConfig = ConfigLoader::GetKafkaConsumerConfig();
for (const auto& pair : optConsumerConfig) {
props.emplace(pair.first, pair.second);
}
auto innerDeserializationSchema = DeserializationFactory::getDeserializationSchema(opDescriptionJSON);
deserializationSchema = KafkaRecordDeserializationSchema::valueOnly(innerDeserializationSchema);
}
SourceReader<KafkaPartitionSplit>* KafkaSource::createReader(SourceReaderContext* readerContext) const
{
auto subTaskId = readerContext->getSubTaskIndex();
auto elementsQueue = new FutureCompletingBlockingQueue<RdKafka::Message>(subTaskId);
deserializationSchema->open();
std::function<SplitReader<RdKafka::Message, KafkaPartitionSplit>*()> splitReaderSupplier = [this, readerContext]() {
return new KafkaPartitionSplitReader(props, readerContext);
};
RecordEmitter<RdKafka::Message, KafkaPartitionSplitState>* recordEmitter =
new KafkaRecordEmitter(deserializationSchema);
SingleThreadFetcherManager<RdKafka::Message, KafkaPartitionSplit>* kafkaSourceFetcherManager =
new KafkaSourceFetcherManager(elementsQueue, splitReaderSupplier);
return new KafkaSourceReader(elementsQueue, kafkaSourceFetcherManager, recordEmitter, readerContext, isBatch);
}
KafkaPartitionSplitSerializer* KafkaSource::getSplitSerializer() const
{
return new KafkaPartitionSplitSerializer();
}