* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Core/Field.h>
#include <Processors/Chunk.h>
#include <Storages/Parquet/ColumnIndexFilter.h>
#include <Storages/Parquet/VectorizedParquetRecordReader.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <base/types.h>
#include <Parser/InputFileNameParser.h>
namespace local_engine
{
/// Abstract base for readers bound to a single FormatFile.
/// Subclasses implement pull(); cancellation state is kept in an atomic flag
/// and subclasses may react to it by overriding onCancel().
class FileReaderWrapper
{
public:
    explicit FileReaderWrapper(const FormatFilePtr & file_) : file(file_) { }
    virtual ~FileReaderWrapper() = default;

    /// Fills `chunk`; must be implemented by every subclass.
    /// NOTE(review): presumably returns false once nothing more can be read — confirm against the implementations.
    virtual bool pull(DB::Chunk & chunk) = 0;

    /// Marks the reader as cancelled. Only the call that flips the flag from
    /// false to true invokes onCancel(), so the hook runs at most once.
    void cancel()
    {
        // exchange() yields the previous value: true means the flag was already set.
        if (is_cancelled.exchange(true, std::memory_order_acq_rel))
            return;
        onCancel();
    }

    /// Returns the current value of the cancellation flag.
    bool isCancelled() const
    {
        return is_cancelled.load(std::memory_order_acquire);
    }

    /// Default implementation ignores both arguments; subclasses may override.
    virtual void applyKeyCondition(const std::shared_ptr<const DB::KeyCondition> &, const ColumnIndexFilterPtr &) { }

protected:
    /// Hook called from cancel() on the first cancellation; no-op by default.
    virtual void onCancel() { }

    FormatFilePtr file;
    std::atomic<bool> is_cancelled{false};

    /// Column/field construction helpers; declared here and defined out of line.
    static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const DB::Field & field, size_t rows);
    static DB::ColumnPtr createColumn(const String & value, DB::DataTypePtr type, size_t rows);
    static DB::Field buildFieldFromString(const String & value, DB::DataTypePtr type);
};
/// Reader that delegates to the input format held in `input_format`.
/// The constructor and pull() are defined out of line.
class NormalFileReader : public FileReaderWrapper
{
public:
    NormalFileReader(
        const FormatFilePtr & file_,
        const DB::ContextPtr & context_,
        const DB::Block & to_read_header_,
        const DB::Block & output_header_);
    ~NormalFileReader() override = default;

    bool pull(DB::Chunk & chunk) override;

    /// Forwards filtering information to the underlying input:
    /// a VectorizedParquetBlockInputFormat receives `column_index_filter`,
    /// any other input receives `key_condition`.
    void applyKeyCondition(
        const std::shared_ptr<const DB::KeyCondition> & key_condition, const ColumnIndexFilterPtr & column_index_filter) override
    {
        auto * const parquet_input = dynamic_cast<VectorizedParquetBlockInputFormat *>(input_format->input.get());
        if (parquet_input == nullptr)
        {
            // Not the vectorized parquet format: fall back to the key condition.
            input_format->input->setKeyCondition(key_condition);
            return;
        }
        parquet_input->setColumnIndexFilter(column_index_filter);
    }

private:
    /// Propagates cancellation to the underlying input.
    void onCancel() override { input_format->input->cancel(); }

    DB::ContextPtr context;
    DB::Block to_read_header;
    DB::Block output_header;
    FormatFile::InputFormatPtr input_format;
};
/// Reader that never produces data: pull() ignores its argument and always returns false.
class EmptyFileReader : public FileReaderWrapper
{
public:
    explicit EmptyFileReader(FormatFilePtr file_)
        : FileReaderWrapper(file_)
    {
    }

    ~EmptyFileReader() override = default;

    /// The chunk is left untouched.
    bool pull(DB::Chunk &) override
    {
        return false;
    }
};
/// Reader constructed from a file, a context, a header block and a block size.
/// The constructor and pull() are defined out of line.
/// NOTE(review): judging by the name and the `remained_rows` / `block_size` members, this presumably
/// emits constant columns in batches of at most `block_size` rows — confirm in the implementation file.
class ConstColumnsFileReader : public FileReaderWrapper
{
public:
/// `block_size_` defaults to DB::DEFAULT_BLOCK_SIZE when omitted.
ConstColumnsFileReader(
FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & header_, size_t block_size_ = DB::DEFAULT_BLOCK_SIZE);
~ConstColumnsFileReader() override = default;
bool pull(DB::Chunk & chunk) override;
private:
DB::ContextPtr context;
DB::Block header;
/// NOTE(review): presumably the number of rows still to be emitted — confirm in pull().
size_t remained_rows;
size_t block_size;
};
/// Source derived from DB::SourceWithKeyCondition, constructed from a context, a header block
/// and a substrait::ReadRel::LocalFiles description.
/// Keeps the set of files (`files`), the index of the current one and a single active FileReaderWrapper.
/// All non-inline members are defined out of line.
class SubstraitFileSource : public DB::SourceWithKeyCondition
{
public:
SubstraitFileSource(const DB::ContextPtr & context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos);
~SubstraitFileSource() override = default;
/// Name reported for this source.
String getName() const override { return "SubstraitFileSource"; }
void setKeyCondition(const std::optional<DB::ActionsDAG> & filter_actions_dag, DB::ContextPtr context_) override;
protected:
DB::Chunk generate() override;
private:
/// NOTE(review): presumably makes `file_reader` ready for the next file and returns false when none is left — confirm in the .cpp.
bool tryPrepareReader();
void onCancel() noexcept override;
DB::ContextPtr context;
DB::Block output_header;
DB::Block to_read_header;
FormatFiles files;
/// NOTE(review): looks like a flag for whether input-file-name columns are requested — confirm against the constructor.
bool input_file_name = false;
InputFileNameParser input_file_name_parser;
/// Index into `files`; starts at 0.
UInt32 current_file_index = 0;
/// Reader currently in use; owned exclusively by this source.
std::unique_ptr<FileReaderWrapper> file_reader;
ReadBufferBuilderPtr read_buffer_builder;
ColumnIndexFilterPtr column_index_filter;
};
}