* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cstdint>
#include <list>
#include <memory>
#include <vector>

#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "shuffle/Payload.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/VeloxSortShuffleWriter.h"
#include "utils/Timer.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include <velox/serializers/PrestoSerializer.h>
namespace gluten {
/// ColumnarBatchIterator that decodes batches from a single shuffle input
/// stream. Per its name it is used for hash-shuffle data; the decoding logic
/// itself is in the implementation file, not in this header.
///
/// Lifetime requirements (this class declares no destructor and stores these
/// members as references / raw pointers, so it never frees or extends them):
///  - `deserializeTime_` and `decompressTime_` are references; the referenced
///    counters must outlive this object (presumably the owning factory's
///    counters — confirm at the call site).
///  - `memoryPool_`, `veloxPool_` and `isValidityBuffer_` are raw pointers and
///    must stay valid for this object's lifetime.
///
/// NOTE: member declaration order determines initialization order for the
/// constructor defined in the .cpp; do not reorder the fields below.
class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator {
 public:
  /// @param in               Input stream the shuffle data is read from.
  /// @param schema           Arrow schema of the shuffled data.
  /// @param codec            Arrow codec (presumably for decompression; may be
  ///                         null when data is uncompressed — TODO confirm).
  /// @param rowType          Velox row type of the produced batches.
  /// @param batchSize        Target batch size (presumably in rows).
  /// @param bufferSize       Accepted but not stored as a member in this
  ///                         declaration; see the .cpp for how it is used.
  /// @param memoryPool       Arrow memory pool; not owned.
  /// @param veloxPool        Velox memory pool; not owned.
  /// @param isValidityBuffer Per-buffer flags vector; not owned (presumably
  ///                         the factory's `isValidityBuffer_`).
  /// @param hasComplexType   Flag presumably telling whether the schema has
  ///                         complex (nested) types — confirm.
  /// @param deserializeTime  Counter bound by reference; must outlive this object.
  /// @param decompressTime   Counter bound by reference; must outlive this object.
  VeloxHashShuffleReaderDeserializer(
      std::shared_ptr<arrow::io::InputStream> in,
      const std::shared_ptr<arrow::Schema>& schema,
      const std::shared_ptr<arrow::util::Codec>& codec,
      const facebook::velox::RowTypePtr& rowType,
      int32_t batchSize,
      int64_t bufferSize,
      arrow::MemoryPool* memoryPool,
      facebook::velox::memory::MemoryPool* veloxPool,
      std::vector<bool>* isValidityBuffer,
      bool hasComplexType,
      int64_t& deserializeTime,
      int64_t& decompressTime);

  /// Returns the next decoded batch (end-of-stream signalling is defined in
  /// the .cpp).
  std::shared_ptr<ColumnarBatch> next() override;

 private:
  std::shared_ptr<arrow::io::InputStream> in_;
  std::shared_ptr<arrow::Schema> schema_;
  std::shared_ptr<arrow::util::Codec> codec_;
  facebook::velox::RowTypePtr rowType_;
  int32_t batchSize_;
  // Non-owning pool pointers.
  arrow::MemoryPool* memoryPool_;
  facebook::velox::memory::MemoryPool* veloxPool_;
  // Non-owning; must outlive this object.
  std::vector<bool>* isValidityBuffer_;
  bool hasComplexType_;
  // References to caller-owned timing counters.
  int64_t& deserializeTime_;
  int64_t& decompressTime_;
  // Starts empty; presumably accumulates payloads across reads until a batch
  // is emitted — confirm in the .cpp.
  std::unique_ptr<InMemoryPayload> merged_{nullptr};
  // Starts false; presumably set once the input stream is exhausted.
  bool reachEos_{false};
};
/// ColumnarBatchIterator that decodes batches from a single shuffle input
/// stream. Per its name it is used for data written by the sort-based shuffle
/// writer; the decoding logic is in the implementation file.
///
/// Lifetime requirements: `deserializeTime_` / `decompressTime_` are
/// references and `arrowPool_` / `veloxPool_` are raw pointers; the referenced
/// objects must outlive this instance (no destructor is declared, so nothing
/// here releases them).
///
/// NOTE: member declaration order determines initialization order for the
/// constructor defined in the .cpp; do not reorder the fields below.
class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
 public:
  // Row-size integer type, aliased from the sort shuffle writer (presumably so
  // reader and writer agree on the serialized row-size width).
  using RowSizeType = VeloxSortShuffleWriter::RowSizeType;

  /// @param in              Input stream the shuffle data is read from.
  /// @param schema          Arrow schema of the shuffled data.
  /// @param codec           Arrow codec (presumably for decompression — confirm).
  /// @param rowType         Velox row type of the produced batches.
  /// @param batchSize       Target batch size. Note: stored into the
  ///                        `uint32_t batchSize_` member, so a negative value
  ///                        would wrap; callers presumably pass positive sizes.
  /// @param bufferSize      Accepted but not stored as a member in this
  ///                        declaration; see the .cpp.
  /// @param memoryPool      Arrow memory pool (kept as `arrowPool_`); not owned.
  /// @param veloxPool       Velox memory pool; not owned.
  /// @param deserializeTime Counter bound by reference; must outlive this object.
  /// @param decompressTime  Counter bound by reference; must outlive this object.
  VeloxSortShuffleReaderDeserializer(
      std::shared_ptr<arrow::io::InputStream> in,
      const std::shared_ptr<arrow::Schema>& schema,
      const std::shared_ptr<arrow::util::Codec>& codec,
      const facebook::velox::RowTypePtr& rowType,
      int32_t batchSize,
      int64_t bufferSize,
      arrow::MemoryPool* memoryPool,
      facebook::velox::memory::MemoryPool* veloxPool,
      int64_t& deserializeTime,
      int64_t& decompressTime);

  /// Returns the next decoded batch (end-of-stream signalling is defined in
  /// the .cpp).
  std::shared_ptr<ColumnarBatch> next() override;

 private:
  // Builds a ColumnarBatch from the currently cached input (semantics are
  // defined in the .cpp).
  std::shared_ptr<ColumnarBatch> deserializeToBatch();

  // Reads a row into `arrowBuffers`; presumably the path for rows that do not
  // fit a regular read buffer — TODO confirm against the .cpp.
  void readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>& arrowBuffers);

  std::shared_ptr<arrow::io::InputStream> in_;
  std::shared_ptr<arrow::Schema> schema_;
  std::shared_ptr<arrow::util::Codec> codec_;
  facebook::velox::RowTypePtr rowType_;
  // Unsigned, although the constructor parameter is int32_t.
  uint32_t batchSize_;
  // Non-owning pool pointers.
  arrow::MemoryPool* arrowPool_;
  facebook::velox::memory::MemoryPool* veloxPool_;
  // References to caller-owned timing counters.
  int64_t& deserializeTime_;
  int64_t& decompressTime_;
  // Cached input buffers; the uint32_t is presumably the row count of each
  // buffer — confirm in the .cpp.
  std::list<std::pair<uint32_t, facebook::velox::BufferPtr>> cachedInputs_;
  // Presumably the total row count held in `cachedInputs_`.
  uint32_t cachedRows_{0};
  // Starts false; presumably set once the input stream is exhausted.
  bool reachedEos_{false};
  // Read cursor state (row index / byte offset), presumably into the front
  // cached buffer — confirm.
  uint32_t rowOffset_{0};
  size_t byteOffset_{0};
};
class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator {
public:
VeloxRssSortShuffleReaderDeserializer(
const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
facebook::velox::common::CompressionKind veloxCompressionType,
int64_t& deserializeTime,
std::shared_ptr<arrow::io::InputStream> in);
std::shared_ptr<ColumnarBatch> next();
private:
class VeloxInputStream;
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
facebook::velox::RowTypePtr rowType_;
std::vector<facebook::velox::RowVectorPtr> batches_;
bool reachEos_{false};
int32_t rowCount_;
int32_t batchSize_;
facebook::velox::common::CompressionKind veloxCompressionType_;
facebook::velox::VectorSerde* const serde_;
facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;
int64_t& deserializeTime_;
std::shared_ptr<VeloxInputStream> in_;
};
/// DeserializerFactory for the Velox backend. createDeserializer() returns a
/// ColumnarBatchIterator for one shuffle input stream; which concrete
/// deserializer is built presumably depends on `shuffleWriterType_` — confirm
/// in the .cpp.
///
/// The factory owns the int64 timing counters reported by getDeserializeTime()
/// and getDecompressTime(). The deserializers above bind such counters by
/// reference, so this factory presumably must outlive every deserializer it
/// creates — confirm at the call site.
///
/// NOTE: member declaration order determines initialization order for the
/// constructor defined in the .cpp; do not reorder the fields below.
class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory {
 public:
  /// @param schema                Arrow schema of the shuffled data.
  /// @param codec                 Arrow codec (presumably for decompression).
  /// @param veloxCompressionType  Velox compression kind. The top-level `const`
  ///                              on this by-value parameter does not affect
  ///                              the function signature.
  /// @param rowType               Velox row type of the produced batches.
  /// @param batchSize             Target batch size (presumably in rows).
  /// @param bufferSize            Buffer size forwarded to deserializers
  ///                              (presumably — confirm).
  /// @param memoryPool            Arrow memory pool; raw pointer, not owned.
  /// @param veloxPool             Velox memory pool; shared ownership is kept
  ///                              in `veloxPool_`.
  /// @param shuffleWriterType     Writer type the data was produced with.
  VeloxColumnarBatchDeserializerFactory(
      const std::shared_ptr<arrow::Schema>& schema,
      const std::shared_ptr<arrow::util::Codec>& codec,
      const facebook::velox::common::CompressionKind veloxCompressionType,
      const facebook::velox::RowTypePtr& rowType,
      int32_t batchSize,
      int64_t bufferSize,
      arrow::MemoryPool* memoryPool,
      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
      ShuffleWriterType shuffleWriterType);

  /// Creates a deserializer reading from `in`; the caller owns the result.
  std::unique_ptr<ColumnarBatchIterator> createDeserializer(std::shared_ptr<arrow::io::InputStream> in) override;

  /// Returns the Arrow memory pool supplied at construction (presumably
  /// `memoryPool_`).
  arrow::MemoryPool* getPool() override;

  /// Accumulated decompression time (presumably `decompressTime_`).
  int64_t getDecompressTime() override;

  /// Accumulated deserialization time (presumably `deserializeTime_`).
  int64_t getDeserializeTime() override;

  /// Writer type supplied at construction (presumably `shuffleWriterType_`).
  ShuffleWriterType getShuffleWriterType() override;

 private:
  // Presumably derives `isValidityBuffer_` and `hasComplexType_` from
  // `schema_` — confirm in the .cpp.
  void initFromSchema();

  std::shared_ptr<arrow::Schema> schema_;
  std::shared_ptr<arrow::util::Codec> codec_;
  facebook::velox::common::CompressionKind veloxCompressionType_;
  facebook::velox::RowTypePtr rowType_;
  int32_t batchSize_;
  int64_t bufferSize_;
  // Non-owning.
  arrow::MemoryPool* memoryPool_;
  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
  // Per-buffer flags; the hash deserializer takes a `std::vector<bool>*`,
  // presumably pointing here.
  std::vector<bool> isValidityBuffer_;
  bool hasComplexType_{false};
  ShuffleWriterType shuffleWriterType_;
  // Timing counters, zero-initialized; deserializers presumably accumulate
  // into them through reference members.
  int64_t deserializeTime_{0};
  int64_t decompressTime_{0};
};
/// Shuffle reader for the Velox backend.
///
/// Declares nothing beyond its constructor; all reading behavior comes from
/// ShuffleReader, driven by the supplied DeserializerFactory.
class VeloxShuffleReader final : public ShuffleReader {
 public:
  /// @param factory Factory that creates a deserializer per input stream.
  ///                Ownership moves into the constructor (presumably handed
  ///                to the ShuffleReader base — confirm in the .cpp).
  ///
  /// `explicit` prevents an unintended implicit conversion from
  /// `std::unique_ptr<DeserializerFactory>` to a reader. Direct-initialization
  /// and `std::make_shared<VeloxShuffleReader>(std::move(f))` still work.
  explicit VeloxShuffleReader(std::unique_ptr<DeserializerFactory> factory);
};
}