* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef VECTORBATCHBUFFER_H
#define VECTORBATCHBUFFER_H
#include "ObjectBuffer.h"
namespace omnistream {
class VectorBatchBuffer : public ObjectBuffer {
public:
VectorBatchBuffer(ObjectSegment *segment, std::shared_ptr<BufferRecycler> recycler);
explicit VectorBatchBuffer(ObjectSegment *segment) : objectSegment(segment), recycler(nullptr)
{
bufferType = 0;
event_type = -1;
readerIndex_ = -1;
isCompressed_ = false;
}
explicit VectorBatchBuffer(std::shared_ptr<ObjectSegment> segment)
: objectSegment(segment.get()), recycler(nullptr), ownedSegment_(std::move(segment))
{
bufferType = 0;
event_type = -1;
readerIndex_ = -1;
isCompressed_ = false;
}
explicit VectorBatchBuffer(int event_) : objectSegment(nullptr), recycler(nullptr), isCompressed_(false),
readerIndex_(-1)
{
bufferType = 1;
event_type = event_;
currentSize = 1;
}
~VectorBatchBuffer() override = default;
bool isBuffer() const override
{
return bufferType == 0;
}
void RecycleBuffer() override {
if (recycler == nullptr) {
return;
}
if (IsRecycled()) {
GErrorLog("Trying to recycle a VectorBatchBuffer that has already been recycled");
} else {
int prev = refCount_.fetch_sub(1);
if (prev == 1) {
recycler->recycle(this->GetObjectSegment());
isRecycled_.store(true);
}
}
}
bool IsRecycled() const override {
return isRecycled_.load();
}
Buffer* RetainBuffer() override
{
LOG_TRACE("retain ")
LOG_PART(
"RetainBuffer The buffer " << this << " refCount is incremented from " << refCount_.load() << " to "
<< (refCount_.load() + 1)
)
refCount_++;
return this;
}
Buffer* ReadOnlySlice() override
{
LOG_TRACE("ReadOnlySlice ")
return this;
}
Buffer* ReadOnlySlice(int index, int length) override;
int GetMaxCapacity() const override
{
return objectSegment->getSize();
}
int GetReaderIndex() const override
{
return readerIndex_;
}
void SetReaderIndex(int readerIndex) override
{
readerIndex_ = readerIndex;
}
int GetSize() const override
{
return currentSize;
}
void SetSize(int writerIndex) override
{
currentSize = writerIndex;
}
int ReadableObjects() const override
{
return 0;
}
bool IsCompressed() const override
{
return false;
}
void SetCompressed(bool isCompressed) override
{
isCompressed_ = isCompressed;
}
ObjectBufferDataType GetDataType() const override
{
if (bufferType == 0) {
return ObjectBufferDataType::DATA_BUFFER;
} else {
return ObjectBufferDataType::EVENT_BUFFER;
}
}
void SetDataType(ObjectBufferDataType dataType) override {};
int RefCount() const override
{
return refCount_.load();
}
std::string ToDebugString(bool includeHash) const override
{
std::stringstream ss;
ss << "buffertype =" << std::to_string(bufferType) << ", event_type " << std::to_string(event_type);
return ss.str();
};
ObjectSegment *GetObjectSegment() override;
std::shared_ptr<BufferRecycler> GetRecycler() override;
std::pair<uint8_t*, size_t> GetBytes() override
{
NOT_IMPL_EXCEPTION
};
[[nodiscard]] int EventType() const override
{
return event_type;
}
int GetBufferType() override
{
return bufferType;
}
private:
ObjectSegment *objectSegment;
std::shared_ptr<ObjectSegment> ownedSegment_;
std::shared_ptr<BufferRecycler> recycler;
int bufferType;
int event_type;
int currentSize = 0;
bool isCompressed_;
std::atomic<bool> isRecycled_ = false;
int readerIndex_;
std::atomic<int> refCount_;
};
}
#endif