Subject: [PATCH] modify source lock granularity from row to batch(64)
delete nouse test
message destruction enable batch sub ref count
reduce vector copy
optimize batch api by avoiding frequent resize
static thread batch kafka and partition range
fix mem bug
add batch produce in cpp remove debug print in kafka msg add test for interface
add consumebatch api
---
Index: src-cpp/KafkaConsumerImpl.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src-cpp/KafkaConsumerImpl.cpp b/src-cpp/KafkaConsumerImpl.cpp
--- a/src-cpp/KafkaConsumerImpl.cpp	(revision cb8c19c43011b66c4b08b25e5150455a247e1ff3)
+++ b/src-cpp/KafkaConsumerImpl.cpp	(revision 320e48c32dd77c8c64fc0c4d1e05818baf3782c2)
@@ -28,6 +28,10 @@
 
 #include <string>
 #include <vector>
+#include <chrono>
+#include <climits>
+#include <memory>
+#include <algorithm>
 
 #include "rdkafkacpp_int.h"
 
@@ -73,6 +77,7 @@
   }
 
   rkc->rk_ = rk;
+  rkc->rk_queue_ = rd_kafka_queue_get_consumer(rk);
 
   /* Redirect handle queue to cgrp's queue to provide a single queue point */
   rd_kafka_poll_set_consumer(rk);
@@ -119,6 +124,37 @@
 }
 
 
+std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> RdKafka::KafkaConsumerImpl::consumeBatch(int timeout_ms, size_t maxPollRecords) {
+        std::vector<rd_kafka_message_t*> msgPtrArray(maxPollRecords);
+        auto msgReceived = rd_kafka_consume_batch_queue(rk_queue_, timeout_ms, msgPtrArray.data(), maxPollRecords);
+        std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> result;
+        if (msgReceived < 0) {
+            return result;
+        }
+
+        for (int i = 0; i < msgReceived; ++i) {
+                rd_kafka_message_t *cur = msgPtrArray[i];
+             if (cur) {
+                     RdKafka::Message *msg = new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, cur);
+                     const std::string &topic = msg->topic_name();
+                     int partition = msg->partition();
+
+                     auto key_it = std::find_if(result.begin(), result.end(), [&](const auto& pair) {
+                             return pair.first->topic() == topic && pair.first->partition() == partition;
+                     });
+                     if (key_it != result.end()) {
+                             key_it->second.push_back(msg);
+                     } else {
+                             auto key = RdKafka::TopicPartition::create(topic, partition);
+                             auto& msgVec = result[key];
+                             msgVec.reserve(maxPollRecords);
+                             msgVec.push_back(msg);
+                     }
+             }
+        }
+        return result;
+};
+
 
 RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::assignment(
     std::vector<RdKafka::TopicPartition *> &partitions) {
Index: src-cpp/rdkafkacpp.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h
--- a/src-cpp/rdkafkacpp.h	(revision cb8c19c43011b66c4b08b25e5150455a247e1ff3)
+++ b/src-cpp/rdkafkacpp.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -55,6 +55,7 @@
 #include <cstring>
 #include <stdint.h>
 #include <sys/types.h>
+#include <unordered_map>
 
 #ifdef _WIN32
 #ifndef ssize_t
@@ -2080,6 +2081,8 @@
    * @returns \c rd_kafka_topic_t*
    */
   virtual struct rd_kafka_topic_s *c_ptr() = 0;
+
+  virtual int32_t get_partition_num() = 0;
 };
 
 
@@ -2358,7 +2361,12 @@
   virtual size_t size() const = 0;
 };
 
-
+class RD_EXPORT RdKafkaCInterface {
+      public:
+        static void DeleteTopParRefCnt(uint64_t add, uint32_t refNum);
+        static void DeleteTopicRecCnt(uint64_t add, uint32_t refNum);
+        static void DeleteBufRefCnt(uint64_t add, uint32_t refNum);
+};
 /**
  * @brief Message object
  *
@@ -2412,6 +2420,8 @@
   /** @returns Topic name (if applicable, else empty string) */
   virtual std::string topic_name() const = 0;
 
+  virtual void DeleteMessage(uint64_t *add1, uint64_t *add2, uint64_t *add3) = 0;
+
   /** @returns Partition (if applicable) */
   virtual int32_t partition() const = 0;
 
@@ -2737,6 +2747,8 @@
    */
   virtual Message *consume(int timeout_ms) = 0;
 
+  virtual std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> consumeBatch(int timeout_ms, size_t maxPollRecords) = 0;
+
   /**
    * @brief Commit offsets for the current assignment.
    *
@@ -3351,6 +3363,16 @@
                             size_t len,
                             const std::string *key,
                             void *msg_opaque) = 0;
+
+
+  virtual ErrorCode produce(Topic *topic,
+                             int32_t partition,
+                             int msgflags,
+                             const std::vector<char*> &data,
+                             const std::vector<size_t> &lens,
+                             const void *key,
+                             size_t key_len,
+                             void *msg_opaque)  = 0;
 
   /**
    * @brief Variant produce() that passes the key as a pointer and length
Index: src-cpp/rdkafkacpp_int.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h
--- a/src-cpp/rdkafkacpp_int.h	(revision cb8c19c43011b66c4b08b25e5150455a247e1ff3)
+++ b/src-cpp/rdkafkacpp_int.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -378,14 +378,21 @@
 class MessageImpl : public Message {
  public:
   ~MessageImpl() {
-    if (free_rkmessage_)
-      rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
+  //rkmessage_需要调用DeleteMessage手动释放,注意引用计数通过参数返回后需要批量处理
+//    if (free_rkmessage_)
+//      rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
     if (key_)
       delete key_;
     if (headers_)
       delete headers_;
   }
 
+  void DeleteMessage(uint64_t *add1, uint64_t *add2, uint64_t *add3) {
+          if (free_rkmessage_)
+                  rd_kafka_message_destroy_1(const_cast<rd_kafka_message_t *>(rkmessage_), add1, add2, add3);
+  }
+
+
   MessageImpl(rd_kafka_type_t rk_type,
               RdKafka::Topic *topic,
               rd_kafka_message_t *rkmessage) :
@@ -1180,6 +1187,7 @@
   }
 
   rd_kafka_t *rk_;
+  rd_kafka_queue_t* rk_queue_;
   /* All Producer and Consumer callbacks must reside in HandleImpl and
    * the opaque provided to rdkafka must be a pointer to HandleImpl, since
    * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
@@ -1217,6 +1225,10 @@
         rd_kafka_offset_store(rkt_, partition, offset));
   }
 
+  int32_t get_partition_num () {
+    return rd_kafka_get_partition_num(rkt_);
+  }
+
   static Topic *create(Handle &base, const std::string &topic, Conf *conf);
 
   struct rd_kafka_topic_s *c_ptr() {
@@ -1339,8 +1351,10 @@
                           virtual public HandleImpl {
  public:
   ~KafkaConsumerImpl() {
-    if (rk_)
-      rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+    if (rk_) {
+            rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+            rd_kafka_queue_destroy(rk_queue_);
+    }
   }
 
   static KafkaConsumer *create(Conf *conf, std::string &errstr);
@@ -1360,6 +1374,7 @@
   Error *incremental_unassign(const std::vector<TopicPartition *> &partitions);
 
   Message *consume(int timeout_ms);
+  std::unordered_map<RdKafka::TopicPartition*, std::vector<RdKafka::Message*>> consumeBatch(int timeout_ms, size_t maxPollRecords);
   ErrorCode commitSync() {
     return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0 /*sync*/));
   }
@@ -1507,10 +1522,16 @@
 
 
 class ProducerImpl : virtual public Producer, virtual public HandleImpl {
+private:
+  std::vector<rd_kafka_message_t> msgs;
  public:
-  ~ProducerImpl() {
+   explicit ProducerImpl(int32_t limit = 100000) {
+    msgs.reserve(limit);
+   }
+   ~ProducerImpl() {
     if (rk_)
       rd_kafka_destroy(rk_);
+      msgs.clear();
   }
 
   ErrorCode produce(Topic *topic,
@@ -1529,6 +1550,15 @@
                     const void *key,
                     size_t key_len,
                     void *msg_opaque);
+
+  ErrorCode produce(Topic *topic,
+                    int32_t partition,
+                    int msgflags,
+                    const std::vector<char*> &data,
+                    const std::vector<size_t> &lens,
+                    const void *key,
+                    size_t key_len,
+                    void *msg_opaque) ;
 
   ErrorCode produce(Topic *topic,
                     int32_t partition,
Index: src/rdkafka_conf.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
--- a/src/rdkafka_conf.h	(revision cb8c19c43011b66c4b08b25e5150455a247e1ff3)
+++ b/src/rdkafka_conf.h	(revision e7d7e25803bf2045d4d40ec02883ffdf1eadf726)
@@ -197,6 +197,7 @@
         /*
          * Generic configuration
          */
+        int max_poll_records;
         int enabled_events;
         int max_msg_size;
         int msg_copy_max_size;
Index: src-cpp/ProducerImpl.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp
--- a/src-cpp/ProducerImpl.cpp	(revision e7d7e25803bf2045d4d40ec02883ffdf1eadf726)
+++ b/src-cpp/ProducerImpl.cpp	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
@@ -127,6 +127,48 @@
 }
 
 
+RdKafka::ErrorCode RdKafka::ProducerImpl::produce(Topic *topic,
+                                            int32_t partition,
+                                            int msgflags,
+                                            const std::vector<char*> &data,
+                                            const std::vector<size_t> &lens,
+                                            const void *key,
+                                                  size_t keylen,
+                                            void *msg_opaque){
+  RdKafka::TopicImpl *topicimpl = reinterpret_cast<RdKafka::TopicImpl *>(topic);
+  int32_t cycles =  data.size();
+  msgs.resize(cycles);
+
+  std::vector<int> msgidps;
+  msgidps.resize(cycles);
+  for (int i = 0; i < cycles; i++) {
+          int* msgidp     = &msgidps[i];
+
+          msgs[i].payload   = data[i];
+          msgs[i].len       = lens[i];
+          msgs[i]._private  = msgidp;
+          msgs[i].partition = partition; /* Will be ignored since
+                                                * RD_KAFKA_MSG_F_PARTITION
+                                                * is not supplied. */
+  }
+
+  auto buffSize = rd_kafka_produce_batch(topicimpl->rkt_, partition,
+                                       msgflags | RD_KAFKA_MSG_F_PARTITION, msgs.data(), cycles);
+  if (buffSize != cycles) {
+        for (int i=0;i<cycles;++i) {
+      if (msgs[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+              std::cout<<"error : buffer size "<<buffSize <<
+                  ", but need buffer size is "<< cycles<<
+                  ",reason is "<<
+                  rd_kafka_err2str(msgs[i].err)<< std::endl;
+              return static_cast<RdKafka::ErrorCode>(msgs[i].err);
+      }
+    }
+
+  }
+  return RdKafka::ERR_NO_ERROR;
+}
+
 RdKafka::ErrorCode RdKafka::ProducerImpl::produce(
     RdKafka::Topic *topic,
     int32_t partition,
Index: src/rdkafka_msg.c
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c
--- a/src/rdkafka_msg.c	(revision e7d7e25803bf2045d4d40ec02883ffdf1eadf726)
+++ b/src/rdkafka_msg.c	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -113,7 +113,22 @@
 
         return RD_KAFKA_RESP_ERR__STATE;
 }
-
+void rd_kafka_msg_destroy_1(rd_kafka_t *rk, rd_kafka_msg_t *rkm, uint64_t *add) {
+        // FIXME
+        if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
+                rd_dassert(rk || rkm->rkm_rkmessage.rkt);
+                rd_kafka_curr_msgs_sub(rk ? rk : rkm->rkm_rkmessage.rkt->rkt_rk,
+                                       1, rkm->rkm_len);
+        }
+        if (rkm->rkm_headers)
+                rd_kafka_headers_destroy(rkm->rkm_headers);
+        if (likely(rkm->rkm_rkmessage.rkt != NULL))
+                rd_kafka_topic_destroy1(rkm->rkm_rkmessage.rkt, add);
+        if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
+                rd_free(rkm->rkm_payload);
+        if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
+                rd_free(rkm);
+}
 
 void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
         // FIXME
@@ -337,7 +352,7 @@
 
 
         /* Partition the message */
-        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
+        err = rd_kafka_msg_partitioner(rkt, rkm, 0);
         if (likely(!err)) {
                 rd_kafka_set_last_error(0, 0);
                 return 0;
@@ -1268,7 +1283,18 @@
         return 0;
 }
 
-
+/**
+ * @name refcnt will handle batch later
+ */
+void rd_kafka_message_destroy_1(rd_kafka_message_t *rkmessage, uint64_t *add1, uint64_t *add2, uint64_t *add3) {
+        rd_kafka_op_t *rko;
+        if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL))
+                rd_kafka_op_destroy_1(rko, add1, add2, add3);
+        else {
+                rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage);
+                rd_kafka_msg_destroy(NULL, rkm);
+        }
+}
 
 /**
  * @name Public message type (rd_kafka_message_t)
@@ -1284,6 +1310,17 @@
         }
 }
 
+void rd_kafka_toppar_destroy_sub_ref_batch(uint64_t add, uint32_t refNum){
+        rd_kafka_toppar_t *rktp =(rd_kafka_toppar_t *)add;
+        if(unlikely(rktp == NULL)) {
+              return;
+        }
+
+        if (unlikely(rd_refcnt_sub_n(&rktp->rktp_refcnt, refNum) == 0)){
+                rd_kafka_toppar_destroy_final(rktp);
+        }
+}
+
 
 rd_kafka_message_t *rd_kafka_message_new(void) {
         rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
@@ -1818,6 +1855,10 @@
             (msg_cnt > 0 && now >= rkmq->rkmq_wakeup.abstime)) {
                 /* Prevent further signalling */
                 rkmq->rkmq_wakeup.signalled = rd_true;
+//                printf("msg_cnt(%d)>= batch_msg_cnt(%d) || msg_bytes(%lld) >= batch_msg_bytes(%lld) "
+//                    "|| (msg_cnt(%d) > 0 && now(%lld) >= rkmq->rkmq_wakeup.abstime(%lld)): %d\n",
+//                    msg_cnt ,batch_msg_cnt, msg_bytes ,batch_msg_bytes,msg_cnt , now ,
+//                    rkmq->rkmq_wakeup.abstime, now >= rkmq->rkmq_wakeup.abstime?1:0);
 
                 /* Batch is ready */
                 return rd_true;
Index: src/rdkafka.c
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka.c b/src/rdkafka.c
--- a/src/rdkafka.c	(revision 9b0e18072108ece81031034b788ca3241ac3a4c9)
+++ b/src/rdkafka.c	(revision 57908603f9f04067f39feb6dace944444905d7b5)
@@ -3777,6 +3777,9 @@
         return RD_KAFKA_RESP_ERR_NO_ERROR;
 }
 
+int32_t rd_kafka_get_partition_num(rd_kafka_topic_t *rk) {
+        return rk->rkt_partition_cnt;
+}
 
 /**
  * @brief get_offsets_for_times() state
Index: src/rdkafka.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka.h b/src/rdkafka.h
--- a/src/rdkafka.h	(revision 9b0e18072108ece81031034b788ca3241ac3a4c9)
+++ b/src/rdkafka.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -1497,7 +1497,17 @@
 RD_EXPORT
 void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
 
+RD_EXPORT
+void rd_kafka_message_destroy_1(rd_kafka_message_t *rkmessage, uint64_t *add1, uint64_t *add2, uint64_t *add3);
+
+RD_EXPORT
+void rd_kafka_toppar_destroy_sub_ref_batch(uint64_t add, uint32_t refNum);
 
+RD_EXPORT
+void rd_kafka_buf_destroy_sub_ref_batch(uint64_t add, uint32_t refNum);
+
+RD_EXPORT
+void rd_kafka_topic_destroy_sub_ref_batch(uint64_t add, uint32_t refNum);
 
 /**
  * @brief Returns the error string for an errored rd_kafka_message_t or NULL if
@@ -3340,6 +3350,9 @@
                                                              int64_t *low,
                                                              int64_t *high);
 
+RD_EXPORT int32_t rd_kafka_get_partition_num(rd_kafka_topic_t *rk);
+
+
 
 
 /**
Index: src/rdkafka_int.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h
--- a/src/rdkafka_int.h	(revision 9b0e18072108ece81031034b788ca3241ac3a4c9)
+++ b/src/rdkafka_int.h	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
@@ -778,6 +778,17 @@
         if (rk->rk_type != RD_KAFKA_PRODUCER)
                 return RD_KAFKA_RESP_ERR_NO_ERROR;
 
+        /*static __thread int32_t count = 0;
+        ++count;
+        if (count == 5000) {
+                printf("Debug: rk->rk_curr_msgs.max_cnt = %d, k->rk_curr_msgs.cnt = %d,  "
+                    "rk->rk_curr_msgs.size = %zu"
+                    "rk->rk_curr_msgs.max_size = %zu, cnt = %d, size = %u \n",
+                    rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.cnt,
+                    rk->rk_curr_msgs.size, rk->rk_curr_msgs.max_size,
+                    cnt, size);
+                count = 0;
+        }*/
         mtx_lock(&rk->rk_curr_msgs.lock);
         while (
             unlikely((rk->rk_curr_msgs.max_cnt > 0 &&
Index: src-cpp/TopicPartitionImpl.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src-cpp/TopicPartitionImpl.cpp b/src-cpp/TopicPartitionImpl.cpp
--- a/src-cpp/TopicPartitionImpl.cpp	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src-cpp/TopicPartitionImpl.cpp	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -48,6 +48,18 @@
   return new TopicPartitionImpl(topic, partition, offset);
 }
 
+void RdKafka::RdKafkaCInterface::DeleteTopParRefCnt(uint64_t add, uint32_t refNum){
+        rd_kafka_toppar_destroy_sub_ref_batch(add, refNum);
+}
+
+void RdKafka::RdKafkaCInterface::DeleteTopicRecCnt(uint64_t add, uint32_t refNum){
+        rd_kafka_topic_destroy_sub_ref_batch(add, refNum);
+}
+
+void RdKafka::RdKafkaCInterface::DeleteBufRefCnt(uint64_t add, uint32_t refNum){
+        rd_kafka_buf_destroy_sub_ref_batch(add, refNum);
+}
+
 void RdKafka::TopicPartition::destroy(
     std::vector<TopicPartition *> &partitions) {
   for (std::vector<TopicPartition *>::iterator it = partitions.begin();
Index: src/rd.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rd.h b/src/rd.h
--- a/src/rd.h	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rd.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -339,6 +339,20 @@
 #define rd_refcnt_add0(R) rd_atomic32_add(R, 1)
 #endif
 
+static RD_INLINE RD_UNUSED int rd_refcnt_sub_n(rd_refcnt_t *R, uint32_t n) {
+        int r;
+#ifdef RD_REFCNT_USE_LOCKS
+        mtx_lock(&R->lock);
+        r = (R->v)-n;
+        mtx_unlock(&R->lock);
+#else
+        r = rd_atomic32_sub(R, n);
+#endif
+        if (r < 0)
+                rd_assert(!*"refcnt sub-zero");
+        return r;
+}
+
 static RD_INLINE RD_UNUSED int rd_refcnt_sub0(rd_refcnt_t *R) {
         int r;
 #ifdef RD_REFCNT_USE_LOCKS
Index: src/rdkafka_buf.c
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_buf.c b/src/rdkafka_buf.c
--- a/src/rdkafka_buf.c	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_buf.c	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -411,7 +411,53 @@
         return 1;
 }
 
+void rd_kafka_buf_destroy_sub_ref_batch(uint64_t add, uint32_t refNum){
+        rd_kafka_buf_t *request =(rd_kafka_buf_t *)add;
+        if(unlikely(request == NULL)) {
+                return;
+        }
+        if (rd_refcnt_sub_n(&(request)->rkbuf_refcnt, refNum) > 0){
+             return;
+        }
+        rd_kafka_buf_destroy_final(request);
+}
+
+void rd_kafka_buf_handle_op_1(rd_kafka_op_t *rko, rd_kafka_resp_err_t err, uint64_t *add) {
+        rd_kafka_buf_t *request, *response;
+        rd_kafka_t *rk;
+
+        request               = rko->rko_u.xbuf.rkbuf;
+        rko->rko_u.xbuf.rkbuf = NULL;
+
+        /* NULL on op_destroy() */
+        if (request->rkbuf_replyq.q) {
+                int32_t version = request->rkbuf_replyq.version;
+                /* Current queue usage is done, but retain original replyq for
+                 * future retries, stealing
+                 * the current reference. */
+                request->rkbuf_orig_replyq = request->rkbuf_replyq;
+                rd_kafka_replyq_clear(&request->rkbuf_replyq);
+                /* Callback might need to version check so we retain the
+                 * version across the clear() call which clears it. */
+                request->rkbuf_replyq.version = version;
+        }
+
+        if (!request->rkbuf_cb) {
+                *add = (uint64_t) request;
+                return;
+        }
 
+        /* Let buf_callback() do destroy()s */
+        response                = request->rkbuf_response; /* May be NULL */
+        request->rkbuf_response = NULL;
+
+        if (!(rk = rko->rko_rk)) {
+                rd_assert(request->rkbuf_rkb != NULL);
+                rk = request->rkbuf_rkb->rkb_rk;
+        }
+
+        rd_kafka_buf_callback(rk, request->rkbuf_rkb, err, response, request);
+}
 /**
  * @brief Handle RD_KAFKA_OP_RECV_BUF.
  */
Index: src/rdkafka_buf.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h
--- a/src/rdkafka_buf.h	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_buf.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -1027,7 +1027,7 @@
                         rd_kafka_bufq_t *rkbq);
 
 int rd_kafka_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
-
+void rd_kafka_buf_handle_op_1(rd_kafka_op_t *rko, rd_kafka_resp_err_t err, uint64_t *add);
 void rd_kafka_buf_handle_op(rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
 void rd_kafka_buf_callback(rd_kafka_t *rk,
                            rd_kafka_broker_t *rkb,
Index: src/rdkafka_msg.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_msg.h b/src/rdkafka_msg.h
--- a/src/rdkafka_msg.h	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_msg.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -255,6 +255,7 @@
 
 
 void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm);
+void rd_kafka_msg_destroy_1(rd_kafka_t *rk, rd_kafka_msg_t *rkm, uint64_t *add);
 
 int rd_kafka_msg_new(rd_kafka_topic_t *rkt,
                      int32_t force_partition,
Index: src/rdkafka_op.c
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c
--- a/src/rdkafka_op.c	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_op.c	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -304,6 +304,234 @@
         return rko;
 }
 
+void rd_kafka_op_destroy_1(rd_kafka_op_t *rko, uint64_t *add1, uint64_t *add2, uint64_t *add3) {
+        *add2 = 0;
+        *add3 = 0;
+        /* Call ops callback with ERR__DESTROY to let it
+         * clean up its resources. */
+        if ((rko->rko_type & RD_KAFKA_OP_CB) && rko->rko_op_cb) {
+                rd_kafka_op_res_t res;
+                rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
+                res          = rko->rko_op_cb(rko->rko_rk, NULL, rko);
+                rd_assert(res != RD_KAFKA_OP_RES_YIELD);
+                rd_assert(res != RD_KAFKA_OP_RES_KEEP);
+        }
+
+        switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) {
+        case RD_KAFKA_OP_FETCH:
+                rd_kafka_msg_destroy_1(NULL, &rko->rko_u.fetch.rkm, add2);
+                /* Decrease refcount on rkbuf to eventually rd_free shared buf*/
+                if (rko->rko_u.fetch.rkbuf)
+                        rd_kafka_buf_handle_op_1(rko, RD_KAFKA_RESP_ERR__DESTROY, add3);
+
+                break;
+
+        case RD_KAFKA_OP_OFFSET_FETCH:
+                if (rko->rko_u.offset_fetch.partitions &&
+                    rko->rko_u.offset_fetch.do_free)
+                        rd_kafka_topic_partition_list_destroy(
+                            rko->rko_u.offset_fetch.partitions);
+                break;
+
+        case RD_KAFKA_OP_OFFSET_COMMIT:
+                RD_IF_FREE(rko->rko_u.offset_commit.partitions,
+                           rd_kafka_topic_partition_list_destroy);
+                RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
+                break;
+
+        case RD_KAFKA_OP_SUBSCRIBE:
+        case RD_KAFKA_OP_GET_SUBSCRIPTION:
+                RD_IF_FREE(rko->rko_u.subscribe.topics,
+                           rd_kafka_topic_partition_list_destroy);
+                break;
+
+        case RD_KAFKA_OP_ASSIGN:
+        case RD_KAFKA_OP_GET_ASSIGNMENT:
+                RD_IF_FREE(rko->rko_u.assign.partitions,
+                           rd_kafka_topic_partition_list_destroy);
+                break;
+
+        case RD_KAFKA_OP_REBALANCE:
+                RD_IF_FREE(rko->rko_u.rebalance.partitions,
+                           rd_kafka_topic_partition_list_destroy);
+                break;
+
+        case RD_KAFKA_OP_NAME:
+                RD_IF_FREE(rko->rko_u.name.str, rd_free);
+                break;
+
+        case RD_KAFKA_OP_CG_METADATA:
+                RD_IF_FREE(rko->rko_u.cg_metadata,
+                           rd_kafka_consumer_group_metadata_destroy);
+                break;
+
+        case RD_KAFKA_OP_ERR:
+        case RD_KAFKA_OP_CONSUMER_ERR:
+                RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
+                rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
+                break;
+
+                break;
+
+        case RD_KAFKA_OP_THROTTLE:
+                RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
+                break;
+
+        case RD_KAFKA_OP_STATS:
+                RD_IF_FREE(rko->rko_u.stats.json, rd_free);
+                break;
+
+        case RD_KAFKA_OP_XMIT_RETRY:
+        case RD_KAFKA_OP_XMIT_BUF:
+        case RD_KAFKA_OP_RECV_BUF:
+                if (rko->rko_u.xbuf.rkbuf)
+                        rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
+
+                RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
+                break;
+
+        case RD_KAFKA_OP_DR:
+                rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
+                if (rko->rko_u.dr.do_purge2)
+                        rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);
+
+                if (rko->rko_u.dr.rkt)
+                        rd_kafka_topic_destroy0(rko->rko_u.dr.rkt);
+                if (rko->rko_u.dr.presult)
+                        rd_kafka_Produce_result_destroy(rko->rko_u.dr.presult);
+                break;
+
+        case RD_KAFKA_OP_OFFSET_RESET:
+                RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
+                break;
+
+        case RD_KAFKA_OP_METADATA:
+                RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
+                /* It's not needed to free metadata.mdi because they
+                   are the in the same memory allocation. */
+                break;
+
+        case RD_KAFKA_OP_LOG:
+                rd_free(rko->rko_u.log.str);
+                break;
+
+        case RD_KAFKA_OP_ADMIN_FANOUT:
+                rd_assert(rko->rko_u.admin_request.fanout.outstanding == 0);
+                rd_list_destroy(&rko->rko_u.admin_request.fanout.results);
+        case RD_KAFKA_OP_CREATETOPICS:
+        case RD_KAFKA_OP_DELETETOPICS:
+        case RD_KAFKA_OP_CREATEPARTITIONS:
+        case RD_KAFKA_OP_ALTERCONFIGS:
+        case RD_KAFKA_OP_INCREMENTALALTERCONFIGS:
+        case RD_KAFKA_OP_DESCRIBECONFIGS:
+        case RD_KAFKA_OP_DELETERECORDS:
+        case RD_KAFKA_OP_LISTCONSUMERGROUPS:
+        case RD_KAFKA_OP_DESCRIBECONSUMERGROUPS:
+        case RD_KAFKA_OP_DELETEGROUPS:
+        case RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS:
+        case RD_KAFKA_OP_CREATEACLS:
+        case RD_KAFKA_OP_DESCRIBEACLS:
+        case RD_KAFKA_OP_DELETEACLS:
+        case RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS:
+        case RD_KAFKA_OP_DESCRIBETOPICS:
+        case RD_KAFKA_OP_DESCRIBECLUSTER:
+        case RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS:
+        case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS:
+        case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS:
+        case RD_KAFKA_OP_LISTOFFSETS:
+        case RD_KAFKA_OP_ELECTLEADERS:
+                rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq);
+                rd_list_destroy(&rko->rko_u.admin_request.args);
+                if (rko->rko_u.admin_request.options.match_consumer_group_states
+                        .u.PTR) {
+                        rd_list_destroy(rko->rko_u.admin_request.options
+                                            .match_consumer_group_states.u.PTR);
+                }
+                if (rko->rko_u.admin_request.options.match_consumer_group_types
+                        .u.PTR) {
+                        rd_list_destroy(rko->rko_u.admin_request.options
+                                            .match_consumer_group_types.u.PTR);
+                }
+                rd_assert(!rko->rko_u.admin_request.fanout_parent);
+                RD_IF_FREE(rko->rko_u.admin_request.coordkey, rd_free);
+                break;
+
+        case RD_KAFKA_OP_ADMIN_RESULT:
+                rd_list_destroy(&rko->rko_u.admin_result.args);
+                rd_list_destroy(&rko->rko_u.admin_result.results);
+                RD_IF_FREE(rko->rko_u.admin_result.errstr, rd_free);
+                rd_assert(!rko->rko_u.admin_result.fanout_parent);
+                ;
+                break;
+
+        case RD_KAFKA_OP_MOCK:
+                RD_IF_FREE(rko->rko_u.mock.name, rd_free);
+                RD_IF_FREE(rko->rko_u.mock.str, rd_free);
+                if (rko->rko_u.mock.metrics) {
+                        int64_t i;
+                        for (i = 0; i < rko->rko_u.mock.hi; i++)
+                                rd_free(rko->rko_u.mock.metrics[i]);
+                        rd_free(rko->rko_u.mock.metrics);
+                }
+                break;
+
+        case RD_KAFKA_OP_BROKER_MONITOR:
+                rd_kafka_broker_destroy(rko->rko_u.broker_monitor.rkb);
+                break;
+
+        case RD_KAFKA_OP_TXN:
+                RD_IF_FREE(rko->rko_u.txn.group_id, rd_free);
+                RD_IF_FREE(rko->rko_u.txn.offsets,
+                           rd_kafka_topic_partition_list_destroy);
+                RD_IF_FREE(rko->rko_u.txn.cgmetadata,
+                           rd_kafka_consumer_group_metadata_destroy);
+                break;
+
+        case RD_KAFKA_OP_LEADERS:
+                rd_assert(!rko->rko_u.leaders.eonce);
+                rd_assert(!rko->rko_u.leaders.replyq.q);
+                RD_IF_FREE(rko->rko_u.leaders.leaders, rd_list_destroy);
+                RD_IF_FREE(rko->rko_u.leaders.partitions,
+                           rd_kafka_topic_partition_list_destroy);
+                break;
+
+        case RD_KAFKA_OP_METADATA_UPDATE:
+                RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
+                /* It's not needed to free metadata.mdi because they
+                   are the in the same memory allocation. */
+                break;
+
+        case RD_KAFKA_OP_SET_TELEMETRY_BROKER:
+                RD_IF_FREE(rko->rko_u.telemetry_broker.rkb,
+                           rd_kafka_broker_destroy);
+                break;
+
+        default:
+                break;
+        }
+
+        if(rko->rko_rktp != NULL){
+                rd_kafka_toppar_t *rkoTp;
+                rkoTp = rko->rko_rktp;
+                // may not need check
+                if(unlikely(rkoTp->rktp_refcnt.val <= 0)){
+                        *add1 = 0;
+                }else{
+                        *add1 = (uint64_t)rkoTp;
+                }
+        }
+
+        RD_IF_FREE(rko->rko_error, rd_kafka_error_destroy);
+
+        rd_kafka_replyq_destroy(&rko->rko_replyq);
+
+#if ENABLE_DEVEL
+        if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
+                rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
+#endif
+        rd_free(rko);
+
+}
 
 void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
 
Index: src/rdkafka_op.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h
--- a/src/rdkafka_op.h	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_op.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -709,6 +709,7 @@
 
 const char *rd_kafka_op2str(rd_kafka_op_type_t type);
 void rd_kafka_op_destroy(rd_kafka_op_t *rko);
+void rd_kafka_op_destroy_1(rd_kafka_op_t *rko, uint64_t *add1, uint64_t *add2, uint64_t *add3);
 rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type);
 #if ENABLE_DEVEL
 #define _STRINGIFYX(A) #A
Index: src/rdkafka_topic.c
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
--- a/src/rdkafka_topic.c	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_topic.c	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -1770,7 +1770,14 @@
         rd_list_destroy(&query_topics);
 }
 
-
+void rd_kafka_topic_destroy_sub_ref_batch(uint64_t add, uint32_t refNum) {
+        rd_kafka_topic_t *rkt =(rd_kafka_toppar_t *)add;
+        if(unlikely(rkt == NULL)) {
+                return;
+        }
+        if (unlikely(rd_refcnt_sub_n(&rkt->rkt_refcnt, refNum) == 0))
+                rd_kafka_topic_destroy_final(rkt);
+}
 /**
  * Locks: rd_kafka_topic_*lock() must be held.
  */
Index: src/rdkafka_topic.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_topic.h b/src/rdkafka_topic.h
--- a/src/rdkafka_topic.h	(revision 2c281b0c9d3b0457f20381d3fa195c412d43d9fe)
+++ b/src/rdkafka_topic.h	(revision c5166872e7f4406cbe81a23ff1b40e2a8392059a)
@@ -209,8 +209,14 @@
 
 rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt);
 
-
-
+static RD_INLINE RD_UNUSED void rd_kafka_topic_destroy1(rd_kafka_topic_t *rkt, uint64_t *add) {
+        rd_kafka_lwtopic_t *lrkt;
+        if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL))
+                rd_kafka_lwtopic_destroy(lrkt);
+        else{
+                *add = (uint64_t)rkt;
+        }
+}
 /**
  * @brief Loose reference to topic object as increased by ..topic_keep().
  */
Index: src/rdkafka_queue.c
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
--- a/src/rdkafka_queue.c	(revision 826ad98bd5dcd81b93255030ddf58cab6e924665)
+++ b/src/rdkafka_queue.c	(revision 9de073d0523bb078eedb220d4c8e000e49ac5c4b)
@@ -686,9 +686,13 @@
         rd_kafka_app_poll_start(rk, 0, timeout_ms);
 
         rd_kafka_yield_thread = 0;
+        rd_kafka_op_t *batch[64];
+        int batch_cnt = 0;// 批量大小可调
         while (cnt < rkmessages_size) {
                 rd_kafka_op_res_t res;
 
+                batch_cnt = 0;
+
                 mtx_lock(&rkq->rkq_lock);
 
                 while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
@@ -704,50 +708,57 @@
                         break; /* Timed out */
                 }
 
-                rd_kafka_q_deq0(rkq, rko);
+                while (batch_cnt < 64 && cnt + batch_cnt < rkmessages_size && (rko = TAILQ_FIRST(&rkq->rkq_q))) {
+                        rd_kafka_q_deq0(rkq, rko);
+                        batch[batch_cnt++] = rko;
+                }
 
                 mtx_unlock(&rkq->rkq_lock);
 
-                if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
-                        cnt = (unsigned int)rd_kafka_purge_outdated_messages(
-                            rko->rko_rktp, rko->rko_version, rkmessages, cnt,
-                            &ctrl_msg_q);
-                        rd_kafka_op_destroy(rko);
-                        continue;
-                }
+                for (i = 0; i < batch_cnt; i++) {
+                        rko = batch[i];
+                        if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
+                                cnt = (unsigned int)rd_kafka_purge_outdated_messages(
+                                    rko->rko_rktp, rko->rko_version, rkmessages, cnt,
+                                    &ctrl_msg_q);
+                                rd_kafka_op_destroy(rko);
+                                continue;
+                        }
 
-                if (rd_kafka_op_version_outdated(rko, 0)) {
-                        /* Outdated op, put on discard queue */
-                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
-                        continue;
-                }
+                        if (rd_kafka_op_version_outdated(rko, 0)) {
+                                /* Outdated op, put on discard queue */
+                                TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
+                                continue;
+                        }
 
-                /* Serve non-FETCH callbacks */
-                res =
-                    rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
-                if (res == RD_KAFKA_OP_RES_KEEP ||
-                    res == RD_KAFKA_OP_RES_HANDLED) {
-                        /* Callback served, rko is destroyed (if HANDLED). */
-                        continue;
-                } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
-                                    rd_kafka_yield_thread)) {
-                        /* Yield. */
-                        break;
-                }
-                rd_dassert(res == RD_KAFKA_OP_RES_PASS);
+                        /* Serve non-FETCH callbacks */
+                        res =
+                            rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
+                        if (res == RD_KAFKA_OP_RES_KEEP ||
+                            res == RD_KAFKA_OP_RES_HANDLED) {
+                                /* Callback served, rko is destroyed (if HANDLED). */
+                                continue;
+                        } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+                                            rd_kafka_yield_thread)) {
+                                /* Yield. */
+                                break;
+                        }
+                        rd_dassert(res == RD_KAFKA_OP_RES_PASS);
 
-                /* If this is a control messages, don't return message to
-                 * application. Add it to a tmp queue from where we can store
-                 * the offset and destroy the op */
-                if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
-                        TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
-                        continue;
-                }
+                        /* If this is a control messages, don't return message to
+                         * application. Add it to a tmp queue from where we can store
+                         * the offset and destroy the op */
+                        if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
+                                TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
+                                continue;
+                        }
 
-                /* Get rkmessage from rko and append to array. */
-                rkmessages[cnt++] = rd_kafka_message_get(rko);
-        }
+                        /* Get rkmessage from rko and append to array. */
+                        rkmessages[cnt++] = rd_kafka_message_get(rko);
+                }
 
+        }
+
         for (i = cnt - 1; i >= 0; i--) {
                 rko = (rd_kafka_op_t *)rkmessages[i]->_private;
                 rd_kafka_toppar_t *rktp = rko->rko_rktp;