/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */


// ResultPartitionManager.cpp
#include "ResultPartitionManager.h"
#include <iostream>
#include "PartitionNotFoundException.h"

namespace omnistream {

ResultPartitionManager::ResultPartitionManager() : registeredPartitions(), isShutdown(false)
{
}

// Tears the manager down by running shutdown(), which releases whatever is
// still registered.
ResultPartitionManager::~ResultPartitionManager()
{
    this->shutdown();
}

// Adds `partition` to the registry, keyed by partition->getPartitionId().
//
// Fails with std::runtime_error when `partition` is null or when the manager
// has already been shut down, and reports a duplicate ID through
// THROW_RUNTIME_ERROR. The registry is left untouched in all failure cases.
void ResultPartitionManager::registerResultPartition(std::shared_ptr<ResultPartition> partition)
{
    std::cout<<"ResultPartitionManager::registerResultPartition: "<<partition.use_count()<<std::endl;

    // A null handle would be dereferenced below; reject it explicitly instead.
    if (!partition) {
        throw std::runtime_error("Cannot register a null result partition.");
    }

    std::lock_guard<std::recursive_mutex> lock(mutex_);
    if (isShutdown) {
        throw std::runtime_error("Result partition manager already shut down.");
    }

    // insert() leaves an existing entry in place and signals that via `.second`.
    auto result = registeredPartitions.insert({partition->getPartitionId(), partition});
    if (!result.second) {
        THROW_RUNTIME_ERROR("Result partition already registered.")
    }

    LOG_PART("Registered " << partition->toString() << std::endl)
}

std::shared_ptr<ResultSubpartitionView> ResultPartitionManager::createSubpartitionView(
    const ResultPartitionIDPOD& partitionId,
    int subpartitionIndex,
    BufferAvailabilityListener* availabilityListener)
{
    LOG("Requesting subpartition " << subpartitionIndex << " of " << partitionId.toString() << std::endl)

    LOCK_BEFORE()
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    LOCK_AFTER()

    auto it = registeredPartitions.find(partitionId);
    if (it == registeredPartitions.end()) {
        throw PartitionNotFoundException("Result partition not found: " + partitionId.toString());
    }

    std::shared_ptr<ResultPartition> partition = it->second;
    LOG_PART("Requesting subpartition " << subpartitionIndex << " of " << partition->toString() << std::endl)

    return partition->createSubpartitionView(subpartitionIndex, availabilityListener);
}

// Removes the partition registered under `partitionId` (if any) from the
// registry and then calls release(cause) on it. Does nothing when no partition
// is registered under that ID.
//
// `cause` is forwarded unchanged to ResultPartition::release.
void ResultPartitionManager::releasePartition(const ResultPartitionIDPOD& partitionId, std::optional<std::exception_ptr>  cause)
{
    std::lock_guard<std::recursive_mutex> lock(mutex_);

    auto it = registeredPartitions.find(partitionId);
    if (it == registeredPartitions.end()) {
        return;
    }

    // Take a reference before erasing so the partition outlives its registry entry.
    std::shared_ptr<ResultPartition> resultPartition = it->second;
    registeredPartitions.erase(it);
    resultPartition->release(cause);

    // The message used to print the same ID twice ("... produced by <same id>"),
    // which wrongly suggested a separate producer ID; print the ID once.
    std::cout << "Released partition " << partitionId.toString() << std::endl;
}

void ResultPartitionManager::shutdown()
{
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    std::cout << "Releasing " << registeredPartitions.size() << " partitions because of shutdown." << std::endl;

    for (auto& pair : registeredPartitions) {
        pair.second->release();
    }

    registeredPartitions.clear();
    isShutdown = true;
    std::cout << "Successful shutdown." << std::endl;
}

// Handles the notification that `partition` has been consumed: if that exact
// object is the one registered under its partition ID, it is removed from the
// registry and released. A null `partition`, an unknown ID, or a different
// object registered under the same ID leaves the registry untouched.
void ResultPartitionManager::onConsumedPartition(std::shared_ptr<ResultPartition> partition)
{
    // A null handle cannot match any registered entry; bail out before it is dereferenced.
    if (!partition) {
        return;
    }

    std::lock_guard<std::recursive_mutex> lock(mutex_);
    auto it = registeredPartitions.find(partition->getPartitionId());
    if (it != registeredPartitions.end() && it->second == partition) {
        // Erase through the iterator we already have instead of looking the key up again.
        registeredPartitions.erase(it);
        partition->release();
    }
}

std::vector<ResultPartitionIDPOD> ResultPartitionManager::getUnreleasedPartitions()
{
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    std::vector<ResultPartitionIDPOD> result;
    for (const auto& pair : registeredPartitions) {
        result.push_back(pair.first);
    }
    return result;
}

// Returns the fixed display name of this component.
std::string ResultPartitionManager::toString()
{
    return std::string{"ResultPartitionManager"};
}
} // namespace omnistream