/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once
#include <memory>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Storages/IO/NativeWriter.h>
#include <Shuffle/SelectorBuilder.h>
#include <base/types.h>
#include <Storages/IO/CompressedWriteBuffer.h>


namespace local_engine
{
    class SparkExchangeManager;
}

namespace local_engine
{
struct SplitOptions
{
    size_t split_size = DB::DEFAULT_BLOCK_SIZE;
    size_t io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE;
    std::string data_file;
    std::vector<std::string> local_dirs_list;
    int num_sub_dirs;
    int shuffle_id;
    int map_id;
    size_t partition_num;
    std::string hash_exprs;
    std::string out_exprs;
    std::string compress_method = "zstd";
    std::optional<int> compress_level;
    size_t spill_threshold = 300 * 1024 * 1024;
    std::string hash_algorithm;
    size_t max_sort_buffer_size = 1_GiB;
    bool force_memory_sort = false;
};

class ColumnsBuffer
{
public:
    ColumnsBuffer(size_t prefer_buffer_size = 8192);
    ~ColumnsBuffer() = default;

    void add(DB::Block & columns, int start, int end);
    void appendSelective(size_t column_idx, const DB::Block & source, const DB::IColumn::Selector & selector, size_t from, size_t length);

    DB::Block getHeader();
    size_t size() const;
    bool empty() const;

    DB::Block releaseColumns();

    size_t bytes() const
    {
        size_t res = 0;
        for (const auto & col : accumulated_columns)
            res += col->byteSize();
        return res;
    }

private:
    DB::MutableColumns accumulated_columns;
    DB::Block header;
    size_t prefer_buffer_size;
};
using ColumnsBufferPtr = std::shared_ptr<ColumnsBuffer>;

struct SplitResult
{
    UInt64 total_compute_pid_time = 0;           // Total nanoseconds to compute partition id
    UInt64 total_write_time = 0;                 // Total nanoseconds to write data to local/celeborn, including the time writing to buffer
    UInt64 total_spill_time = 0;                 // Total nanoseconds to execute PartitionWriter::evictPartitions
    UInt64 total_compress_time = 0;              // Total nanoseconds to execute compression before writing data to local/celeborn
    UInt64 total_bytes_written = 0;              // Sum of partition_length
    UInt64 total_bytes_spilled = 0;              // Total bytes of blocks spilled to local/celeborn before serialization and compression
    std::vector<UInt64> partition_lengths;        // Total written bytes of each partition after serialization and compression
    std::vector<UInt64> raw_partition_lengths;    // Total written bytes of each partition after serialization
    UInt64 total_split_time = 0;                 // Total nanoseconds to execute CachedShuffleWriter::split, excluding total_compute_pid_time
    UInt64 total_io_time = 0;                    // Total nanoseconds to write data to local/celeborn, excluding the time writing to buffer
    UInt64 total_serialize_time = 0;             // Total nanoseconds to execute spill_to_file/spill_to_celeborn. Bad naming, it works not as the name suggests.
    UInt64 total_rows = 0;
    UInt64 total_blocks = 0;
    UInt64 wall_time = 0;                        // Wall nanoseconds time of shuffle.

    String toString() const
    {
        std::ostringstream oss;

        auto to_seconds = [](UInt64 nanoseconds) -> double {
            return static_cast<double>(nanoseconds) / 1000000000ULL;
        };

        oss << "compute_pid_time(s):" << to_seconds(total_compute_pid_time) << " split_time(s):" << to_seconds(total_split_time)
            << " spill time(s):" << to_seconds(total_spill_time) << " serialize_time(s):" << to_seconds(total_serialize_time)
            << " compress_time(s):" << to_seconds(total_compress_time) << " write_time(s):" << to_seconds(total_write_time)
            << " bytes_writen:" << total_bytes_written << " bytes_spilled:" << total_bytes_spilled
            << " partition_num: " << partition_lengths.size() << std::endl;
        return oss.str();
    }
};

struct SplitterHolder
{
    std::unique_ptr<SparkExchangeManager> exchange_manager;
};


}