* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNILOCALCHANNELREADER_H
#define OMNILOCALCHANNELREADER_H
#include <memory>
#include <common.h>
#include <condition_variable>
#include <mutex>
#include <unordered_map>
#include "partition/BufferAvailabilityListener.h"
#include "partition/ResultPartitionManager.h"
#include "runtime/executiongraph/descriptor/ResultPartitionIDPOD.h"
#include <buffer/ReadOnlySlicedNetworkBuffer.h>
#include "buffer/ReadOnlySlicedVectorBatchBuffer.h"
#include "table/data/RowData.h"
namespace omnistream {
struct MemorySegmentInfo {
uint64_t memorySegmentAddress;
int32_t readIndex;
int32_t length;
int32_t currentDataType;
int32_t backlog;
int32_t nextDataType;
int32_t sequenceNumber;
};
class OmniLocalChannelReader : public BufferAvailabilityListener {
public:
OmniLocalChannelReader(ResultPartitionIDPOD partitionId,
int subPartitionIndex, long outputBufferStatus, std::string taskNameWithSubtas);
~OmniLocalChannelReader() override;
void requestSubpartitionView(
std::shared_ptr<ResultPartitionManager> resultPartitionManager,
ResultPartitionIDPOD partitionId, int subPartitionId);
void notifyDataAvailable();
bool checkIfDataAvailable();
uint8_t getNextBuffer();
void recycleMemorySegment(long memorySegmentAddress);
void Close();
void ReleaseAllResources();
void WrapBufferInfoIntoMemorySegmentInfo(::datastream::ReadOnlySlicedNetworkBuffer* nBuffer,
BufferAndBacklog* bufferAndLog);
void WrapBufferInfoIntoBinaryRowDataInfo(omnistream::VectorBatchBuffer* nBuffer,
BufferAndBacklog* bufferAndLog);
void ResumeConsumption();
private:
int calculateTotalRows(ObjectSegment *objectSegment, int offset, int vbNum);
void setRowDataToPtr(RowData* binaryRowData, uint8_t* dataResultContainer, unsigned int& position, int vectorBatchCol, VectorBatch* element, int index);
ResultPartitionIDPOD partitionId_;
int subPartitionIndex_;
std::shared_ptr<ResultSubpartitionView> subpartitionView;
std::recursive_mutex createViewMutex;
std::recursive_mutex fetchingDataMutex;
MemorySegmentInfo* memorySegmentInfo;
std::condition_variable_any dataAvailableCondition;
std::recursive_mutex dataAvailableMutex;
std::atomic<bool> dataAvailable = false;
std::unordered_map<uint64_t, Buffer*> pendingRecyclingBufferMap;
std::recursive_mutex recycleBufferMutex;
std::atomic<bool> isStopped = false;
std::string taskNameWithSubtask_;
};
}
#endif