/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#include "MemoryBufferBuilder.h"

#include <streaming/runtime/streamrecord/StreamRecord.h>


namespace omnistream::datastream {
    MemoryBufferBuilder::MemoryBufferBuilder(MemorySegment *memorySegment,
                                                 std::shared_ptr<BufferRecycler> recycler)
            : BufferBuilder(new NetworkBuffer(memorySegment, recycler)), memorySegment(memorySegment) {
        positionMarker->addRef();
        auto* marker = positionMarker;
        taskId = TimerThreadPool::GetTimerThreadPoolInstance()->addPeriodicTask(200,[marker]() {
            marker->commit();
        });
    }

    MemoryBufferBuilder::~MemoryBufferBuilder() {
        TimerThreadPool::GetTimerThreadPoolInstance()->cancel(taskId);
        positionMarker->release();
    }

    int MemoryBufferBuilder::appendAndCommit(void *source)
    {
        int writtenBytes = append(source);
        commitCount++;
        if (commitCount > MAX_COMMIT_COUNT) {
            commit();
            commitCount = 0;
        }
        return writtenBytes;
    }

    int MemoryBufferBuilder::append(void *source)
    {
        if (isFinished()) {
            throw std::runtime_error("BufferBuilder is finished");
        }
        LOG_PART(" Put a record to buffer builder :" << this  << " at positionMarker->getCached()" << positionMarker->getCached())

        auto record = reinterpret_cast<StreamRecord*>(source);
        auto value = reinterpret_cast<ByteBuffer*>(record->getValue());

        int needed = value->remaining();
        int maxCapacity = getMaxCapacity();
        int cached = positionMarker->getCached();
        int available = maxCapacity - cached;
        int toCopy = std::min(needed, available);

        LOG("put source to memorySegment")
        int position_ = value->position();
        memorySegment->put(cached, value->getValue(), position_, toCopy);
        value->setPosition(position_ + toCopy);

        LOG("toCopy is " << toCopy << " current mark is " << positionMarker->getCached() << " value position is " << value->position())
        positionMarker->move(toCopy);
        return toCopy;
    }

    int MemoryBufferBuilder::appendRawBytes(const uint8_t* source, int length)
    {
        if (isFinished()) {
            throw std::runtime_error("BufferBuilder is finished");
        }
        if (length < 0) {
            throw std::invalid_argument("length must be non-negative");
        }
        if (length == 0) {
            return 0;
        }
        if (source == nullptr) {
            throw std::invalid_argument("source must not be null when length > 0");
        }

        int available = getMaxCapacity() - positionMarker->getCached();
        int toCopy = std::min(length, available);
        if (toCopy <= 0) {
            return 0;
        }

        memorySegment->put(positionMarker->getCached(), source, 0, toCopy);
        positionMarker->move(toCopy);
        commit();
        return toCopy;
    }

    std::shared_ptr<BufferConsumer> MemoryBufferBuilder::createBufferConsumerFromBeginning()
    {
        return createBufferConsumer(0);
    }

    std::shared_ptr<BufferConsumer> MemoryBufferBuilder::createBufferConsumer(int currentReaderPosition)
    {
        if (bufferConsumerCreated) {
            throw std::runtime_error("Two BufferConsumer shouldn't exist for one BufferBuilder");
        }
        bufferConsumerCreated = true;
        positionMarker->addRef();
        return std::make_shared<MemoryBufferConsumer>(reinterpret_cast<NetworkBuffer*>(buffer->RetainBuffer()), positionMarker, currentReaderPosition);
    }


    std::string MemoryBufferBuilder::toString()
    {
        std::stringstream ss;
        ss << "ObjectBufferBuilder{maxCapacity=" << maxCapacity
           << ", committedBytes=" << positionMarker->getCached()
           << ", finished=" << isFinished() << "}";
        return ss.str();
    }
    Segment* MemoryBufferBuilder::GetSegment()
    {
        return memorySegment;
    }
}