/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <Functions/FunctionFactory.h>
#include <Parser/ParserContext.h>
#include <Parser/RelParsers/MergeTreeRelParser.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <gtest/gtest.h>
#include <substrait/plan.pb.h>
#include <Common/DebugUtils.h>
#include <Common/QueryContext.h>

using namespace DB;
using namespace local_engine;

TEST(TestBatchParquetFileSource, blob)
{
    GTEST_SKIP();
    Context::ConfigurationPtr config;
    config->setString("blob.storage_account_url", "http://127.0.0.1:10000/devstoreaccount1");
    config->setString("blob.container_name", "libch");
    config->setString("blob.container_already_exists", "true");
    config->setString(
        "blob.connection_string",
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey="
        "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/"
        "devstoreaccount1;");

    auto builder = std::make_unique<QueryPipelineBuilder>();
    substrait::ReadRel::LocalFiles files;
    substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items();
    std::string file_path = "wasb://libch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet";
    file->set_uri_file(file_path);
    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
    file->mutable_parquet()->CopyFrom(parquet_format);

    const auto * type_string = "columns format version: 1\n"
                               "15 columns:\n"
                               "`l_partkey` Int64\n"
                               "`l_suppkey` Int64\n"
                               "`l_linenumber` Int32\n"
                               "`l_quantity` Float64\n"
                               "`l_extendedprice` Float64\n"
                               "`l_discount` Float64\n"
                               "`l_tax` Float64\n"
                               "`l_returnflag` String\n"
                               "`l_linestatus` String\n"
                               "`l_shipdate` Date\n"
                               "`l_commitdate` Date\n"
                               "`l_receiptdate` Date\n"
                               "`l_shipinstruct` String\n"
                               "`l_shipmode` String\n"
                               "`l_comment` String\n";
    auto names_and_types_list = NamesAndTypesList::parse(type_string);
    ColumnsWithTypeAndName columns;
    for (const auto & item : names_and_types_list)
    {
        ColumnWithTypeAndName col;
        col.column = item.type->createColumn();
        col.type = item.type;
        col.name = item.name;
        columns.emplace_back(std::move(col));
    }
    auto header = Block(std::move(columns));
    builder->init(Pipe(std::make_shared<local_engine::SubstraitFileSource>(QueryContext::globalContext(), header, files)));

    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
    auto executor = PullingPipelineExecutor(pipeline);
    auto result = header.cloneEmpty();
    size_t total_rows = 0;
    bool is_first = true;
    while (executor.pull(result))
    {
        if (is_first)
            debug::headBlock(result);
        total_rows += result.rows();
        is_first = false;
    }

    ASSERT_TRUE(total_rows > 0);
    std::cerr << "rows:" << total_rows << std::endl;
}

TEST(TestBatchParquetFileSource, s3)
{
    GTEST_SKIP();
    Context::ConfigurationPtr config;
    config->setString("s3.endpoint", "http://localhost:9000/tpch/");
    config->setString("s3.region", "us-east-1");
    config->setString("s3.access_key_id", "admin");
    config->setString("s3.secret_access_key", "password");

    auto builder = std::make_unique<QueryPipelineBuilder>();
    substrait::ReadRel::LocalFiles files;
    substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items();
    std::string file_path = "s3://tpch/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet";
    file->set_uri_file(file_path);
    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
    file->mutable_parquet()->CopyFrom(parquet_format);

    const auto * type_string = "columns format version: 1\n"
                               "15 columns:\n"
                               "`l_partkey` Int64\n"
                               "`l_suppkey` Int64\n"
                               "`l_linenumber` Int32\n"
                               "`l_quantity` Float64\n"
                               "`l_extendedprice` Float64\n"
                               "`l_discount` Float64\n"
                               "`l_tax` Float64\n"
                               "`l_returnflag` String\n"
                               "`l_linestatus` String\n"
                               "`l_shipdate` Date\n"
                               "`l_commitdate` Date\n"
                               "`l_receiptdate` Date\n"
                               "`l_shipinstruct` String\n"
                               "`l_shipmode` String\n"
                               "`l_comment` String\n";
    auto names_and_types_list = NamesAndTypesList::parse(type_string);
    ColumnsWithTypeAndName columns;
    for (const auto & item : names_and_types_list)
    {
        ColumnWithTypeAndName col;
        col.column = item.type->createColumn();
        col.type = item.type;
        col.name = item.name;
        columns.emplace_back(std::move(col));
    }
    auto header = Block(std::move(columns));
    builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(), header, files)));

    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
    auto executor = PullingPipelineExecutor(pipeline);
    auto result = header.cloneEmpty();
    size_t total_rows = 0;
    bool is_first = true;
    while (executor.pull(result))
    {
        if (is_first)
            debug::headBlock(result);
        total_rows += result.rows();
        is_first = false;
    }

    ASSERT_TRUE(total_rows > 0);
    std::cerr << "rows:" << total_rows << std::endl;
}

TEST(TestBatchParquetFileSource, local_file)
{
    GTEST_SKIP();
    auto builder = std::make_unique<QueryPipelineBuilder>();

    substrait::ReadRel::LocalFiles files;
    substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items();
    file->set_uri_file(
        "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00000-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet");
    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
    file->mutable_parquet()->CopyFrom(parquet_format);
    file = files.add_items();
    file->set_uri_file(
        "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00001-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet");
    file->mutable_parquet()->CopyFrom(parquet_format);
    file = files.add_items();
    file->set_uri_file(
        "file:///home/admin1/Documents/data/tpch/parquet/lineitem/part-00002-f83d0a59-2bff-41bc-acde-911002bf1b33-c000.snappy.parquet");
    file->mutable_parquet()->CopyFrom(parquet_format);

    const auto * type_string = "columns format version: 1\n"
                               "2 columns:\n"
                               //                               "`l_partkey` Int64\n"
                               //                               "`l_suppkey` Int64\n"
                               //                               "`l_linenumber` Int32\n"
                               //                               "`l_quantity` Float64\n"
                               //                               "`l_extendedprice` Float64\n"
                               "`l_discount` Float64\n"
                               "`l_tax` Float64\n";
    //                               "`l_returnflag` String\n"
    //                               "`l_linestatus` String\n"
    //                               "`l_shipdate` Date\n"
    //                               "`l_commitdate` Date\n"
    //                               "`l_receiptdate` Date\n"
    //                               "`l_shipinstruct` String\n"
    //                               "`l_shipmode` String\n"
    //                               "`l_comment` String\n";
    auto names_and_types_list = NamesAndTypesList::parse(type_string);
    ColumnsWithTypeAndName columns;
    for (const auto & item : names_and_types_list)
    {
        ColumnWithTypeAndName col;
        col.column = item.type->createColumn();
        col.type = item.type;
        col.name = item.name;
        columns.emplace_back(std::move(col));
    }
    auto header = Block(std::move(columns));
    builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(), header, files)));

    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
    auto executor = PullingPipelineExecutor(pipeline);
    auto result = header.cloneEmpty();
    size_t total_rows = 0;
    bool is_first = true;
    while (executor.pull(result))
    {
        if (is_first)
            debug::headBlock(result);
        total_rows += result.rows();
        is_first = false;
    }
    std::cerr << "rows:" << total_rows << std::endl;
    ASSERT_TRUE(total_rows == 59986052);
}

TEST(TestPrewhere, OptimizePrewhereCondition)
{
    String filter(R"({"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},   "arguments":[{"value":{"scalarFunction":{"functionReference":1,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}},    {"value":{"literal":{"date":8766}}}]}}},{"value":{"scalarFunction":{"functionReference":2,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},    "arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}},{"value":{"literal":{"date":9131}}}]}}}]}}},     {"value":{"scalarFunction":{"functionReference":3,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":     {"directReference":{"structField":{}}}}},{"value":{"literal":{"decimal":{"value":"YAkAAAAAAAAAAAAAAAAAAA==","precision":15,"scale":2}}}}]}}}]}}},     {"value":{"scalarFunction":{"functionReference":4,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":     {"directReference":{"structField":{"field":1}}}}},{"value":{"literal":{"decimal":{"value":"BQAAAAAAAAAAAAAAAAAAAA==","precision":15,"scale":2}}}}]}}}]}}},{"value":     {"scalarFunction":{"functionReference":5,"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":     {"field":1}}}}},{"value":{"literal":{"decimal":{"value":"BwAAAAAAAAAAAAAAAAAAAA==","precision":15,"scale":2}}}}]}}}]}})");

    auto expr_ptr = std::make_unique<substrait::Expression>();
    auto s = google::protobuf::util::JsonStringToMessage(absl::string_view(filter), expr_ptr.get());
    ASSERT_TRUE(s.ok());

    auto plan_ptr = std::make_unique<substrait::Plan>();
    String extensions_str("{\"extensions\":[{\"extensionFunction\":{\"functionAnchor\":7,\"name\":\"sum:req_i32\"}},{\"extensionFunction\":{\"functionAnchor\":4,\"name\":\"lt:opt_date_date\"}},{\"extensionFunction\":{\"functionAnchor\":6,\"name\":\"multiply:opt_i32_i32\"}},{\"extensionFunction\":{\"name\":\"and:opt_bool_bool\"}},{\"extensionFunction\":{\"functionAnchor\":1,\"name\":\"gte:opt_dec\u003c15,2\u003e_dec\u003c15,2\u003e\"}},{\"extensionFunction\":{\"functionAnchor\":2,\"name\":\"lte:opt_dec\u003c15,2\u003e_dec\u003c15,2\u003e\"}},{\"extensionFunction\":{\"functionAnchor\":5,\"name\":\"lt:opt_dec\u003c15,2\u003e_dec\u003c15,2\u003e\"}},{\"extensionFunction\":{\"functionAnchor\":3,\"name\":\"gte:opt_date_date\"}}],\"relations\":[{\"root\":{\"input\":{\"aggregate\":{\"common\":{\"direct\":{}},\"input\":{\"project\":{\"common\":{\"emit\":{\"outputMapping\":[2]}},\"input\":{\"project\":{\"common\":{\"emit\":{\"outputMapping\":[5,6]}},\"input\":{\"read\":{\"common\":{\"direct\":{}},\"baseSchema\":{\"names\":[\"l_orderkey\",\"l_partkey\",\"l_quantity\",\"l_discount\",\"l_shipdate\"],\"struct\":{\"types\":[{\"i32\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},{\"i32\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},{\"decimal\":{\"scale\":2,\"precision\":15,\"nullability\":\"NULLABILITY_REQUIRED\"}},{\"decimal\":{\"scale\":2,\"precision\":15,\"nullability\":\"NULLABILITY_REQUIRED\"}},{\"date\":{\"nullability\":\"NULLABILITY_REQUIRED\"}}]},\"columnTypes\":[\"NORMAL_COL\",\"NORMAL_COL\",\"NORMAL_COL\",\"NORMAL_COL\",\"NORMAL_COL\"]},\"filter\":{\"scalarFunction\":{\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"scalarFunction\":{\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"scalarFunction\":{\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"scalarFunction\":{\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"scalarFunction\":{\"functionReference\":1,\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{\"field\":3}}}}},{\"value\":{\"literal\":{\"decimal\":{\"value\":\"BQAAAAAAAAAAAAAAAAAAAA==\",\"precision\":15,\"scale\":2}}}}]}}},{\"value\":{\"scalarFunction\":{\"functionReference\":2,\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{\"field\":3}}}}},{\"value\":{\"literal\":{\"decimal\":{\"value\":\"BwAAAAAAAAAAAAAAAAAAAA==\",\"precision\":15,\"scale\":2}}}}]}}}]}}},{\"value\":{\"scalarFunction\":{\"functionReference\":3,\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{\"field\":4}}}}},{\"value\":{\"literal\":{\"date\":8766}}}]}}}]}}},{\"value\":{\"scalarFunction\":{\"functionReference\":4,\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{\"field\":4}}}}},{\"value\":{\"literal\":{\"date\":9131}}}]}}}]}}},{\"value\":{\"scalarFunction\":{\"functionReference\":5,\"outputType\":{\"bool\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{\"field\":2}}}}},{\"value\":{\"literal\":{\"decimal\":{\"value\":\"YAkAAAAAAAAAAAAAAAAAAA==\",\"precision\":15,\"scale\":2}}}}]}}}]}},\"extensionTable\":{\"detail\":{\"@type\":\"type.googleapis.com/google.protobuf.StringValue\",\"value\":\"MergeTree;default\nlineitem\nvar/lib/clickhouse/data/tpch100_bak2/lineitem\n1\n574\n\"}}}},\"expressions\":[{\"selection\":{\"directReference\":{\"structField\":{}}}},{\"selection\":{\"directReference\":{\"structField\":{\"field\":1}}}}]}},\"expressions\":[{\"scalarFunction\":{\"functionReference\":6,\"outputType\":{\"i32\":{\"nullability\":\"NULLABILITY_REQUIRED\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{}}}}},{\"value\":{\"selection\":{\"directReference\":{\"structField\":{\"field\":1}}}}}]}}]}},\"groupings\":[{}],\"measures\":[{\"measure\":{\"functionReference\":7,\"phase\":\"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE\",\"outputType\":{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}},\"arguments\":[{\"value\":{\"selection\":{\"directReference\":{\"structField\":{}}}}}]}}]}},\"names\":[\"sum#909\"],\"outputSchema\":{\"types\":[{\"i64\":{\"nullability\":\"NULLABILITY_NULLABLE\"}}],\"nullability\":\"NULLABILITY_REQUIRED\"}}}]}");
    auto ext = google::protobuf::util::JsonStringToMessage(absl::string_view(extensions_str), plan_ptr.get());
    ASSERT_TRUE(ext.ok());

    const auto * type_string = "columns format version: 1\n"
                               "5 columns:\n"
                               "`l_quantity` decimal(15,2)\n"
                               "`l_discount` decimal(15,2)\n"
                               "`l_shipdate` date32\n"
                               "`l_orderkey` Int64\n"
                               "`l_partkey` Int64\n";
    auto names_and_types_list = NamesAndTypesList::parse(type_string);
    ColumnsWithTypeAndName columns;
    for (const auto & item : names_and_types_list)
    {
        ColumnWithTypeAndName col;
        col.column = item.type->createColumn();
        col.type = item.type;
        col.name = item.name;
        columns.emplace_back(std::move(col));
    }
    Block block(std::move(columns));

    ContextPtr context = QueryContext::globalContext();
    ParserContextPtr parser_context = ParserContext::build(context, *plan_ptr);
    SerializedPlanParser * parser = new SerializedPlanParser(parser_context);

    MergeTreeRelParser mergeTreeParser(parser_context, QueryContext::globalContext());

    mergeTreeParser.column_sizes["l_discount"] = 0;
    mergeTreeParser.column_sizes["l_quantity"] = 1;
    mergeTreeParser.column_sizes["l_partkey"] = 2;
    mergeTreeParser.column_sizes["l_orderkey"] = 3;
    mergeTreeParser.column_sizes["l_shipdate"] = 4;

    MergeTreeRelParser::Conditions res;
    std::set<Int64> pk_positions;
    mergeTreeParser.analyzeExpressions(res, *expr_ptr, pk_positions, block);
    res.sort();
    ASSERT_TRUE(res.front().table_columns.contains("l_discount"));
    ASSERT_TRUE(res.back().table_columns.contains("l_shipdate"));
}