/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <memory>
#include <Poco/Logger.h>

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/castColumn.h>
#include <Processors/IProcessor.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>

#include "ExpandTransform.h"

namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}

namespace local_engine
{
ExpandTransform::ExpandTransform(const DB::Block & input_, const DB::Block & output_, const ExpandField & project_set_exprs_)
    : DB::IProcessor({input_}, {output_}), project_set_exprs(project_set_exprs_)
{
}

ExpandTransform::Status ExpandTransform::prepare()
{
    auto & output = outputs.front();
    auto & input = inputs.front();

    if (output.isFinished() || isCancelled())
    {
        input.close();
        return Status::Finished;
    }

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    if (has_output)
    {
        output.push(std::move(output_chunk));
        has_output = false;
        return Status::PortFull;
    }

    if (!has_input)
    {
        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        input.setNeeded();

        if (!input.hasData())
            return Status::NeedData;

        input_chunk = input.pull(true);
        has_input = true;
        expand_expr_iterator = 0;
    }

    return Status::Ready;
}

void ExpandTransform::work()
{
    if (expand_expr_iterator >= project_set_exprs.getExpandRows())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "expand_expr_iterator >= project_set_exprs.getExpandRows()");

    const auto & input_header = getInputs().front().getHeader();
    const auto & input_columns = input_chunk.getColumns();
    const auto & types = project_set_exprs.getTypes();
    const auto & kinds = project_set_exprs.getKinds()[expand_expr_iterator];
    const auto & fields = project_set_exprs.getFields()[expand_expr_iterator];
    size_t rows = input_chunk.getNumRows();

    DB::Columns columns(types.size());
    for (size_t col_i = 0; col_i < types.size(); ++col_i)
    {
        const auto & type = types[col_i];
        const auto & kind = kinds[col_i];
        const auto & field = fields[col_i];

        if (kind == EXPAND_FIELD_KIND_SELECTION)
        {
            auto index = field.safeGet<Int32>();
            const auto & input_column = input_columns[index];

            DB::ColumnWithTypeAndName input_arg;
            input_arg.column = input_column;
            input_arg.type = input_header.getByPosition(index).type;
            /// input_column maybe non-Nullable
            columns[col_i] = DB::castColumn(input_arg, type);
        }
        else if (kind == EXPAND_FIELD_KIND_LITERAL)
        {
            /// Add const column with field value
            auto column = type->createColumnConst(rows, field)->convertToFullColumnIfConst();
            columns[col_i] = std::move(column);
        }
        else
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown ExpandFieldKind {}", magic_enum::enum_name(kind));
    }

    output_chunk = DB::Chunk(std::move(columns), rows);
    has_output = true;

    ++expand_expr_iterator;
    has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
}
}