/*
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once
#include <unordered_set>
#include <Core/Block.h>
#include <Core/Joins.h>
#include <Functions/CastOverloadResolver.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/Context.h>
#include <Processors/Chunk.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <base/types.h>
#include <substrait/algebra.pb.h>
#include <Common/CurrentThread.h>
#include <Common/GlutenConfig.h>

namespace DB
{
class QueryPipeline;
class QueryPlan;
}

namespace local_engine
{
static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage";
static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert";
static const std::string DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = "spark.sql.decimalOperations.allowPrecisionLoss";
static const std::string TIMER_PARSER_POLICY = "spark.sql.legacy.timeParserPolicy";

static const std::unordered_set<String> BOOL_VALUE_SETTINGS{
    MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS};
static const std::unordered_set<String> LONG_VALUE_SETTINGS{
    "optimize.maxfilesize", "optimize.minFileSize", "mergetree.max_num_part_per_merge_task"};

class BlockUtil
{
public:
    static constexpr auto VIRTUAL_ROW_COUNT_COLUMN = "__VIRTUAL_ROW_COUNT_COLUMN__";
    static constexpr auto RIHGT_COLUMN_PREFIX = "broadcast_right_";

    // Build a header block with a virtual column which will be
    // use to indicate the number of rows in a block.
    // Commonly seen in the following quries:
    // - select count(1) from t
    // - select 1 from t
    static DB::Block buildRowCountHeader();
    static DB::Chunk buildRowCountChunk(UInt64 rows);
    static DB::Block buildRowCountBlock(UInt64 rows);

    static DB::Block buildHeader(const DB::NamesAndTypesList & names_types_list);

    static constexpr UInt64 FLAT_STRUCT = 1;
    static constexpr UInt64 FLAT_NESTED_TABLE = 2;
    /// If it's a struct without named fields, also force to flatten it.
    static constexpr UInt64 FLAT_STRUCT_FORCE = 4;

    // flatten the struct and array(struct) columns.
    // It's different from Nested::flattend()
    static DB::Block flattenBlock(
        const DB::Block & block,
        UInt64 flags = FLAT_STRUCT | FLAT_NESTED_TABLE,
        bool recursively = false,
        const std::unordered_set<size_t> & columns_to_skip_flatten = {});

    static DB::Block concatenateBlocksMemoryEfficiently(std::vector<DB::Block> && blocks);

    /// The column names may be different in two blocks.
    /// and the nullability also could be different, with TPCDS-Q1 as an example.
    static DB::ColumnWithTypeAndName
    convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
};

class PODArrayUtil
{
public:
    /// To allocate n bytes, PODArray will allocate n + pad_left + pad_right bytes in fact. So when
    /// we want to allocate 2^k bytes, 2^(k+1) bytes are allocated. This makes the memory usage far
    /// more than we expected, and easy to cause OOM. For example, we want to limit the max block size to be
    /// 64k rows, CH will make the memory usage equal to 128k rows, and half of the reserved memory is not used.
    /// So we adjust the size by considering the padding bytes, the return value may be samller then n.
    static size_t adjustMemoryEfficientSize(size_t n);
};

/// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple.
/// It can extract a column from a multiple nested type column, e.g. named Tuple in named Tuple
/// Keeps some intermediate data to avoid rebuild them multi-times.
class NestedColumnExtractHelper
{
public:
    explicit NestedColumnExtractHelper(const DB::Block & block_, bool case_insentive_);
    std::optional<DB::ColumnWithTypeAndName> extractColumn(const String & column_name);

private:
    std::optional<DB::ColumnWithTypeAndName>
    extractColumn(const String & original_column_name, const String & column_name_prefix, const String & column_name_suffix);
    const DB::Block & block;
    bool case_insentive;
    std::map<String, DB::BlockPtr> nested_tables;

    const DB::ColumnWithTypeAndName * findColumn(const DB::Block & block, const std::string & name) const;
};

class ActionsDAGUtil
{
public:
    static const DB::ActionsDAG::Node * convertNodeType(
        DB::ActionsDAG & actions_dag,
        const DB::ActionsDAG::Node * node_to_cast,
        const DB::DataTypePtr & cast_to_type,
        const std::string & result_name = "",
        DB::CastType cast_type = DB::CastType::nonAccurate);

    static const DB::ActionsDAG::Node * convertNodeTypeIfNeeded(
        DB::ActionsDAG & actions_dag,
        const DB::ActionsDAG::Node * node,
        const DB::DataTypePtr & dst_type,
        const std::string & result_name = "",
        DB::CastType cast_type = DB::CastType::nonAccurate);
};

class QueryPipelineUtil
{
public:
    static String explainPipeline(DB::QueryPipeline & pipeline);
};

void registerAllFunctions();
void registerGlutenDisks();

class BackendFinalizerUtil;
class JNIUtils;
class BackendInitializerUtil
{
public:
    static DB::Field toField(const String & key, const String & value);

    /// Initialize two kinds of resources
    /// 1. global level resources like global_context/shared_context, notice that they can only be initialized once in process lifetime
    /// 2. session level resources like settings/configs, they can be initialized multiple times following the lifetime of executor/driver
    static void initBackend(const SparkConfigs::ConfigMap & spark_conf_map);
    static void initSettings(const SparkConfigs::ConfigMap & spark_conf_map, DB::Settings & settings);

    inline static const String CH_BACKEND_PREFIX = "spark.gluten.sql.columnar.backend.ch";

    inline static const String CH_RUNTIME_CONFIG = "runtime_config";
    inline static const String CH_RUNTIME_CONFIG_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_CONFIG + ".";
    inline static const String CH_RUNTIME_CONFIG_FILE = CH_RUNTIME_CONFIG_PREFIX + "config_file";

    inline static const String CH_RUNTIME_SETTINGS = "runtime_settings";
    inline static const String CH_RUNTIME_SETTINGS_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_SETTINGS + ".";

    inline static const String SETTINGS_PATH = "local_engine.settings";
    inline static const String LIBHDFS3_CONF_KEY = "hdfs.libhdfs3_conf";
    inline static const std::string HADOOP_S3_ACCESS_KEY = "fs.s3a.access.key";
    inline static const std::string HADOOP_S3_SECRET_KEY = "fs.s3a.secret.key";
    inline static const std::string HADOOP_S3_ENDPOINT = "fs.s3a.endpoint";
    inline static const std::string HADOOP_S3_ASSUMED_ROLE = "fs.s3a.assumed.role.arn";
    inline static const std::string HADOOP_S3_ASSUMED_SESSION_NAME = "fs.s3a.assumed.role.session.name";
    // not hadoop official
    inline static const std::string HADOOP_S3_ASSUMED_EXTERNAL_ID = "fs.s3a.assumed.role.externalId";
    // hadoop official, this is used to ignore the cached client
    inline static const std::string HADOOP_S3_CLIENT_CACHE_IGNORE = "fs.s3a.client.cached.ignore";
    inline static const std::string SPARK_HADOOP_PREFIX = "spark.hadoop.";
    inline static const std::string S3A_PREFIX = "fs.s3a.";
    inline static const std::string SPARK_DELTA_PREFIX = "spark.databricks.delta.";
    inline static const std::string SPARK_SESSION_TIME_ZONE = "spark.sql.session.timeZone";

    inline static const String GLUTEN_TASK_OFFHEAP = "spark.gluten.memory.task.offHeap.size.in.bytes";

    inline static const String GLUTEN_LOCAL_CACHE_PREFIX = "gluten_cache.local.";

    /// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which
    /// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written.
    /// So we need to get the spark user from env and pass it to libhdfs3.
    inline static std::optional<String> spark_user;

private:
    friend class BackendFinalizerUtil;
    friend class JNIUtils;

    static DB::Context::ConfigurationPtr initConfig(const SparkConfigs::ConfigMap & spark_conf_map);
    static String tryGetConfigFile(const SparkConfigs::ConfigMap & spark_conf_map);
    static void initLoggers(DB::Context::ConfigurationPtr config);
    static void initEnvs(DB::Context::ConfigurationPtr config);

    static void initContexts(DB::Context::ConfigurationPtr config);
    static void initCompiledExpressionCache(DB::Context::ConfigurationPtr config);
    static void registerAllFactories();
    static void applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings);
    static std::vector<String>
    wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);


    inline static std::once_flag init_flag;
    inline static Poco::Logger * logger;
};

class BackendFinalizerUtil
{
public:
    /// Release global level resources like global_context/shared_context. Invoked only once in the lifetime of process when JVM is shuting down.
    static void finalizeGlobally();

    /// Release session level resources like StorageJoinBuilder. Invoked every time executor/driver shutdown.
    static void finalizeSessionally();

    static std::vector<String> paths_need_to_clean;
    static std::mutex paths_mutex;
};

// Ignore memory track, memory should free before IgnoreMemoryTracker deconstruction
class IgnoreMemoryTracker
{
public:
    explicit IgnoreMemoryTracker(size_t limit_) : limit(limit_) { DB::CurrentThread::get().untracked_memory_limit += limit; }
    ~IgnoreMemoryTracker() { DB::CurrentThread::get().untracked_memory_limit -= limit; }

private:
    size_t limit;
};

class DateTimeUtil
{
public:
    static Int64 currentTimeMillis();
    static String convertTimeZone(const String & time_zone);
};

class MemoryUtil
{
public:
    static UInt64 getMemoryRSS();
};

class JoinUtil
{
public:
    static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols);
    static std::pair<DB::JoinKind, DB::JoinStrictness>
    getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
    static std::pair<DB::JoinKind, DB::JoinStrictness> getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type);
};

}