/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * We modify this part of the code based on Apache Flink to implement native execution of Flink operators.
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 */

#include "ObjectBufferBuilder.h"
#include <algorithm>
#include <sstream>
#include <climits>
#include <atomic>

#include "VectorBatchBuffer.h"

namespace omnistream {

ObjectBufferBuilder::ObjectBufferBuilder(ObjectSegment *objSegment, std::shared_ptr<BufferRecycler> recycler)
    : BufferBuilder(new VectorBatchBuffer(objSegment, recycler)), objSegment(objSegment) {
}

std::shared_ptr<BufferConsumer> ObjectBufferBuilder::createBufferConsumerFromBeginning()
{
    return createBufferConsumer(0);
}

std::shared_ptr<BufferConsumer> ObjectBufferBuilder::createBufferConsumer(int currentReaderPosition)
{
    if (bufferConsumerCreated) {
        throw std::runtime_error("Two BufferConsumer shouldn't exist for one BufferBuilder");
    }
    bufferConsumerCreated = true;
    positionMarker->addRef();
    return std::make_shared<ObjectBufferConsumer>(dynamic_cast<VectorBatchBuffer*>(buffer->RetainBuffer()), positionMarker, currentReaderPosition);
}

int ObjectBufferBuilder::appendAndCommit(void *source)
{
    int writtenBytes = append(source);
    commit();
    return writtenBytes;
}

int ObjectBufferBuilder::append(void *source)
{
    if (isFinished()) {
        throw std::runtime_error("BufferBuilder is finished");
    }
    LOG_PART(" Put a record to buffer builder :" << this  << " at positionMarker->getCached()" << positionMarker->getCached())

    objSegment->putObject(positionMarker->getCached(), reinterpret_cast<StreamElement*>(source));
    LOG("put source to objSegment")
    positionMarker->move(1);
    return 1;
}

StreamElement* ObjectBufferBuilder::getObject(int index)
{
    return objSegment->getObject(index);
}


std::string ObjectBufferBuilder::toString()
{
    std::stringstream ss;
    ss << "ObjectBufferBuilder{maxCapacity=" << maxCapacity
       << ", committedBytes=" << positionMarker->getCached()
       << ", finished=" << isFinished() << "}";
    return ss.str();
}

Segment* ObjectBufferBuilder::GetSegment()
{
    return objSegment;
}
} // namespace omnistream