* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef PIPELINED_RESULT_PARTITION_H
#define PIPELINED_RESULT_PARTITION_H
#include <memory>
#include <vector>
#include <mutex>
#include <future>
#include <buffer/ObjectBufferPool.h>
#include <executiongraph/descriptor/ResultPartitionIDPOD.h>
#include "BufferWritingResultPartition.h"
#include "ResultPartition.h"
#include "ResultSubpartition.h"
#include "ResultPartitionManager.h"
#include "CheckpointedResultPartition.h"
#include "ChannelStateHolder.h"
namespace omnistream {
class PipelinedResultPartition : public BufferWritingResultPartition,
public CheckpointedResultPartition,
public ChannelStateHolder {
public:
PipelinedResultPartition(
const std::string& owningTaskName,
int partitionIndex,
const ResultPartitionIDPOD& partitionId,
int partitionType,
std::vector<std::shared_ptr<ResultSubpartition>> subpartitions,
int numTargetKeyGroups,
std::shared_ptr<ResultPartitionManager> partitionManager,
std::shared_ptr<Supplier<BufferPool>> bufferPoolFactory);
PipelinedResultPartition(
const std::string& owningTaskName,
int partitionIndex,
const ResultPartitionIDPOD& partitionId,
int partitionType,
int numSubpartitions,
int numTargetKeyGroups,
std::shared_ptr<ResultPartitionManager> partitionManager,
std::shared_ptr<Supplier<BufferPool>> bufferPoolFactory,
int taskType);
void flushAll() override;
void flush(int targetSubpartition) override;
void NotifyEndOfData(StopMode mode) override;
std::shared_ptr<CompletableFuture> getAllDataProcessedFuture() override;
void onSubpartitionAllDataProcessed(int subpartition) override;
void close() override;
std::string toString() override;
void OnConsumedSubpartition(int subpartitionIndex) override;
void setChannelStateWriter(std::shared_ptr<ChannelStateWriter> channelStateWriter) override;
std::shared_ptr<CheckpointedResultSubpartition> getCheckpointedSubpartition(int subpartitionIndex) override;
void finishReadRecoveredState(bool notifyAndBlockOnCompletion) override;
private:
void decrementNumberOfUsers(int subpartitionIndex);
static int checkResultPartitionType(int type);
static const int PIPELINED_RESULT_PARTITION_ITSELF = -42;
std::recursive_mutex lock;
std::vector<bool> allRecordsProcessedSubpartitions;
int numNotAllRecordsProcessedSubpartitions;
bool hasNotifiedEndOfUserRecords = false;
std::shared_ptr<CompletableFuture> allRecordsProcessedFuture;
std::vector<bool> consumedSubpartitions;
int numberOfUsers;
};
}
#endif