/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <string>
#include <vector>
#include <Core/Block.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBufferFromFile.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <base/types.h>
#include <benchmark/benchmark.h>
#include <parquet/arrow/reader.h>

using namespace DB;
using namespace local_engine;

struct NameType
{
    String name;
    String type;
};

using NameTypes = std::vector<NameType>;

static Block getLineitemHeader(const NameTypes & name_types)
{
    auto & factory = DataTypeFactory::instance();
    ColumnsWithTypeAndName columns(name_types.size());
    for (size_t i = 0; i < columns.size(); ++i)
    {
        columns[i].name = name_types[i].name;
        columns[i].type = factory.get(name_types[i].type);
    }
    return std::move(Block(columns));
}

static void readParquetFile(const Block & header, const String & file, Block & block)
{
    auto in = std::make_unique<ReadBufferFromFile>(file);
    FormatSettings format_settings;
    auto format = std::make_shared<ParquetBlockInputFormat>(*in, header, format_settings, 1, 1, 8192);
    auto pipeline = QueryPipeline(std::move(format));
    auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
    while (reader->pull(block))
        return;
}

static void BM_CHColumnToSparkRow_Lineitem(benchmark::State & state)
{
    const NameTypes name_types = {
        {"l_orderkey", "Nullable(Int64)"},
        {"l_partkey", "Nullable(Int64)"},
        {"l_suppkey", "Nullable(Int64)"},
        {"l_linenumber", "Nullable(Int64)"},
        {"l_quantity", "Nullable(Float64)"},
        {"l_extendedprice", "Nullable(Float64)"},
        {"l_discount", "Nullable(Float64)"},
        {"l_tax", "Nullable(Float64)"},
        {"l_returnflag", "Nullable(String)"},
        {"l_linestatus", "Nullable(String)"},
        {"l_shipdate", "Nullable(Date32)"},
        {"l_commitdate", "Nullable(Date32)"},
        {"l_receiptdate", "Nullable(Date32)"},
        {"l_shipinstruct", "Nullable(String)"},
        {"l_shipmode", "Nullable(String)"},
        {"l_comment", "Nullable(String)"},
    };

    const Block header = std::move(getLineitemHeader(name_types));
    const String file = "/data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem/"
                        "part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet";
    Block block;
    readParquetFile(header, file, block);
    // std::cerr << "read_rows:" << block.rows() << std::endl;
    CHColumnToSparkRow converter;
    for (auto _ : state)
    {
        auto spark_row_info = converter.convertCHColumnToSparkRow(block);
        converter.freeMem(spark_row_info->getBufferAddress(), spark_row_info->getTotalBytes());
    }
}


static void BM_SparkRowToCHColumn_Lineitem(benchmark::State & state)
{
    const NameTypes name_types = {
        {"l_orderkey", "Nullable(Int64)"},
        {"l_partkey", "Nullable(Int64)"},
        {"l_suppkey", "Nullable(Int64)"},
        {"l_linenumber", "Nullable(Int64)"},
        {"l_quantity", "Nullable(Float64)"},
        {"l_extendedprice", "Nullable(Float64)"},
        {"l_discount", "Nullable(Float64)"},
        {"l_tax", "Nullable(Float64)"},
        {"l_returnflag", "Nullable(String)"},
        {"l_linestatus", "Nullable(String)"},
        {"l_shipdate", "Nullable(Date32)"},
        {"l_commitdate", "Nullable(Date32)"},
        {"l_receiptdate", "Nullable(Date32)"},
        {"l_shipinstruct", "Nullable(String)"},
        {"l_shipmode", "Nullable(String)"},
        {"l_comment", "Nullable(String)"},
    };

    const Block header = std::move(getLineitemHeader(name_types));
    const String file = "/data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem/"
                        "part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet";
    Block in_block;
    readParquetFile(header, file, in_block);

    CHColumnToSparkRow spark_row_converter;
    auto spark_row_info = spark_row_converter.convertCHColumnToSparkRow(in_block);
    for (auto _ : state) [[maybe_unused]]
        auto out_block = SparkRowToCHColumn::convertSparkRowInfoToCHColumn(*spark_row_info, header);
}

BENCHMARK(BM_CHColumnToSparkRow_Lineitem)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_SparkRowToCHColumn_Lineitem)->Unit(benchmark::kMillisecond)->Iterations(10);