* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef NETWORKBUFFER_H
#define NETWORKBUFFER_H
#include <memory>
#include <stdexcept>
#include <cstring>
#include <cstdlib>
#include "Buffer.h"
#include "ObjectBuffer.h"
#include "core/memory/MemorySegment.h"
using namespace omnistream;
namespace omnistream::datastream {
class NetworkBuffer : public Buffer {
public:
NetworkBuffer(MemorySegment *memorySegment, std::shared_ptr<BufferRecycler> recycler, bool segmentOwner = false);
NetworkBuffer(MemorySegment *memorySegment, int bufferLength, int readIndex,
std::shared_ptr<BufferRecycler> recycler, int bufferType, bool segmentOwner = false);
NetworkBuffer(MemorySegment *memorySegment, int bufferLength, int readIndex,
std::shared_ptr<BufferRecycler> recycler, bool segmentOwner = false);
NetworkBuffer(MemorySegment *memorySegment, int bufferLength, int readIndex,
std::shared_ptr<BufferRecycler> recycler, ObjectBufferDataType dataType, bool segmentOwner = false);
explicit NetworkBuffer(MemorySegment *memorySegment, bool segmentOwner = false)
: NetworkBuffer(memorySegment, nullptr, segmentOwner) {}
explicit NetworkBuffer(int event_) : NetworkBuffer(nullptr)
{
bufferType = 1;
event_type = event_;
currentSize = 1;
dataType =ObjectBufferDataType::EVENT_BUFFER;
}
~NetworkBuffer() override {
if (segmentOwner) {
delete memorySegment;
}
}
bool isBuffer() const override
{
return dataType.isBuffer();
}
void RecycleBuffer() override
{
if (recycler == nullptr) {
return;
}
if (IsRecycled()) {
GErrorLog("Trying to recycle a NetworkBuffer that has already been recycled");
} else {
int prev = refCount_.fetch_sub(1);
if (prev == 1) {
recycler->recycle(this->getMemorySegment());
isRecycled_.store(true);
if (sleep_us > 0) {
std::this_thread::sleep_for(std::chrono::microseconds(sleep_us));
delete this;
}
}
}
}
bool IsRecycled() const override
{
return isRecycled_.load();
}
bool ShouldBeDeleted() override {
int expected = 0;
return refCount_.compare_exchange_strong(expected, -1);
}
Buffer* RetainBuffer() override
{
refCount_.fetch_add(1);
return this;
}
Buffer* ReadOnlySlice() override
{
LOG("EventBuffer::ReadOnlySlice");
return ReadOnlySlice(GetReaderIndex(), GetSize() - GetReaderIndex());
}
Buffer* ReadOnlySlice(int index, int length) override;
int GetMaxCapacity() const override
{
return memorySegment->getSize();
}
int GetReaderIndex() const override
{
return readerIndex_;
}
void SetReaderIndex(int readerIndex) override
{
readerIndex_ = readerIndex;
}
int GetSize() const override
{
return currentSize;
}
void SetSize(int writerIndex) override
{
currentSize = writerIndex;
}
int ReadableObjects() const override
{
return 0;
}
bool IsCompressed() const override
{
return false;
}
void SetCompressed(bool isCompressed) override
{
isCompressed_ = isCompressed;
}
ObjectBufferDataType GetDataType() const override
{
return dataType;
}
void SetDataType(ObjectBufferDataType dataType_) override
{
this->dataType = dataType_;
};
int RefCount() const override
{
return refCount_.load();
}
std::string ToDebugString(bool includeHash) const override
{
std::stringstream ss;
ss << "buffertype =" << std::to_string(bufferType) << ", event_type " << std::to_string(event_type);
return ss.str();
};
Segment *GetSegment() override
{
return getMemorySegment();
}
MemorySegment *getMemorySegment();
std::shared_ptr<BufferRecycler> GetRecycler() override;
static std::pair<uint8_t *, size_t> GetBytes()
{
NOT_IMPL_EXCEPTION
};
[[nodiscard]] int EventType() const override
{
return event_type;
}
int GetBufferType() override
{
return bufferType;
}
int GetOffset() const override
{
return 0;
};
virtual int GetMemorySegmentOffset() const
{
return 0;
}
void SetBufferType(int bufferType_)
{
bufferType = bufferType_;
if (bufferType_ == 1) {
dataType = ObjectBufferDataType::EVENT_BUFFER;
}
}
int32_t getSleepUs() const {
return sleep_us;
}
private:
MemorySegment *memorySegment;
std::shared_ptr<BufferRecycler> recycler;
int bufferType;
int event_type;
int currentSize;
bool isCompressed_ = false;
std::atomic<bool> isRecycled_ = false;
int readerIndex_;
std::atomic<int> refCount_ = 0;
ObjectBufferDataType dataType = ObjectBufferDataType::DATA_BUFFER;
bool segmentOwner = false;
int32_t sleep_us = 0;
};
enum class DataType {
NONE,
DATA_BUFFER,
EVENT_BUFFER,
PRIORITIZED_EVENT_BUFFER,
ALIGNED_CHECKPOINT_BARRIER,
TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER,
RECOVERY_COMPLETION
};
}
#endif