/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#include "ResultPartition.h"
#include <iostream>
#include <utility>
#include <buffer/ObjectBufferPoolFactory.h>
#include <executiongraph/descriptor/ResultPartitionIDPOD.h>
#include <partition/ResultPartitionManager.h>

namespace omnistream {

/** Definition of the static LOG_NAME member declared on ResultPartition. */
const std::string ResultPartition::LOG_NAME("ResultPartition");

/**
 * Stores the construction arguments in the corresponding members and logs.
 *
 * This constructor does not obtain a buffer pool and does not register the
 * partition with the manager; setup() does both.
 *
 * @param owningTaskName     name of the task owning this partition
 * @param partitionIndex     index of this partition
 * @param partitionId        identifier of this partition
 * @param partitionType      integer partition-type code
 * @param numSubpartitions   number of subpartitions
 * @param numTargetKeyGroups number of target key groups
 * @param partitionManager   manager used by setup() and fail()
 * @param bufferPoolFactory  supplier queried by setup() for the buffer pool
 * @param taskType           integer task-type code
 */
ResultPartition::ResultPartition(
    const std::string& owningTaskName,
    int partitionIndex,
    const ResultPartitionIDPOD& partitionId,
    int partitionType,
    int numSubpartitions,
    int numTargetKeyGroups,
    std::shared_ptr<ResultPartitionManager> partitionManager,
    std::shared_ptr<Supplier<BufferPool>> bufferPoolFactory,
    int taskType)
    : owningTaskName(owningTaskName),
      partitionIndex(partitionIndex),
      partitionId(partitionId),
      partitionType(partitionType),
      // The shared_ptr parameters are already private copies, so move them
      // into the members instead of paying for a second refcount bump.
      partitionManager(std::move(partitionManager)),
      numSubpartitions(numSubpartitions),
      numTargetKeyGroups(numTargetKeyGroups),
      bufferPoolFactory_(std::move(bufferPoolFactory)),
      taskType(taskType)
{
    // The by-value shared_ptr parameters are moved-from at this point and
    // shadow the members of the same name; do not use them in this body.
    LOG_PART("Inside constructor");
}

void ResultPartition::setup()
{
    if (bufferPool != nullptr) {
        THROW_RUNTIME_ERROR("Bug in result partition setup logic: Already registered buffer pool.")
    }

    LOG_PART("Before get buffer pool");
    bufferPool = bufferPoolFactory_->get();
    LOG_PART("before register partition");
    this->partitionManager->registerResultPartition(shared_from_this());

    LOG_PART(" after register partition")
}

std::string ResultPartition::getOwningTaskName() const
{
    return owningTaskName;
};

    /** Returns a copy of the identifier this partition was constructed with. */
    ResultPartitionIDPOD ResultPartition::getPartitionId()
    {
        return this->partitionId;
    }

int ResultPartition::getPartitionIndex() const
{
    return partitionIndex;
}

int ResultPartition::getNumberOfSubpartitions()
{
    return numSubpartitions;
}

std::shared_ptr<BufferPool> ResultPartition::getBufferPool()
{
    return bufferPool;
}

int ResultPartition::getPartitionType() const
{
    return partitionType;
}

/**
 * Not supported by this implementation.
 *
 * @throws std::runtime_error always, with the message
 *         "UnsupportedOperationException".
 */
std::shared_ptr<CompletableFuture> ResultPartition::getAllDataProcessedFuture()
{
    throw std::runtime_error{"UnsupportedOperationException"};
}

/**
 * Notification hook for a subpartition; this implementation does nothing.
 *
 * @param subpartition index of the subpartition (unused here)
 */
void ResultPartition::onSubpartitionAllDataProcessed(int subpartition)
{
    // Intentionally empty.
}

void ResultPartition::finish()
{
    checkInProduceState();
    isFinished_ = true;
}

bool ResultPartition::isFinished()
{
    return isFinished_;
}

void ResultPartition::release()
{
    release(nullptr);
}

/**
 * Releases the partition at most once.
 *
 * The atomic compare-exchange on isReleased_ lets only the first caller
 * through; later calls return without doing anything. The first caller logs
 * to stdout, records the failure cause when one is supplied, and then calls
 * releaseInternal().
 *
 * @param cause optional failure cause; it is stored only when the optional is
 *              engaged AND holds a non-null exception pointer.
 */
void ResultPartition::release(std::optional<std::exception_ptr> cause)
{
    bool expected = false;
    if (!isReleased_.compare_exchange_strong(expected, true)) {
        return;  // Already released by an earlier call.
    }

    std::cout << owningTaskName << ": Releasing " << toString() << std::endl;

    // `cause != nullptr` on an optional is true for an EMPTY optional as well,
    // so test engagement and the held pointer explicitly.
    if (cause.has_value() && *cause != nullptr) {
        this->cause = cause;
    }

    releaseInternal();
}
    /** Calls lazyDestroy() on the buffer pool when one is set; otherwise does nothing. */
    void ResultPartition::closeBufferPool()
    {
        if (bufferPool) {
            bufferPool->lazyDestroy();
        }
    }
    void ResultPartition::close()
    {
        this->closeBufferPool();
    }

/**
 * Fails the partition: lazily destroys the buffer pool when one is set, then
 * asks the partition manager to release this partition.
 *
 * @param throwable optional failure cause, passed through to
 *                  ResultPartitionManager::releasePartition.
 */
void ResultPartition::fail(std::optional<std::exception_ptr> throwable)
{
    // Tear down the pool first, before the manager is notified.
    if (bufferPool) {
        bufferPool->lazyDestroy();
    }

    partitionManager->releasePartition(partitionId, throwable);
}

std::optional<std::exception_ptr>  ResultPartition::getFailureCause()
{
    return cause;
}

int ResultPartition::getNumTargetKeyGroups()
{
    return numTargetKeyGroups;
}


/** Returns the current value of the atomic released flag. */
bool ResultPartition::isReleased()
{
    const bool released = isReleased_.load();
    return released;
}

/**
 * Returns the availability future of the underlying buffer pool.
 *
 * NOTE(review): bufferPool is dereferenced without a null check, unlike
 * closeBufferPool() and fail(); presumably callers only invoke this after
 * setup() has assigned the pool - confirm.
 */
std::shared_ptr<CompletableFuture> ResultPartition::GetAvailableFuture()
{
    auto poolFuture = bufferPool->GetAvailableFuture();
    return poolFuture;
}

/**
 * Builds a description of the form
 * "ResultPartition <id> [<partitionType>, <numSubpartitions> subpartitions]".
 */
std::string ResultPartition::toString() const
{
    std::string text = "ResultPartition ";
    text += partitionId.toString();
    text += " [";
    text += std::to_string(partitionType);
    text += ", ";
    text += std::to_string(numSubpartitions);
    text += " subpartitions]";
    return text;
}

std::shared_ptr<ResultPartitionManager> ResultPartition::getPartitionManager()
{
    return partitionManager;
}

/**
 * Verifies that the partition has not been finished yet.
 *
 * @throws std::runtime_error with "Partition already finished." when the
 *         finished flag is set.
 */
void ResultPartition::checkInProduceState() const
{
    const bool stillProducing = !isFinished_;
    if (stillProducing) {
        return;
    }
    throw std::runtime_error("Partition already finished.");
}

/**
 * Handles the notification that a subpartition has been consumed.
 *
 * Always prints an entry trace line to stdout; when the partition has not
 * been released it additionally prints the release notification. No other
 * state is changed here.
 *
 * @param subpartitionIndex index of the consumed subpartition (only logged)
 */
void ResultPartition::OnConsumedSubpartition(int subpartitionIndex)
{
    // NOTE(review): looks like leftover debug tracing - confirm it is still wanted.
    std::cout << "you are in ResultPartition::OnConsumedSubpartition" << std::endl;

    if (!isReleased_.load()) {
        std::cout << toString() << ": Received release notification for subpartition " << subpartitionIndex << "."
                  << std::endl;
    }
}

} // namespace omnistream