/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "config.h"

#include <memory>
#include <shared_mutex>
#include <Core/Settings.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <IO/BoundedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <IO/ParallelReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Context.h>
#include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <boost/compute/detail/lru_cache.hpp>
#include <sys/stat.h>
#include <Poco/Logger.h>
#include <Poco/URI.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>
#include <Common/GlutenSettings.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>

#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#endif

#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#endif

#if USE_HDFS
#include <hdfs/hdfs.h>
#endif

namespace DB
{
namespace Setting
{
extern const SettingsUInt64 s3_max_redirects;
extern const SettingsBool s3_disable_checksum;
extern const SettingsUInt64 s3_retry_attempts;
extern const SettingsMaxThreads max_download_threads;
extern const SettingsUInt64 max_download_buffer_size;
extern const SettingsBool input_format_allow_seeks;
extern const SettingsUInt64 max_read_buffer_size;
}
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_OPEN_FILE;
    extern const int UNKNOWN_FILE_SIZE;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
}

namespace local_engine
{

using namespace DB;
FileCacheConcurrentMap ReadBufferBuilder::files_cache_time_map;

template <class key_type, class value_type>
class ConcurrentLRU
{
public:
    explicit ConcurrentLRU(size_t size) : cache(size) { }
    boost::optional<value_type> get(const key_type & key)
    {
        std::shared_lock<std::shared_mutex> lock(rwLock);
        return cache.get(key);
    }


    void insert(const key_type & key, const value_type & value)
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.insert(key, value);
    }

    void clear()
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.clear();
    }

private:
    boost::compute::detail::lru_cache<key_type, value_type> cache;
    std::shared_mutex rwLock;
};

static std::pair<size_t, size_t> getAdjustedReadRange(SeekableReadBuffer & buffer, const std::pair<size_t, size_t> & start_end)
{
    auto get_next_line_pos = [&](SeekableReadBuffer & buf) -> size_t
    {
        while (!buf.eof())
        {
            /// Search for \n or \r\n or \n\r or \r in buffer.
            if (*buf.position() == '\r')
            {
                ++buf.position();

                if (!buf.eof() && *buf.position() == '\n')
                {
                    ++buf.position();
                }

                return buf.getPosition();
            }
            else if (*buf.position() == '\n')
            {
                ++buf.position();

                if (!buf.eof() && *buf.position() == '\r')
                {
                    ++buf.position();
                }

                return buf.getPosition();
            }

            ++buf.position();
        }

        return buf.getPosition();
    };

    size_t read_start_pos = start_end.first;
    size_t read_end_pos = start_end.second;
    std::pair<size_t, size_t> result;

    if (read_start_pos == 0)
        result.first = read_start_pos;
    else
    {
        buffer.seek(read_start_pos, SEEK_SET);
        result.first = get_next_line_pos(buffer);
    }

    if (read_end_pos == 0)
        result.second = read_end_pos;
    else
    {
        buffer.seek(read_end_pos, SEEK_SET);
        result.second = get_next_line_pos(buffer);
    }

    return result;
}

static std::unique_ptr<SeekableReadBuffer>
adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> read_buffer, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
    /// Skip formats in which rows are not seperated by newline characters.
    if (!(file_info.has_text() || file_info.has_json()))
        return std::move(read_buffer);

    /// Skip text/json files with compression.
    /// When the file is compressed, its read range is adjusted in [[buildWithCompressionWrapper]]
    Poco::URI file_uri(file_info.uri_file());
    DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
    if (compression != CompressionMethod::None)
        return std::move(read_buffer);

    std::pair<size_t, size_t> start_end{file_info.start(), file_info.start() + file_info.length()};
    start_end = getAdjustedReadRange(*read_buffer, start_end);

    LOG_DEBUG(
        &Poco::Logger::get("ReadBufferBuilder"),
        "File read start and end position adjusted from {},{} to {},{}",
        file_info.start(),
        file_info.start() + file_info.length(),
        start_end.first,
        start_end.second);
#if USE_HDFS
    /// If read buffer doesn't support right bounded reads, wrap it with BoundedReadBuffer to enable right bounded reads.
    if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
        || dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
        read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#else 
    if (dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
        read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#endif
    read_buffer->seek(start_end.first, SEEK_SET);
    read_buffer->setReadUntilPosition(start_end.second);
    return std::move(read_buffer);
}

class LocalFileReadBufferBuilder : public ReadBufferBuilder
{
public:
    explicit LocalFileReadBufferBuilder(const DB::ContextPtr & context_) : ReadBufferBuilder(context_) { }
    ~LocalFileReadBufferBuilder() override = default;

    bool isRemote() const override { return false; }

    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        Poco::URI file_uri(file_info.uri_file());
        std::unique_ptr<DB::ReadBufferFromFileBase> read_buffer;
        const String & file_path = file_uri.getPath();
        struct stat file_stat;
        if (stat(file_path.c_str(), &file_stat))
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", file_path);

        if (S_ISREG(file_stat.st_mode))
            read_buffer = std::make_unique<DB::ReadBufferFromFilePRead>(file_path);
        else
            read_buffer = std::make_unique<DB::ReadBufferFromFile>(file_path);

        return adjustReadRangeIfNeeded(std::move(read_buffer), file_info);
    }
};

#if USE_HDFS
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
    using ReadBufferCreator = std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, const DB::StoredObject & object)>;
public:
    explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_), context(context_) { }
    ~HDFSFileReadBufferBuilder() override = default;

    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        DB::ReadSettings read_settings = getReadSettings();
        const auto & config = context->getConfigRef();

        /// Get hdfs_uri
        Poco::URI uri(file_info.uri_file());
        auto hdfs_file_path = uri.getPath();

        std::string new_file_uri = uri.toString();
        if (uri.getUserInfo().empty() && BackendInitializerUtil::spark_user.has_value())
        {
            uri.setUserInfo(*BackendInitializerUtil::spark_user);
            new_file_uri = uri.toString();
        }

        auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
        auto hdfs_uri = new_file_uri.substr(0, begin_of_path);

        std::optional<size_t> file_size;
        std::optional<size_t> modified_time;
        if (file_info.has_properties())
        {
            if (file_info.properties().filesize() > 0)
            {
                /// filesize may be zero, under such condition we should not set file_size
                file_size = file_info.properties().filesize();
            }

            modified_time = file_info.properties().modificationtime();
        }

        std::unique_ptr<SeekableReadBuffer> read_buffer;
        if (!read_settings.enable_filesystem_cache)
        {
            bool thread_pool_read = read_settings.remote_fs_method == DB::RemoteFSReadMethod::threadpool;
            /// ORC and Parquet reader had already implemented async prefetch. They don't rely on AsynchronousReadBufferFromHDFS
            bool use_async_prefetch
                = read_settings.remote_fs_prefetch && thread_pool_read && (file_info.has_text() || file_info.has_json());
            auto raw_read_buffer = std::make_unique<ReadBufferFromHDFS>(
                    hdfs_uri,
                    hdfs_file_path,
                    config,
                    read_settings,
                    /* read_until_position */ 0,
                    /* use_external_buffer */ true,
                    file_size);

            if (use_async_prefetch)
                read_buffer = std::make_unique<AsynchronousReadBufferFromHDFS>(
                    getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(raw_read_buffer));
            else
                read_buffer = std::move(raw_read_buffer);
        }
        else
        {
            if (!file_size.has_value())
            {
                // only for spark3.2 file partition not contained file size
                // so first compute file size first
                auto tmp_read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(
                    hdfs_uri,
                    hdfs_file_path,
                    config,
                    read_settings,
                    /* read_until_position */ 0,
                    /* use_external_buffer */ true);
                file_size = tmp_read_buffer->getFileSize();
            }

            if (!modified_time.has_value())
                modified_time = 0;

            ReadBufferCreator read_buffer_creator
                = [this, hdfs_uri = hdfs_uri, hdfs_file_path = hdfs_file_path, read_settings, &config](
                      bool /* restricted_seek */, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromHDFS> {
                return std::make_unique<DB::ReadBufferFromHDFS>(
                    hdfs_uri, hdfs_file_path, config, read_settings, 0, true, object.bytes_size);
            };

            auto remote_path = uri.getPath().substr(1);
            DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", *file_size}};
            auto cache_creator = wrapWithCache(
                read_buffer_creator, read_settings, remote_path, *modified_time, *file_size);
            size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
            if (*file_size > 0)
                buffer_size = std::min(*file_size, buffer_size);
            auto cache_hdfs_read = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
                std::move(cache_creator), stored_objects, read_settings, nullptr, /* use_external_buffer */ false, buffer_size);
            read_buffer = std::move(cache_hdfs_read);
        }

        return adjustReadRangeIfNeeded(std::move(read_buffer), file_info);
    }

private:
    DB::ContextPtr context;
};
#endif

#if USE_AWS_S3
class S3FileReadBufferBuilder : public ReadBufferBuilder
{
    friend void registerReadBufferBuilders();


public:
    explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
    {
        auto config = S3Config::loadFromContext(context);
        // use gluten cache config is first priority
        if (!file_cache && config.s3_local_cache_enabled)
        {
            DB::FileCacheSettings file_cache_settings;
            file_cache_settings.max_size = config.s3_local_cache_max_size;
            auto cache_base_path = config.s3_local_cache_cache_path;

            if (!std::filesystem::exists(cache_base_path))
                std::filesystem::create_directories(cache_base_path);

            file_cache_settings.base_path = cache_base_path;
            file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", file_cache_settings, "");
            file_cache->initialize();
        }
    }

    ~S3FileReadBufferBuilder() override = default;

    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        DB::ReadSettings read_settings = getReadSettings();
        Poco::URI file_uri(file_info.uri_file());
        // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
        const std::string& bucket = file_uri.getHost();
        const auto client = getClient(bucket);
        std::string pathKey = file_uri.getPath().substr(1);

        size_t object_size = 0;
        size_t object_modified_time = 0;
        if (file_info.has_properties())
        {
            object_size = file_info.properties().filesize();
            object_modified_time = file_info.properties().modificationtime();
        }
        else
        {
            DB::S3::ObjectInfo object_info =  DB::S3::getObjectInfo(*client, bucket, pathKey, "");
            object_size = object_info.size;
            object_modified_time = object_info.last_modification_time;
        }

        auto read_buffer_creator
            = [bucket, client, read_settings, this](bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
        {
                return std::make_unique<DB::ReadBufferFromS3>(
                    client,
                    bucket,
                    object.remote_path,
                    "",
                    DB::S3::S3RequestSettings(),
                    read_settings,
                    /* use_external_buffer */ true,
                    /* offset */ 0,
                    /* read_until_position */ 0,
                    restricted_seek);
        };

        auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, pathKey, object_modified_time, object_size);

        DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "", object_size}};
        auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
            std::move(cache_creator), stored_objects, read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true, 0);

        auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
        size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
        if (object_size > 0)
            buffer_size = std::min(object_size, buffer_size);
        auto async_reader
            = std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), pool_reader, read_settings, buffer_size,read_settings.remote_read_min_bytes_for_seek);
        if (read_settings.remote_fs_prefetch)
            async_reader->prefetch(Priority{});

        return adjustReadRangeIfNeeded(std::move(async_reader), file_info);
    }

private:
    static const std::string SHARED_CLIENT_KEY;
    static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> per_bucket_clients;

    static std::string toBucketNameSetting(const std::string & bucket_name, const std::string & config_name)
    {
        if (!config_name.starts_with(BackendInitializerUtil::S3A_PREFIX))
        {
            // Currently per bucket only support fs.s3a.xxx
            return config_name;
        }
        // like: fs.s3a.bucket.bucket_name.assumed.role.externalId
        return BackendInitializerUtil::S3A_PREFIX + "bucket." + bucket_name + "."
            + config_name.substr(BackendInitializerUtil::S3A_PREFIX.size());
    }

    static std::string getSetting(
        const DB::Settings & settings,
        const std::string & bucket_name,
        const std::string & config_name,
        const std::string & default_value = "",
        const bool require_per_bucket = false)
    {
        std::string ret;
        // if there's a bucket specific config, prefer it to non per bucket config
        if (tryGetString(settings, toBucketNameSetting(bucket_name, config_name), ret))
            return ret;

        if (!require_per_bucket && tryGetString(settings, config_name, ret))
            return ret;

        return default_value;
    }

    void cacheClient(const std::string & bucket_name, const bool is_per_bucket, std::shared_ptr<DB::S3::Client> client)
    {
        if (is_per_bucket)
        {
            per_bucket_clients.insert(bucket_name, client);
        }
        else
        {
            per_bucket_clients.insert(SHARED_CLIENT_KEY, client);
        }
    }

    std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
    {
        const auto & config = context->getConfigRef();
        const auto & settings = context->getSettingsRef();
        bool use_assumed_role = false;
        bool is_per_bucket = false;

        if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE).empty())
            use_assumed_role = true;

        if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty())
            is_per_bucket = true;

        if (is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(bucket_name);
            if (client.has_value())
            {
                return client.get();
            }
        }

        if (!is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(SHARED_CLIENT_KEY);
            if (client.has_value())
                return client.get();
        }

        String config_prefix = "s3";
        auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
        if (!endpoint.starts_with("https://"))
        {
            if (endpoint.starts_with("s3"))
                // as https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
                // the fs.s3a.endpoint does not contain https:// prefix
                endpoint = "https://" + endpoint;
            else
                throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
        }
        String region_name;
        const char * amazon_suffix = ".amazonaws.com";
        const char * amazon_prefix = "https://s3.";
        auto pos = endpoint.find(amazon_suffix);
        if (pos != std::string::npos)
        {
            assert(endpoint.starts_with(amazon_prefix));
            region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
            assert(region_name.find('.') == std::string::npos);
        }
        // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work

        DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region_name,
            context->getRemoteHostFilter(),
            static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_max_redirects]),
            static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_retry_attempts]),
            false,
            false,
            nullptr,
            nullptr);

        client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
        client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
        client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
        client_configuration.endpointOverride = endpoint;

        client_configuration.retryStrategy
            = std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));

        std::string ak;
        std::string sk;
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
        const DB::Settings & global_settings = context->getGlobalContext()->getSettingsRef();
        const DB::Settings & local_settings = context->getSettingsRef();
        DB::S3::ClientSettings client_settings{
            .use_virtual_addressing = false,
            .disable_checksum = local_settings[DB::Setting::s3_disable_checksum],
            .gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
        };
        if (use_assumed_role)
        {
            auto new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak,     // access_key_id
                sk,     // secret_access_key
                "",     // server_side_encryption_customer_key_base64
                {},     // sse_kms_config
                {},     // headers
                DB::S3::CredentialsConfiguration{
                    .use_environment_credentials = true,
                    .use_insecure_imds_request = false,
                    .role_arn = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
                    .session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
                    .external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)
                });

            //TODO: support online change config for cached per_bucket_clients
            std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
            cacheClient(bucket_name, is_per_bucket, ret);
            return ret;
        }
        else
        {
            auto new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak,     // access_key_id
                sk,     // secret_access_key
                "",     // server_side_encryption_customer_key_base64
                {},     // sse_kms_config
                {},     // headers
                DB::S3::CredentialsConfiguration{
                    .use_environment_credentials = true,
                    .use_insecure_imds_request = false});

            std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
            cacheClient(bucket_name, is_per_bucket, ret);
            return ret;
        }
    }
};
const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY = "___shared-client___";
ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> S3FileReadBufferBuilder::per_bucket_clients(100);

#endif

#if USE_AZURE_BLOB_STORAGE
class AzureBlobReadBuffer : public ReadBufferBuilder
{
public:
    explicit AzureBlobReadBuffer(const DB::ContextPtr & context_) : ReadBufferBuilder(context_) { }
    ~AzureBlobReadBuffer() override = default;

    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        Poco::URI file_uri(file_info.uri_file());
        return std::make_unique<DB::ReadBufferFromAzureBlobStorage>(getClient(), file_uri.getPath(), DB::ReadSettings(), 5, 5);
    }

private:

    std::shared_ptr<DB::AzureBlobStorage::ContainerClient> shared_client;

    std::shared_ptr<DB::AzureBlobStorage::ContainerClient> getClient()
    {
        if (shared_client)
            return shared_client;

        const std::string config_prefix = "blob";
        const Poco::Util::AbstractConfiguration & config = context->getConfigRef();
        bool is_client_for_disk = false;
        auto new_settings = DB::AzureBlobStorage::getRequestSettings(config, config_prefix, context);
        DB::AzureBlobStorage::ConnectionParams params
        {
            .endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix),
            .auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix),
            .client_options = DB::AzureBlobStorage::getClientOptions(*new_settings, is_client_for_disk),
        };

        shared_client = DB::AzureBlobStorage::getContainerClient(params, true);
        return shared_client;
    }
};
#endif

void registerReadBufferBuilders()
{
    auto & factory = ReadBufferBuilderFactory::instance();
    factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared<LocalFileReadBufferBuilder>(context_); });

#if USE_HDFS
    factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared<HDFSFileReadBufferBuilder>(context_); });
#endif

#if USE_AWS_S3
    factory.registerBuilder("s3", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
    factory.registerBuilder("s3a", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
    factory.registerCleaner([]() { S3FileReadBufferBuilder::per_bucket_clients.clear(); });
#endif

#if USE_AZURE_BLOB_STORAGE
    factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
    factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
#endif
}

DB::ReadSettings ReadBufferBuilder::getReadSettings() const
{
    DB::ReadSettings read_settings = context->getReadSettings();
    const auto & config = context->getConfigRef();

    /// Override enable_filesystem_cache with gluten config
    read_settings.enable_filesystem_cache = config.getBool("gluten_cache.local.enabled", false);

    /// Override remote_fs_prefetch with gluten config
    read_settings.remote_fs_prefetch = config.getBool("hdfs.enable_async_io", false);

    return read_settings;
}

ReadBufferBuilder::ReadBufferBuilder(const DB::ContextPtr & context_) : context(context_)
{
}

std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const
{
    /// Bzip2 compressed file is splittable and we need to adjust read range for each split
    auto * seekable = dynamic_cast<SeekableReadBuffer *>(in.release());
    if (!seekable)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "ReadBuffer underlying BZIP2 decompressor must be seekable");
    std::unique_ptr<SeekableReadBuffer> seekable_in(seekable);

    size_t file_size = getFileSizeFromReadBuffer(*seekable_in);
    size_t start = file_info.start();
    size_t end = file_info.start() + file_info.length();

    /// No need to adjust start becuase it is already processed inside SplittableBzip2ReadBuffer
    size_t new_start = start;

    /// Extend end to the end of next block.
    size_t new_end = end;
    if (end < file_size)
    {
        Int64 bs_buff = 0;
        Int64 bs_live = 0;

        /// From end position skip to the second block delimiter.
        seekable_in->seek(end, SEEK_SET);
        for (size_t i = 0; i < 2; ++i)
        {
            size_t pos = seekable_in->getPosition();
            bool ok = SplittableBzip2ReadBuffer::skipToNextMarker(
                SplittableBzip2ReadBuffer::BLOCK_DELIMITER,
                SplittableBzip2ReadBuffer::DELIMITER_BIT_LENGTH,
                *seekable_in,
                bs_buff,
                bs_live);

            if (seekable_in->eof())
                break;

            if (!ok)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find next block delimiter in after offset: {}", pos);
        }
        new_end = seekable->eof() ? file_size : seekable_in->getPosition() - SplittableBzip2ReadBuffer::DELIMITER_BIT_LENGTH / 8 + 1;
    }

    LOG_DEBUG(
        &Poco::Logger::get("ReadBufferBuilder"),
        "File read start and end position adjusted from {},{} to {},{}",
        start,
        end,
        new_start,
        new_end);

    std::unique_ptr<SeekableReadBuffer> bounded_in;
#if USE_HDFS
    if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(seekable_in.get())
        || dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
        bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
    else
        bounded_in = std::move(seekable_in);
#else
    if (dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
        bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
    else
        bounded_in = std::move(seekable_in);
#endif
    bounded_in->seek(new_start, SEEK_SET);
    bounded_in->setReadUntilPosition(new_end);
    bool first_block_need_special_process = (new_start > 0);
    bool last_block_need_special_process = (new_end < file_size);
    size_t buffer_size = context->getSettingsRef()[DB::Setting::max_read_buffer_size];
    auto decompressed_in = std::make_unique<SplittableBzip2ReadBuffer>(
        std::move(bounded_in), first_block_need_special_process, last_block_need_special_process, buffer_size);
    return std::move(decompressed_in);
}

std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
    auto in = build(file_info);

    /// Wrap the read buffer with compression method if exists
    Poco::URI file_uri(file_info.uri_file());
    DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");

    if (compression == CompressionMethod::Bzip2)
        return wrapWithBzip2(std::move(in), file_info);
    else
    {
        /// In this case we are pretty sure that current split covers the whole file because only bzip2 compression is splittable
        auto parallel = wrapWithParallelIfNeeded(std::move(in), file_info);
        return wrapReadBufferWithCompressionMethod(std::move(parallel), compression);
    }
}

ReadBufferBuilder::ReadBufferCreator ReadBufferBuilder::wrapWithCache(
    ReadBufferCreator read_buffer_creator,
    DB::ReadSettings & read_settings,
    const String & key,
    size_t last_modified_time,
    size_t file_size)
{
    const auto & config = context->getConfigRef();
    if (!config.getBool("gluten_cache.local.enabled", false))
        return read_buffer_creator;

    read_settings.enable_filesystem_cache = true;
    if (!file_cache)
    {
        DB::FileCacheSettings file_cache_settings;
        file_cache_settings.loadFromConfig(config, "gluten_cache.local");

        if (std::filesystem::path(file_cache_settings.base_path).is_relative())
            file_cache_settings.base_path = std::filesystem::path(context->getPath()) / "caches" / file_cache_settings.base_path;

        if (!std::filesystem::exists(file_cache_settings.base_path))
            std::filesystem::create_directories(file_cache_settings.base_path);

        const auto name = config.getString("gluten_cache.local.name");
        const auto * config_prefix = "";
        file_cache = DB::FileCacheFactory::instance().getOrCreate(name, file_cache_settings, config_prefix);
        file_cache->initialize();
    }

    if (!file_cache->isInitialized())
    {
        file_cache->throwInitExceptionIfNeeded();
        return read_buffer_creator;
    }

    updateCaches(key, last_modified_time, file_size);

    return [read_buffer_creator, read_settings, this](
                   bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
    {
        auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
        auto modified_read_settings = read_settings.withNestedBuffer();
        auto rbc = [=, this]() { return read_buffer_creator(restricted_seek, object); };

        return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
            object.remote_path,
            cache_key,
            file_cache,
            DB::FileCache::getCommonUser(),
            rbc,
            modified_read_settings,
            std::string(DB::CurrentThread::getQueryId()),
            object.bytes_size,
            /* allow_seeks */ !read_settings.remote_read_buffer_restrict_seek,
            /* use_external_buffer */ true,
            /* read_until_position */ std::nullopt,
            context->getFilesystemCacheLog());
    };
}

void ReadBufferBuilder::updateCaches(const String & key, size_t last_modified_time, size_t file_size) const
{
    if (!file_cache)
        return;

    auto file_cache_key = DB::FileCacheKey::fromPath(key);
    auto last_cache_time = files_cache_time_map.get(file_cache_key);
    // quick check
    if (last_cache_time != std::nullopt && last_cache_time.has_value())
    {
        auto & [cached_modified_time, cached_file_size] = last_cache_time.value();
        if (cached_modified_time < last_modified_time || cached_file_size != file_size)
            files_cache_time_map.update_cache_time(file_cache_key, last_modified_time, file_size, file_cache);
    }
    else
    {
        // if process restart, cache map will be empty,
        //   we recommend continuing to use caching instead of renew it
        files_cache_time_map.insert(file_cache_key, last_modified_time, file_size);
    }
}

std::unique_ptr<DB::ReadBuffer> ReadBufferBuilder::wrapWithParallelIfNeeded(
    std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
    /// Only use parallel downloading for text and json format because data are read serially in those formats.
    if (!file_info.has_text() && !file_info.has_json())
        return std::move(in);

    const auto & settings = context->getSettingsRef();
    auto max_download_threads = settings[DB::Setting::max_download_threads];
    auto max_download_buffer_size = settings[DB::Setting::max_download_buffer_size];

    bool parallel_read = isRemote() && max_download_threads > 1 && isBufferWithFileSize(*in);
    if (!parallel_read)
        return std::move(in);

    size_t file_size = getFileSizeFromReadBuffer(*in);
    if (file_size < 4 * max_download_buffer_size)
        return std::move(in);

    LOG_TRACE(
        getLogger("ReadBufferBuilder"),
        "Using ParallelReadBuffer with {} workers with chunks of {} bytes",
        max_download_threads,
        max_download_buffer_size);

    return wrapInParallelReadBufferIfSupported(
        {std::move(in)},
        DB::threadPoolCallbackRunnerUnsafe<void>(DB::getIOThreadPool().get(), "ParallelRead"),
        max_download_threads,
        max_download_buffer_size,
        file_size);
}

ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
    static ReadBufferBuilderFactory instance;
    return instance;
}

void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer)
{
    auto it = builders.find(schema);
    if (it != builders.end())
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema);
    builders[schema] = newer;
}

ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context)
{
    auto it = builders.find(schema);
    if (it == builders.end())
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema);
    return it->second(context);
}

void ReadBufferBuilderFactory::registerCleaner(Cleaner cleaner)
{
    cleaners.push_back(cleaner);
}

void ReadBufferBuilderFactory::clean()
{
    for (auto c : cleaners)
    {
        c();
    }
}
}