Subject: [PATCH] modify source lock granularity from row to batch(64)
delete nouse test
message destruction enable batch sub ref count
reduce vector copy
optimize batch api by avoiding frequent resize
static thread batch kafka and partition range
fix mem bug
add batch produce in cpp remove debug print in kafka msg add test for interface
add consumebatch api
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -28,6 +28,10 @@
#include <string>
#include <vector>
+#include <chrono>
+#include <climits>
+#include <memory>
+#include <algorithm>
#include "rdkafkacpp_int.h"
@@ -73,6 +77,7 @@
}
rkc->rk_ = rk;
+ rkc->rk_queue_ = rd_kafka_queue_get_consumer(rk);
/* Redirect handle queue to cgrp's queue to provide a single queue point */
rd_kafka_poll_set_consumer(rk);
@@ -119,6 +124,37 @@
}
+std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> RdKafka::KafkaConsumerImpl::consumeBatch(int timeout_ms, size_t maxPollRecords) {
+ std::vector<rd_kafka_message_t*> msgPtrArray(maxPollRecords);
+ auto msgReceived = rd_kafka_consume_batch_queue(rk_queue_, timeout_ms, msgPtrArray.data(), maxPollRecords);
+ std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> result;
+ if (msgReceived < 0) {
+ return result;
+ }
+
+ for (int i = 0; i < msgReceived; ++i) {
+ rd_kafka_message_t *cur = msgPtrArray[i];
+ if (cur) {
+ RdKafka::Message *msg = new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, cur);
+ const std::string &topic = msg->topic_name();
+ int partition = msg->partition();
+
+ auto key_it = std::find_if(result.begin(), result.end(), [&](const auto& pair) {
+ return pair.first->topic() == topic && pair.first->partition() == partition;
+ });
+ if (key_it != result.end()) {
+ key_it->second.push_back(msg);
+ } else {
+ auto key = RdKafka::TopicPartition::create(topic, partition);
+ auto& msgVec = result[key];
+ msgVec.reserve(maxPollRecords);
+ msgVec.push_back(msg);
+ }
+ }
+ }
+ return result;
+};
+
RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::assignment(
std::vector<RdKafka::TopicPartition *> &partitions) {
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -55,6 +55,7 @@
#include <cstring>
#include <stdint.h>
#include <sys/types.h>
+#include <unordered_map>
#ifdef _WIN32
#ifndef ssize_t
@@ -2080,6 +2081,8 @@
* @returns \c rd_kafka_topic_t*
*/
virtual struct rd_kafka_topic_s *c_ptr() = 0;
+
+ virtual int32_t get_partition_num() = 0;
};
@@ -2358,7 +2361,12 @@
virtual size_t size() const = 0;
};
-
+class RD_EXPORT RdKafkaCInterface {
+ public:
+ static void DeleteTopParRefCnt(uint64_t add, uint32_t refNum);
+ static void DeleteTopicRecCnt(uint64_t add, uint32_t refNum);
+ static void DeleteBufRefCnt(uint64_t add, uint32_t refNum);
+};
/**
* @brief Message object
*
@@ -2412,6 +2420,8 @@
/** @returns Topic name (if applicable, else empty string) */
virtual std::string topic_name() const = 0;
+ virtual void DeleteMessage(uint64_t *add1, uint64_t *add2, uint64_t *add3) = 0;
+
/** @returns Partition (if applicable) */
virtual int32_t partition() const = 0;
@@ -2737,6 +2747,8 @@
*/
virtual Message *consume(int timeout_ms) = 0;
+ virtual std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> consumeBatch(int timeout_ms, size_t maxPollRecords) = 0;
+
/**
* @brief Commit offsets for the current assignment.
*
@@ -3351,6 +3363,16 @@
size_t len,
const std::string *key,
void *msg_opaque) = 0;
+
+
+ virtual ErrorCode produce(Topic *topic,
+ int32_t partition,
+ int msgflags,
+ const std::vector<char*> &data,
+ const std::vector<size_t> &lens,
+ const void *key,
+ size_t key_len,
+ void *msg_opaque) = 0;
/**
* @brief Variant produce() that passes the key as a pointer and length
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -378,14 +378,21 @@
class MessageImpl : public Message {
public:
~MessageImpl() {
- if (free_rkmessage_)
- rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
+ //rkmessage_需要调用DeleteMessage手动释放,注意引用计数通过参数返回后需要批量处理
+// if (free_rkmessage_)
+// rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
if (key_)
delete key_;
if (headers_)
delete headers_;
}
+ void DeleteMessage(uint64_t *add1, uint64_t *add2, uint64_t *add3) {
+ if (free_rkmessage_)
+ rd_kafka_message_destroy_1(const_cast<rd_kafka_message_t *>(rkmessage_), add1, add2, add3);
+ }
+
+
MessageImpl(rd_kafka_type_t rk_type,
RdKafka::Topic *topic,
rd_kafka_message_t *rkmessage) :
@@ -1180,6 +1187,7 @@
}
rd_kafka_t *rk_;
+ rd_kafka_queue_t* rk_queue_;
/* All Producer and Consumer callbacks must reside in HandleImpl and
* the opaque provided to rdkafka must be a pointer to HandleImpl, since
* ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
@@ -1217,6 +1225,10 @@
rd_kafka_offset_store(rkt_, partition, offset));
}
+ int32_t get_partition_num () {
+ return rd_kafka_get_partition_num(rkt_);
+ }
+
static Topic *create(Handle &base, const std::string &topic, Conf *conf);
struct rd_kafka_topic_s *c_ptr() {
@@ -1339,8 +1351,10 @@
virtual public HandleImpl {
public:
~KafkaConsumerImpl() {
- if (rk_)
- rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+ if (rk_) {
+ rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+ rd_kafka_queue_destroy(rk_queue_);
+ }
}
static KafkaConsumer *create(Conf *conf, std::string &errstr);
@@ -1360,6 +1374,7 @@
Error *incremental_unassign(const std::vector<TopicPartition *> &partitions);
Message *consume(int timeout_ms);
+ std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> consumeBatch(int timeout_ms, size_t maxPollRecords);
ErrorCode commitSync() {
return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0 /*sync*/));
}
@@ -1507,10 +1522,16 @@
class ProducerImpl : virtual public Producer, virtual public HandleImpl {
+private:
+ std::vector<rd_kafka_message_t> msgs;
public:
- ~ProducerImpl() {
+ explicit ProducerImpl(int32_t limit = 100000) {
+ msgs.reserve(limit);
+ }
+ ~ProducerImpl() {
if (rk_)
rd_kafka_destroy(rk_);
+ msgs.clear();
}
ErrorCode produce(Topic *topic,
@@ -1529,6 +1550,15 @@
const void *key,
size_t key_len,
void *msg_opaque);
+
+ ErrorCode produce(Topic *topic,
+ int32_t partition,
+ int msgflags,
+ const std::vector<char*> &data,
+ const std::vector<size_t> &lens,
+ const void *key,
+ size_t key_len,
+ void *msg_opaque) ;
ErrorCode produce(Topic *topic,
int32_t partition,
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -197,6 +197,7 @@
/*
* Generic configuration
*/
+ int max_poll_records;
int enabled_events;
int max_msg_size;
int msg_copy_max_size;
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -127,6 +127,48 @@
}
+RdKafka::ErrorCode RdKafka::ProducerImpl::produce(Topic *topic,
+ int32_t partition,
+ int msgflags,
+ const std::vector<char*> &data,
+ const std::vector<size_t> &lens,
+ const void *key,
+ size_t keylen,
+ void *msg_opaque){
+ RdKafka::TopicImpl *topicimpl = reinterpret_cast<RdKafka::TopicImpl *>(topic);
+ int32_t cycles = data.size();
+ msgs.resize(cycles);
+
+ std::vector<int> msgidps;
+ msgidps.resize(cycles);
+ for (int i = 0; i < cycles; i++) {
+ int* msgidp = &msgidps[i];
+
+ msgs[i].payload = data[i];
+ msgs[i].len = lens[i];
+ msgs[i]._private = msgidp;
+ msgs[i].partition = partition; /* Will be ignored since
+ * RD_KAFKA_MSG_F_PARTITION
+ * is not supplied. */
+ }
+
+ auto buffSize = rd_kafka_produce_batch(topicimpl->rkt_, partition,
+ msgflags | RD_KAFKA_MSG_F_PARTITION, msgs.data(), cycles);
+ if (buffSize != cycles) {
+ for (int i=0;i<cycles;++i) {
+ if (msgs[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+ std::cout<<"error : buffer size "<<buffSize <<
+ ", but need buffer size is "<< cycles<<
+ ",reason is "<<
+ rd_kafka_err2str(msgs[i].err)<< std::endl;
+ return static_cast<RdKafka::ErrorCode>(msgs[i].err);
+ }
+ }
+
+ }
+ return RdKafka::ERR_NO_ERROR;
+}
+
RdKafka::ErrorCode RdKafka::ProducerImpl::produce(
RdKafka::Topic *topic,
int32_t partition,
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -113,7 +113,22 @@
return RD_KAFKA_RESP_ERR__STATE;
}
-
+void rd_kafka_msg_destroy_1(rd_kafka_t *rk, rd_kafka_msg_t *rkm, uint64_t *add) {
+ // FIXME
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
+ rd_dassert(rk || rkm->rkm_rkmessage.rkt);
+ rd_kafka_curr_msgs_sub(rk ? rk : rkm->rkm_rkmessage.rkt->rkt_rk,
+ 1, rkm->rkm_len);
+ }
+ if (rkm->rkm_headers)
+ rd_kafka_headers_destroy(rkm->rkm_headers);
+ if (likely(rkm->rkm_rkmessage.rkt != NULL))
+ rd_kafka_topic_destroy1(rkm->rkm_rkmessage.rkt, add);
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
+ rd_free(rkm->rkm_payload);
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
+ rd_free(rkm);
+}
void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
// FIXME
@@ -337,7 +352,7 @@
/* Partition the message */
- err = rd_kafka_msg_partitioner(rkt, rkm, 1);
+ err = rd_kafka_msg_partitioner(rkt, rkm, 0);
if (likely(!err)) {
rd_kafka_set_last_error(0, 0);
return 0;
@@ -1268,7 +1283,18 @@
return 0;
}
-
+/**
+ * @name refcnt will handle batch later
+ */
+void rd_kafka_message_destroy_1(rd_kafka_message_t *rkmessage, uint64_t *add1, uint64_t *add2, uint64_t *add3) {
+ rd_kafka_op_t *rko;
+ if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL))
+ rd_kafka_op_destroy_1(rko, add1, add2, add3);
+ else {
+ rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage);
+ rd_kafka_msg_destroy(NULL, rkm);
+ }
+}
/**
* @name Public message type (rd_kafka_message_t)
@@ -1284,6 +1310,17 @@
}
}
+void rd_kafka_toppar_destroy_sub_ref_batch(uint64_t add, uint32_t refNum){
+ rd_kafka_toppar_t *rktp =(rd_kafka_toppar_t *)add;
+ if(unlikely(rktp == NULL)) {
+ return;
+ }
+
+ if (unlikely(rd_refcnt_sub_n(&rktp->rktp_refcnt, refNum) == 0)){
+ rd_kafka_toppar_destroy_final(rktp);
+ }
+}
+
rd_kafka_message_t *rd_kafka_message_new(void) {
rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
@@ -1818,6 +1855,10 @@
(msg_cnt > 0 && now >= rkmq->rkmq_wakeup.abstime)) {
/* Prevent further signalling */
rkmq->rkmq_wakeup.signalled = rd_true;
+// printf("msg_cnt(%d)>= batch_msg_cnt(%d) || msg_bytes(%lld) >= batch_msg_bytes(%lld) "
+// "|| (msg_cnt(%d) > 0 && now(%lld) >= rkmq->rkmq_wakeup.abstime(%lld)): %d\n",
+// msg_cnt ,batch_msg_cnt, msg_bytes ,batch_msg_bytes,msg_cnt , now ,
+// rkmq->rkmq_wakeup.abstime, now >= rkmq->rkmq_wakeup.abstime?1:0);
/* Batch is ready */
return rd_true;
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -3777,6 +3777,9 @@
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
+int32_t rd_kafka_get_partition_num(rd_kafka_topic_t *rk) {
+ return rk->rkt_partition_cnt;
+}
/**
* @brief get_offsets_for_times() state
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -1497,7 +1497,17 @@
RD_EXPORT
void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
+RD_EXPORT
+void rd_kafka_message_destroy_1(rd_kafka_message_t *rkmessage, uint64_t *add1, uint64_t *add2, uint64_t *add3);
+
+RD_EXPORT
+void rd_kafka_toppar_destroy_sub_ref_batch(uint64_t add, uint32_t refNum);
+RD_EXPORT
+void rd_kafka_buf_destroy_sub_ref_batch(uint64_t add, uint32_t refNum);
+
+RD_EXPORT
+void rd_kafka_topic_destroy_sub_ref_batch(uint64_t add, uint32_t refNum);
/**
* @brief Returns the error string for an errored rd_kafka_message_t or NULL if
@@ -3340,6 +3350,9 @@
int64_t *low,
int64_t *high);
+RD_EXPORT int32_t rd_kafka_get_partition_num(rd_kafka_topic_t *rk);
+
+
/**
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -778,6 +778,17 @@
if (rk->rk_type != RD_KAFKA_PRODUCER)
return RD_KAFKA_RESP_ERR_NO_ERROR;
+ /*static __thread int32_t count = 0;
+ ++count;
+ if (count == 5000) {
+ printf("Debug: rk->rk_curr_msgs.max_cnt = %d, k->rk_curr_msgs.cnt = %d, "
+ "rk->rk_curr_msgs.size = %zu"
+ "rk->rk_curr_msgs.max_size = %zu, cnt = %d, size = %u \n",
+ rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.cnt,
+ rk->rk_curr_msgs.size, rk->rk_curr_msgs.max_size,
+ cnt, size);
+ count = 0;
+ }*/
mtx_lock(&rk->rk_curr_msgs.lock);
while (
unlikely((rk->rk_curr_msgs.max_cnt > 0 &&
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -48,6 +48,18 @@
return new TopicPartitionImpl(topic, partition, offset);
}
+void RdKafka::RdKafkaCInterface::DeleteTopParRefCnt(uint64_t add, uint32_t refNum){
+ rd_kafka_toppar_destroy_sub_ref_batch(add, refNum);
+}
+
+void RdKafka::RdKafkaCInterface::DeleteTopicRecCnt(uint64_t add, uint32_t refNum){
+ rd_kafka_topic_destroy_sub_ref_batch(add, refNum);
+}
+
+void RdKafka::RdKafkaCInterface::DeleteBufRefCnt(uint64_t add, uint32_t refNum){
+ rd_kafka_buf_destroy_sub_ref_batch(add, refNum);
+}
+
void RdKafka::TopicPartition::destroy(
std::vector<TopicPartition *> &partitions) {
for (std::vector<TopicPartition *>::iterator it = partitions.begin();
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -339,6 +339,20 @@
#define rd_refcnt_add0(R) rd_atomic32_add(R, 1)
#endif
+static RD_INLINE RD_UNUSED int rd_refcnt_sub_n(rd_refcnt_t *R, uint32_t n) {
+ int r;
+#ifdef RD_REFCNT_USE_LOCKS
+ mtx_lock(&R->lock);
+ r = (R->v)-n;
+ mtx_unlock(&R->lock);
+#else
+ r = rd_atomic32_sub(R, n);
+#endif
+ if (r < 0)
+ rd_assert(!*"refcnt sub-zero");
+ return r;
+}
+
static RD_INLINE RD_UNUSED int rd_refcnt_sub0(rd_refcnt_t *R) {
int r;
#ifdef RD_REFCNT_USE_LOCKS
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -411,7 +411,53 @@
return 1;
}
+void rd_kafka_buf_destroy_sub_ref_batch(uint64_t add, uint32_t refNum){
+ rd_kafka_buf_t *request =(rd_kafka_buf_t *)add;
+ if(unlikely(request == NULL)) {
+ return;
+ }
+ if (rd_refcnt_sub_n(&(request)->rkbuf_refcnt, refNum) > 0){
+ return;
+ }
+ rd_kafka_buf_destroy_final(request);
+}
+
+void rd_kafka_buf_handle_op_1(rd_kafka_op_t *rko, rd_kafka_resp_err_t err, uint64_t *add) {
+ rd_kafka_buf_t *request, *response;
+ rd_kafka_t *rk;
+
+ request = rko->rko_u.xbuf.rkbuf;
+ rko->rko_u.xbuf.rkbuf = NULL;
+
+ /* NULL on op_destroy() */
+ if (request->rkbuf_replyq.q) {
+ int32_t version = request->rkbuf_replyq.version;
+ /* Current queue usage is done, but retain original replyq for
+ * future retries, stealing
+ * the current reference. */
+ request->rkbuf_orig_replyq = request->rkbuf_replyq;
+ rd_kafka_replyq_clear(&request->rkbuf_replyq);
+ /* Callback might need to version check so we retain the
+ * version across the clear() call which clears it. */
+ request->rkbuf_replyq.version = version;
+ }
+
+ if (!request->rkbuf_cb) {
+ *add = (uint64_t) request;
+ return;
+ }
+ /* Let buf_callback() do destroy()s */
+ response = request->rkbuf_response; /* May be NULL */
+ request->rkbuf_response = NULL;
+
+ if (!(rk = rko->rko_rk)) {
+ rd_assert(request->rkbuf_rkb != NULL);
+ rk = request->rkbuf_rkb->rkb_rk;
+ }
+
+ rd_kafka_buf_callback(rk, request->rkbuf_rkb, err, response, request);
+}
/**
* @brief Handle RD_KAFKA_OP_RECV_BUF.
*/
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -1027,7 +1027,7 @@
rd_kafka_bufq_t *rkbq);
int rd_kafka_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
-
+void rd_kafka_buf_handle_op_1(rd_kafka_op_t *rko, rd_kafka_resp_err_t err, uint64_t *add);
void rd_kafka_buf_handle_op(rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
void rd_kafka_buf_callback(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -255,6 +255,7 @@
void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm);
+void rd_kafka_msg_destroy_1(rd_kafka_t *rk, rd_kafka_msg_t *rkm, uint64_t *add);
int rd_kafka_msg_new(rd_kafka_topic_t *rkt,
int32_t force_partition,
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -304,6 +304,234 @@
return rko;
}
+void rd_kafka_op_destroy_1(rd_kafka_op_t *rko, uint64_t *add1, uint64_t *add2, uint64_t *add3) {
+ *add2 = 0;
+ *add3 = 0;
+ /* Call ops callback with ERR__DESTROY to let it
+ * clean up its resources. */
+ if ((rko->rko_type & RD_KAFKA_OP_CB) && rko->rko_op_cb) {
+ rd_kafka_op_res_t res;
+ rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
+ res = rko->rko_op_cb(rko->rko_rk, NULL, rko);
+ rd_assert(res != RD_KAFKA_OP_RES_YIELD);
+ rd_assert(res != RD_KAFKA_OP_RES_KEEP);
+ }
+
+ switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) {
+ case RD_KAFKA_OP_FETCH:
+ rd_kafka_msg_destroy_1(NULL, &rko->rko_u.fetch.rkm, add2);
+ /* Decrease refcount on rkbuf to eventually rd_free shared buf*/
+ if (rko->rko_u.fetch.rkbuf)
+ rd_kafka_buf_handle_op_1(rko, RD_KAFKA_RESP_ERR__DESTROY, add3);
+
+ break;
+
+ case RD_KAFKA_OP_OFFSET_FETCH:
+ if (rko->rko_u.offset_fetch.partitions &&
+ rko->rko_u.offset_fetch.do_free)
+ rd_kafka_topic_partition_list_destroy(
+ rko->rko_u.offset_fetch.partitions);
+ break;
+
+ case RD_KAFKA_OP_OFFSET_COMMIT:
+ RD_IF_FREE(rko->rko_u.offset_commit.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
+ break;
+
+ case RD_KAFKA_OP_SUBSCRIBE:
+ case RD_KAFKA_OP_GET_SUBSCRIPTION:
+ RD_IF_FREE(rko->rko_u.subscribe.topics,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_ASSIGN:
+ case RD_KAFKA_OP_GET_ASSIGNMENT:
+ RD_IF_FREE(rko->rko_u.assign.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_REBALANCE:
+ RD_IF_FREE(rko->rko_u.rebalance.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_NAME:
+ RD_IF_FREE(rko->rko_u.name.str, rd_free);
+ break;
+
+ case RD_KAFKA_OP_CG_METADATA:
+ RD_IF_FREE(rko->rko_u.cg_metadata,
+ rd_kafka_consumer_group_metadata_destroy);
+ break;
+
+ case RD_KAFKA_OP_ERR:
+ case RD_KAFKA_OP_CONSUMER_ERR:
+ RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
+ rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
+ break;
+
+ break;
+
+ case RD_KAFKA_OP_THROTTLE:
+ RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
+ break;
+
+ case RD_KAFKA_OP_STATS:
+ RD_IF_FREE(rko->rko_u.stats.json, rd_free);
+ break;
+
+ case RD_KAFKA_OP_XMIT_RETRY:
+ case RD_KAFKA_OP_XMIT_BUF:
+ case RD_KAFKA_OP_RECV_BUF:
+ if (rko->rko_u.xbuf.rkbuf)
+ rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
+
+ RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
+ break;
+
+ case RD_KAFKA_OP_DR:
+ rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
+ if (rko->rko_u.dr.do_purge2)
+ rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);
+
+ if (rko->rko_u.dr.rkt)
+ rd_kafka_topic_destroy0(rko->rko_u.dr.rkt);
+ if (rko->rko_u.dr.presult)
+ rd_kafka_Produce_result_destroy(rko->rko_u.dr.presult);
+ break;
+
+ case RD_KAFKA_OP_OFFSET_RESET:
+ RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
+ break;
+
+ case RD_KAFKA_OP_METADATA:
+ RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
+ /* It's not needed to free metadata.mdi because they
+ are the in the same memory allocation. */
+ break;
+
+ case RD_KAFKA_OP_LOG:
+ rd_free(rko->rko_u.log.str);
+ break;
+
+ case RD_KAFKA_OP_ADMIN_FANOUT:
+ rd_assert(rko->rko_u.admin_request.fanout.outstanding == 0);
+ rd_list_destroy(&rko->rko_u.admin_request.fanout.results);
+ case RD_KAFKA_OP_CREATETOPICS:
+ case RD_KAFKA_OP_DELETETOPICS:
+ case RD_KAFKA_OP_CREATEPARTITIONS:
+ case RD_KAFKA_OP_ALTERCONFIGS:
+ case RD_KAFKA_OP_INCREMENTALALTERCONFIGS:
+ case RD_KAFKA_OP_DESCRIBECONFIGS:
+ case RD_KAFKA_OP_DELETERECORDS:
+ case RD_KAFKA_OP_LISTCONSUMERGROUPS:
+ case RD_KAFKA_OP_DESCRIBECONSUMERGROUPS:
+ case RD_KAFKA_OP_DELETEGROUPS:
+ case RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS:
+ case RD_KAFKA_OP_CREATEACLS:
+ case RD_KAFKA_OP_DESCRIBEACLS:
+ case RD_KAFKA_OP_DELETEACLS:
+ case RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS:
+ case RD_KAFKA_OP_DESCRIBETOPICS:
+ case RD_KAFKA_OP_DESCRIBECLUSTER:
+ case RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS:
+ case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS:
+ case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS:
+ case RD_KAFKA_OP_LISTOFFSETS:
+ case RD_KAFKA_OP_ELECTLEADERS:
+ rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq);
+ rd_list_destroy(&rko->rko_u.admin_request.args);
+ if (rko->rko_u.admin_request.options.match_consumer_group_states
+ .u.PTR) {
+ rd_list_destroy(rko->rko_u.admin_request.options
+ .match_consumer_group_states.u.PTR);
+ }
+ if (rko->rko_u.admin_request.options.match_consumer_group_types
+ .u.PTR) {
+ rd_list_destroy(rko->rko_u.admin_request.options
+ .match_consumer_group_types.u.PTR);
+ }
+ rd_assert(!rko->rko_u.admin_request.fanout_parent);
+ RD_IF_FREE(rko->rko_u.admin_request.coordkey, rd_free);
+ break;
+
+ case RD_KAFKA_OP_ADMIN_RESULT:
+ rd_list_destroy(&rko->rko_u.admin_result.args);
+ rd_list_destroy(&rko->rko_u.admin_result.results);
+ RD_IF_FREE(rko->rko_u.admin_result.errstr, rd_free);
+ rd_assert(!rko->rko_u.admin_result.fanout_parent);
+ ;
+ break;
+
+ case RD_KAFKA_OP_MOCK:
+ RD_IF_FREE(rko->rko_u.mock.name, rd_free);
+ RD_IF_FREE(rko->rko_u.mock.str, rd_free);
+ if (rko->rko_u.mock.metrics) {
+ int64_t i;
+ for (i = 0; i < rko->rko_u.mock.hi; i++)
+ rd_free(rko->rko_u.mock.metrics[i]);
+ rd_free(rko->rko_u.mock.metrics);
+ }
+ break;
+
+ case RD_KAFKA_OP_BROKER_MONITOR:
+ rd_kafka_broker_destroy(rko->rko_u.broker_monitor.rkb);
+ break;
+
+ case RD_KAFKA_OP_TXN:
+ RD_IF_FREE(rko->rko_u.txn.group_id, rd_free);
+ RD_IF_FREE(rko->rko_u.txn.offsets,
+ rd_kafka_topic_partition_list_destroy);
+ RD_IF_FREE(rko->rko_u.txn.cgmetadata,
+ rd_kafka_consumer_group_metadata_destroy);
+ break;
+
+ case RD_KAFKA_OP_LEADERS:
+ rd_assert(!rko->rko_u.leaders.eonce);
+ rd_assert(!rko->rko_u.leaders.replyq.q);
+ RD_IF_FREE(rko->rko_u.leaders.leaders, rd_list_destroy);
+ RD_IF_FREE(rko->rko_u.leaders.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_METADATA_UPDATE:
+ RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
+ /* It's not needed to free metadata.mdi because they
+ are the in the same memory allocation. */
+ break;
+
+ case RD_KAFKA_OP_SET_TELEMETRY_BROKER:
+ RD_IF_FREE(rko->rko_u.telemetry_broker.rkb,
+ rd_kafka_broker_destroy);
+ break;
+
+ default:
+ break;
+ }
+
+ if(rko->rko_rktp != NULL){
+ rd_kafka_toppar_t *rkoTp;
+ rkoTp = rko->rko_rktp;
+ // may not need check
+ if(unlikely(rkoTp->rktp_refcnt.val <= 0)){
+ *add1 = 0;
+ }else{
+ *add1 = (uint64_t)rkoTp;
+ }
+ }
+
+ RD_IF_FREE(rko->rko_error, rd_kafka_error_destroy);
+
+ rd_kafka_replyq_destroy(&rko->rko_replyq);
+
+#if ENABLE_DEVEL
+ if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
+ rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
+#endif
+ rd_free(rko);
+
+}
void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -709,6 +709,7 @@
const char *rd_kafka_op2str(rd_kafka_op_type_t type);
void rd_kafka_op_destroy(rd_kafka_op_t *rko);
+void rd_kafka_op_destroy_1(rd_kafka_op_t *rko, uint64_t *add1, uint64_t *add2, uint64_t *add3);
rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type);
#if ENABLE_DEVEL
#define _STRINGIFYX(A) #A
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -1770,7 +1770,14 @@
rd_list_destroy(&query_topics);
}
-
+void rd_kafka_topic_destroy_sub_ref_batch(uint64_t add, uint32_t refNum) {
+ rd_kafka_topic_t *rkt =(rd_kafka_toppar_t *)add;
+ if(unlikely(rkt == NULL)) {
+ return;
+ }
+ if (unlikely(rd_refcnt_sub_n(&rkt->rkt_refcnt, refNum) == 0))
+ rd_kafka_topic_destroy_final(rkt);
+}
/**
* Locks: rd_kafka_topic_*lock() must be held.
*/
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -209,8 +209,14 @@
rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt);
-
-
+static RD_INLINE RD_UNUSED void rd_kafka_topic_destroy1(rd_kafka_topic_t *rkt, uint64_t *add) {
+ rd_kafka_lwtopic_t *lrkt;
+ if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL))
+ rd_kafka_lwtopic_destroy(lrkt);
+ else{
+ *add = (uint64_t)rkt;
+ }
+}
/**
* @brief Loose reference to topic object as increased by ..topic_keep().
*/
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
@@ -686,9 +686,13 @@
rd_kafka_app_poll_start(rk, 0, timeout_ms);
rd_kafka_yield_thread = 0;
+ rd_kafka_op_t *batch[64];
+ int batch_cnt = 0;// 批量大小可调
while (cnt < rkmessages_size) {
rd_kafka_op_res_t res;
+ batch_cnt = 0;
+
mtx_lock(&rkq->rkq_lock);
while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
@@ -704,50 +708,57 @@
break; /* Timed out */
}
- rd_kafka_q_deq0(rkq, rko);
+ while (batch_cnt < 64 && cnt + batch_cnt < rkmessages_size && (rko = TAILQ_FIRST(&rkq->rkq_q))) {
+ rd_kafka_q_deq0(rkq, rko);
+ batch[batch_cnt++] = rko;
+ }
mtx_unlock(&rkq->rkq_lock);
- if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
- cnt = (unsigned int)rd_kafka_purge_outdated_messages(
- rko->rko_rktp, rko->rko_version, rkmessages, cnt,
- &ctrl_msg_q);
- rd_kafka_op_destroy(rko);
- continue;
- }
+ for (i = 0; i < batch_cnt; i++) {
+ rko = batch[i];
+ if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
+ cnt = (unsigned int)rd_kafka_purge_outdated_messages(
+ rko->rko_rktp, rko->rko_version, rkmessages, cnt,
+ &ctrl_msg_q);
+ rd_kafka_op_destroy(rko);
+ continue;
+ }
- if (rd_kafka_op_version_outdated(rko, 0)) {
- /* Outdated op, put on discard queue */
- TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
- continue;
- }
+ if (rd_kafka_op_version_outdated(rko, 0)) {
+ /* Outdated op, put on discard queue */
+ TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
+ continue;
+ }
- /* Serve non-FETCH callbacks */
- res =
- rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
- if (res == RD_KAFKA_OP_RES_KEEP ||
- res == RD_KAFKA_OP_RES_HANDLED) {
- /* Callback served, rko is destroyed (if HANDLED). */
- continue;
- } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
- rd_kafka_yield_thread)) {
- /* Yield. */
- break;
- }
- rd_dassert(res == RD_KAFKA_OP_RES_PASS);
+ /* Serve non-FETCH callbacks */
+ res =
+ rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
+ if (res == RD_KAFKA_OP_RES_KEEP ||
+ res == RD_KAFKA_OP_RES_HANDLED) {
+ /* Callback served, rko is destroyed (if HANDLED). */
+ continue;
+ } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+ rd_kafka_yield_thread)) {
+ /* Yield. */
+ break;
+ }
+ rd_dassert(res == RD_KAFKA_OP_RES_PASS);
- /* If this is a control messages, don't return message to
- * application. Add it to a tmp queue from where we can store
- * the offset and destroy the op */
- if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
- TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
- continue;
- }
+ /* If this is a control messages, don't return message to
+ * application. Add it to a tmp queue from where we can store
+ * the offset and destroy the op */
+ if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
+ TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
+ continue;
+ }
- /* Get rkmessage from rko and append to array. */
- rkmessages[cnt++] = rd_kafka_message_get(rko);
- }
+ /* Get rkmessage from rko and append to array. */
+ rkmessages[cnt++] = rd_kafka_message_get(rko);
+ }
+ }
+
for (i = cnt - 1; i >= 0; i--) {
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;