/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <city.h>
#include <cstring>

#include <base/types.h>
#include <base/unaligned.h>
#include <base/defines.h>

#include <IO/WriteHelpers.h>

#include <Compression/CompressionFactory.h>
#include <Common/Stopwatch.h>
#include "CompressedWriteBuffer.h"

using namespace DB;

namespace local_engine
{

void CompressedWriteBuffer::nextImpl()
{
    if (!offset())
        return;

    chassert(offset() <= INT_MAX);
    UInt32 decompressed_size = static_cast<UInt32>(offset());
    UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);

    /** During compression we need buffer with capacity >= compressed_reserve_size + CHECKSUM_SIZE.
      *
      * If output buffer has necessary capacity, we can compress data directly into the output buffer.
      * Then we can write checksum at the output buffer begin.
      *
      * If output buffer does not have necessary capacity. Compress data into a temporary buffer.
      * Then we can write checksum and copy the temporary buffer into the output buffer.
      */
    Stopwatch compress_time_watch;
    if (out.available() >= compressed_reserve_size + sizeof(CityHash_v1_0_2::uint128))
    {
        char * out_compressed_ptr = out.position() + sizeof(CityHash_v1_0_2::uint128);
        UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, out_compressed_ptr);
        compress_time += compress_time_watch.elapsedNanoseconds();
        CityHash_v1_0_2::uint128 checksum_(0,0);
        if (checksum)
        {
            checksum_ = CityHash_v1_0_2::CityHash128(out_compressed_ptr, compressed_size);


        }
        writeBinaryLittleEndian(checksum_.low64, out);
        writeBinaryLittleEndian(checksum_.high64, out);

        out.position() += compressed_size;
    }
    else
    {
        compressed_buffer.resize_exact(compressed_reserve_size);
        UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
        compress_time += compress_time_watch.elapsedNanoseconds();
        CityHash_v1_0_2::uint128 checksum_(0,0);
        if (checksum)
        {
            checksum_ = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
        }
        writeBinaryLittleEndian(checksum_.low64, out);
        writeBinaryLittleEndian(checksum_.high64, out);

        Stopwatch write_time_watch;
        out.write(compressed_buffer.data(), compressed_size);
        write_time += write_time_watch.elapsedNanoseconds();
    }
}

CompressedWriteBuffer::~CompressedWriteBuffer()
{
    finalize();
}

CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool checksum_)
    : BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_)), checksum(checksum_)
{
}

}