* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "InputFileNameParser.h"
#include <ranges>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace local_engine
{
static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}};
}
static DB::Block createOutputHeader(
const DB::Block & header,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
{
DB::Block output_header{header};
if (file_name.has_value())
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(), InputFileNameParser::INPUT_FILE_NAME});
if (block_start.has_value())
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_START});
if (block_length.has_value())
{
output_header.insert(
DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_LENGTH});
}
return output_header;
}
class InputFileExprProjectTransform : public DB::ISimpleTransform
{
public:
InputFileExprProjectTransform(
const DB::Block & input_header_,
const DB::Block & output_header_,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
: ISimpleTransform(input_header_, output_header_, true), file_name(file_name), block_start(block_start), block_length(block_length)
{
}
String getName() const override { return "InputFileExprProjectTransform"; }
void transform(DB::Chunk & chunk) override
{
InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), chunk, file_name, block_start, block_length);
}
private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};
class InputFileExprProjectStep : public DB::ITransformingStep
{
public:
InputFileExprProjectStep(
const DB::Block & input_header,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
: ITransformingStep(input_header, createOutputHeader(input_header, file_name, block_start, block_length), getTraits(), true)
, file_name(file_name)
, block_start(block_start)
, block_length(block_length)
{
}
String getName() const override { return "InputFileExprProjectStep"; }
void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & ) override
{
pipeline.addSimpleTransform(
[&](const DB::Block & header)
{ return std::make_shared<InputFileExprProjectTransform>(header, *output_header, file_name, block_start, block_length); });
}
protected:
void updateOutputHeader() override
{
}
private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};
bool InputFileNameParser::hasInputFileNameColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_NAME) != names.end();
}
bool InputFileNameParser::hasInputFileBlockStartColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_START) != names.end();
}
bool InputFileNameParser::hasInputFileBlockLengthColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_LENGTH) != names.end();
}
void InputFileNameParser::addInputFileColumnsToChunk(
const DB::Block & header,
DB::Chunk & chunk,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
{
auto output_columns = chunk.getColumns();
for (size_t i = 0; i < header.columns(); ++i)
{
const auto & column = header.getByPosition(i);
if (column.name == INPUT_FILE_NAME)
{
if (!file_name.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file name is not set");
auto type_string = std::make_shared<DB::DataTypeString>();
auto file_name_column = type_string->createColumnConst(chunk.getNumRows(), file_name.value());
output_columns.insert(output_columns.begin() + i, std::move(file_name_column));
}
else if (column.name == INPUT_FILE_BLOCK_START)
{
if (!block_start.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_start is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_start_column = type_int64->createColumnConst(chunk.getNumRows(), block_start.value());
output_columns.insert(output_columns.begin() + i, std::move(block_start_column));
}
else if (column.name == INPUT_FILE_BLOCK_LENGTH)
{
if (!block_length.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_length is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_length_column = type_int64->createColumnConst(chunk.getNumRows(), block_length.value());
output_columns.insert(output_columns.begin() + i, std::move(block_length_column));
}
}
chunk.setColumns(output_columns, chunk.getNumRows());
}
bool InputFileNameParser::containsInputFileColumns(const DB::Block & block)
{
return hasInputFileNameColumn(block) || hasInputFileBlockStartColumn(block) || hasInputFileBlockLengthColumn(block);
}
DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
{
const auto & columns = block.getColumnsWithTypeAndName();
DB::ColumnsWithTypeAndName result_columns;
for (const auto & column : columns)
if (!INPUT_FILE_COLUMNS_SET.contains(column.name))
result_columns.push_back(column);
return result_columns;
}
std::optional<DB::IQueryPlanStep *> InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
{
if (!file_name.has_value() && !block_start.has_value() && !block_length.has_value())
return std::nullopt;
auto step = std::make_unique<InputFileExprProjectStep>(plan.getCurrentHeader(), file_name, block_start, block_length);
step->setStepDescription("Input file expression project");
std::optional<DB::IQueryPlanStep *> result = step.get();
plan.addStep(std::move(step));
return result;
}
void InputFileNameParser::addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk) const
{
addInputFileColumnsToChunk(header, chunk, file_name, block_start, block_length);
}
}