* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <memory>
#include <string>
#include <vector>
#include "velox/common/time/CpuWallTimer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"
#include <arrow/array/util.h>
#include <arrow/ipc/writer.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include "VeloxShuffleWriter.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
#include "shuffle/Utils.h"
#include "utils/Print.h"
namespace gluten {
// Compile-time switch for the shuffle writer's debug printing.
//   1: every Vs*/VS_* macro below forwards to the same-named Print*/PRINT*
//      helper (presumably provided by utils/Print.h - confirm).
//   0: every Vs*/VS_* macro expands to nothing.
#define VELOX_SHUFFLE_WRITER_PRINT 0
#if VELOX_SHUFFLE_WRITER_PRINT
// Printing enabled: plain aliases for the underlying print helpers.
#define VsPrint Print
#define VsPrintLF PrintLF
#define VsPrintSplit PrintSplit
#define VsPrintSplitLF PrintSplitLF
#define VsPrintVectorRange PrintVectorRange
#define VS_PRINT PRINT
#define VS_PRINTLF PRINTLF
#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
#define VS_PRINT_CONTAINER PRINT_CONTAINER
#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING
#else
// Printing disabled: each macro swallows its arguments and expands to nothing.
// The arguments are therefore never compiled or evaluated, so a misspelled
// identifier passed to one of these macros goes unnoticed until the switch
// above is set to 1.
#define VsPrint(...)
#define VsPrintLF(...)
#define VsPrintSplit(...)
#define VsPrintSplitLF(...)
#define VsPrintVectorRange(...)
#define VS_PRINT(a)
#define VS_PRINTLF(a)
#define VS_PRINT_FUNCTION_NAME()
#define VS_PRINT_FUNCTION_SPLIT_LINE()
#define VS_PRINT_CONTAINER(c)
#define VS_PRINT_CONTAINER_TO_STRING(c)
#define VS_PRINT_CONTAINER_2_STRING(c)
#define VS_PRINT_VECTOR_TO_STRING(v)
#define VS_PRINT_VECTOR_2_STRING(v)
#define VS_PRINT_VECTOR_MAPPING(v)
#endif
// State tag for the hash shuffle writer's split processing; the writer's
// splitState_ member starts out as kInit.
// The values are the implicit ones (0..4 in declaration order), spelled out
// here so they are visible at a glance.
enum SplitState {
  kInit = 0,
  kPreAlloc = 1,
  kSplit = 2,
  kStopEvict = 3,
  kStop = 4,
};
// Marker describing whether a binary-array resize is in progress and, if so,
// which partition and which binary column index it targets.
//
// Two states are expressible:
//   - idle:   BinaryArrayResizeState{}            -> inResize == false
//   - active: BinaryArrayResizeState{pid, index}  -> inResize == true
struct BinaryArrayResizeState {
  // True only while a resize for (partitionId, binaryIdx) is in progress.
  bool inResize;
  // Partition being resized; only meaningful when inResize is true.
  uint32_t partitionId;
  // Index of the binary column being resized; only meaningful when inResize is true.
  uint32_t binaryIdx;

  // Idle state. The ids are zeroed so that they never hold indeterminate
  // values, even though they carry no meaning while inResize is false.
  BinaryArrayResizeState() : inResize(false), partitionId(0), binaryIdx(0) {}

  // Active state: naming a concrete target means a resize is under way, so the
  // flag must be set. (Leaving it false would make the flag impossible to set
  // through any constructor.)
  BinaryArrayResizeState(uint32_t partitionId, uint32_t binaryIdx)
      : inResize(true), partitionId(partitionId), binaryIdx(binaryIdx) {}
};
// VeloxShuffleWriter implementation that keeps a set of buffers per partition
// and copies the rows of each input RowVector into the buffers of the
// partition they belong to.
//
// Instances are obtained through create(); the constructor is private.
class VeloxHashShuffleWriter : public VeloxShuffleWriter {
  // Buffer slots of a single column. Fixed-width values and binary lengths
  // deliberately share slot 1.
  enum {
    kValidityBufferIndex = 0,
    kFixedWidthValueBufferIndex = 1,
    kBinaryValueBufferIndex = 2,
    kBinaryLengthBufferIndex = kFixedWidthValueBufferIndex
  };

 public:
  // Raw view of the two buffers backing one binary column of one partition,
  // plus the capacity of the value buffer and the current offset into it.
  struct BinaryBuf {
    BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, uint64_t valueOffsetIn)
        : valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {}

    // Same as above with the value offset starting at 0.
    BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : BinaryBuf(value, length, valueCapacity, 0) {}

    // Empty view: null pointers, zero capacity, zero offset.
    BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {}

    uint8_t* valuePtr;
    uint8_t* lengthPtr;
    uint64_t valueCapacity;
    uint64_t valueOffset;
  };

  // Factory. Returns the new writer, or an error status on failure.
  static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
      uint32_t numPartitions,
      std::unique_ptr<PartitionWriter> partitionWriter,
      ShuffleWriterOptions options,
      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
      arrow::MemoryPool* arrowPool);

  // VeloxShuffleWriter interface.
  arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;

  arrow::Status stop() override;

  arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;

  const uint64_t cachedPayloadSize() const override;

  arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) override;

  void setPartitionBufferSize(uint32_t newSize) override;

  // ---- Debug helpers -------------------------------------------------------
  // All of the print* functions below are no-ops unless
  // VELOX_SHUFFLE_WRITER_PRINT is set to 1. The arguments must name real
  // members so that the file still compiles when printing is switched on.

  // Prints the column classification and the column type lists.
  void printColumnsInfo() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINTLF(fixedWidthColumnCount_);

    VS_PRINT_CONTAINER(simpleColumnIndices_);
    VS_PRINT_CONTAINER(binaryColumnIndices_);
    VS_PRINT_CONTAINER(complexColumnIndices_);

    VS_PRINT_VECTOR_2_STRING(veloxColumnTypes_);
    VS_PRINT_VECTOR_TO_STRING(arrowColumnTypes_);
  }

  // Prints row2Partition_ and partition2RowCount_.
  void printPartition() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_VECTOR_MAPPING(row2Partition_);
    VS_PRINT_VECTOR_MAPPING(partition2RowCount_);
  }

  // Prints partitionBufferSize_ and partitionBufferBase_.
  void printPartitionBuffer() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_VECTOR_MAPPING(partitionBufferSize_);
    VS_PRINT_VECTOR_MAPPING(partitionBufferBase_);
  }

  // Prints partition2RowOffsetBase_ and, per partition, the slice of
  // rowOffset2RowId_ delimited by consecutive entries of that vector.
  void printPartition2Row() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_VECTOR_MAPPING(partition2RowOffsetBase_);

#if VELOX_SHUFFLE_WRITER_PRINT
    for (auto pid = 0; pid < numPartitions_; ++pid) {
      auto begin = partition2RowOffsetBase_[pid];
      auto end = partition2RowOffsetBase_[pid + 1];
      VsPrint("partition", pid);
      VsPrintVectorRange(rowOffset2RowId_, begin, end);
    }
#endif
  }

  // Prints inputHasNull_.
  void printInputHasNull() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_CONTAINER(inputHasNull_);
  }

 private:
  // Forwards everything to the base class; use create() to build an instance.
  VeloxHashShuffleWriter(
      uint32_t numPartitions,
      std::unique_ptr<PartitionWriter> partitionWriter,
      ShuffleWriterOptions options,
      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
      arrow::MemoryPool* pool)
      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {}

  arrow::Status init();

  arrow::Status initPartitions();

  arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);

  arrow::Status splitRowVector(const facebook::velox::RowVector& rv);

  arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);

  arrow::Status buildPartition2Row(uint32_t rowNum);

  arrow::Status updateInputHasNull(const facebook::velox::RowVector& rv);

  void setSplitState(SplitState state);

  arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t memLimit);

  bool beyondThreshold(uint32_t partitionId, uint32_t newSize);

  uint32_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv, int64_t memLimit);

  arrow::Status preAllocPartitionBuffers(uint32_t preAllocBufferSize);

  arrow::Status updateValidityBuffers(uint32_t partitionId, uint32_t newSize);

  arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
  allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize);

  arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize);

  arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv);

  arrow::Status splitBoolType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs);

  arrow::Status splitValidityBuffer(const facebook::velox::RowVector& rv);

  arrow::Status splitBinaryArray(const facebook::velox::RowVector& rv);

  arrow::Status splitComplexType(const facebook::velox::RowVector& rv);

  arrow::Status evictBuffers(
      uint32_t partitionId,
      uint32_t numRows,
      std::vector<std::shared_ptr<arrow::Buffer>> buffers,
      bool reuseBuffers);

  arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> assembleBuffers(uint32_t partitionId, bool reuseBuffers);

  // Copies fixed-width values of type T from one source column into the
  // per-partition destination buffers.
  //
  // srcAddr  - start of the source values, read as an array of T indexed by
  //            row id.
  // dstAddrs - destination buffer start per partition id; writing for
  //            partition pid begins partitionBufferBase_[pid] elements in.
  //
  // For every partition listed in partitionUsed_, the row ids stored in
  // rowOffset2RowId_[partition2RowOffsetBase_[pid] .. partition2RowOffsetBase_[pid + 1])
  // are copied in that order. No bounds checks are performed here; the
  // destination buffers must already be large enough. Always returns OK.
  template <typename T>
  arrow::Status splitFixedType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) {
    // The typed view of the source does not depend on the partition.
    const auto* src = reinterpret_cast<const T*>(srcAddr);
    for (auto& pid : partitionUsed_) {
      auto* dst = reinterpret_cast<T*>(dstAddrs[pid] + partitionBufferBase_[pid] * sizeof(T));
      const auto end = partition2RowOffsetBase_[pid + 1];
      for (auto pos = partition2RowOffsetBase_[pid]; pos < end; ++pos) {
        *dst++ = src[rowOffset2RowId_[pos]];
      }
    }
    return arrow::Status::OK();
  }

  arrow::Status splitBinaryType(
      uint32_t binaryIdx,
      const facebook::velox::FlatVector<facebook::velox::StringView>& src,
      std::vector<BinaryBuf>& dst);

  arrow::Result<int64_t> evictCachedPayload(int64_t size);

  arrow::Result<std::shared_ptr<arrow::Buffer>> generateComplexTypeBuffers(facebook::velox::RowVectorPtr vector);

  arrow::Status resetValidityBuffer(uint32_t partitionId);

  arrow::Result<int64_t> shrinkPartitionBuffersMinSize(int64_t size);

  arrow::Result<int64_t> evictPartitionBuffersMinSize(int64_t size);

  arrow::Status shrinkPartitionBuffer(uint32_t partitionId);

  arrow::Status resetPartitionBuffer(uint32_t partitionId);

  arrow::Status resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool preserveData);

  uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize);

  uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, uint32_t newSize);

  void calculateSimpleColumnBytes();

  void stat() const;

  bool shrinkPartitionBuffersAfterSpill() const;

  bool evictPartitionBuffersAfterSpill() const;

  arrow::Result<uint32_t> partitionBufferSizeAfterShrink(uint32_t partitionId) const;

  bool isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const;

  arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);

  // Scope guard: stores partitionId into the referenced optional on
  // construction and resets it to std::nullopt on destruction.
  // Non-copyable, because a copy's destructor would clear the marker while the
  // original guard is still alive.
  class PartitionBufferGuard {
   public:
    PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t partitionId)
        : partitionBufferInUse_(partitionInUse) {
      partitionBufferInUse_ = partitionId;
    }

    PartitionBufferGuard(const PartitionBufferGuard&) = delete;
    PartitionBufferGuard& operator=(const PartitionBufferGuard&) = delete;

    ~PartitionBufferGuard() {
      partitionBufferInUse_ = std::nullopt;
    }

   private:
    std::optional<uint32_t>& partitionBufferInUse_;
  };

  std::shared_ptr<arrow::Schema> schema_;

  // Three-level nesting of resizable buffers.
  // NOTE(review): presumably indexed [column][partition][buffer slot] - confirm
  // against the implementation file.
  std::vector<std::vector<std::vector<std::shared_ptr<arrow::ResizableBuffer>>>> partitionBuffers_;

  BinaryArrayResizeState binaryArrayResizeState_{};

  bool hasComplexType_ = false;
  std::vector<bool> isValidityBuffer_;

  // Column types, kept in both Arrow and Velox representations.
  std::vector<std::shared_ptr<arrow::DataType>> arrowColumnTypes_;
  std::vector<std::shared_ptr<const facebook::velox::Type>> veloxColumnTypes_;

  // Column classification.
  uint32_t fixedWidthColumnCount_ = 0;
  std::vector<uint32_t> binaryColumnIndices_;
  std::vector<uint32_t> simpleColumnIndices_;
  std::vector<uint32_t> complexColumnIndices_;

  // Size bookkeeping.
  uint32_t fixedWidthBufferBytes_ = 0;
  uint64_t totalInputNumRows_ = 0;
  std::vector<uint64_t> binaryArrayTotalSizeBytes_;
  size_t complexTotalSizeBytes_ = 0;

  std::vector<bool> inputHasNull_;

  // Partition ids iterated by splitFixedType.
  std::vector<uint32_t> partitionUsed_;

  std::vector<uint32_t> row2Partition_;
  std::vector<uint32_t> partition2RowCount_;

  // Entries pid and pid + 1 delimit partition pid's slice of rowOffset2RowId_.
  std::vector<uint32_t> partition2RowOffsetBase_;

  // Row ids of the current input, read slice by slice per partition.
  std::vector<uint32_t> rowOffset2RowId_;

  std::vector<uint32_t> partitionBufferSize_;

  // Per partition: element offset in the destination buffer at which
  // splitFixedType starts writing.
  std::vector<uint32_t> partitionBufferBase_;

  // Raw addresses into the partition buffers.
  std::vector<std::vector<uint8_t*>> partitionValidityAddrs_;
  std::vector<std::vector<uint8_t*>> partitionFixedWidthValueAddrs_;
  std::vector<std::vector<BinaryBuf>> partitionBinaryAddrs_;

  // Complex-type handling.
  std::vector<std::unique_ptr<facebook::velox::IterativeVectorSerializer>> complexTypeData_;
  std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_;
  std::shared_ptr<const facebook::velox::RowType> complexWriteType_;
  facebook::velox::serializer::presto::PrestoVectorSerde serde_;

  SplitState splitState_{kInit};

  // Set by PartitionBufferGuard for the lifetime of the guard; std::nullopt
  // otherwise.
  std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
};
}