/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Description: Defines the basic unit of shared memory on the server side.
 * Supports allocating and freeing shared memory.
 */

#include "datasystem/common/shared_memory/shm_unit.h"

#include <utility>

#include <securec.h>
#ifdef WITH_TESTS
#include "datasystem/common/inject/inject_point.h"
#endif
#include "datasystem/common/metrics/kv_metrics.h"
#include "datasystem/common/shared_memory/allocator.h"
#include "datasystem/common/string_intern/string_ref.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/utils/status.h"

namespace datasystem {

/**
 * @brief Construct a ShmUnit on top of a default-constructed ShmUnitInfo.
 */
ShmUnit::ShmUnit() : ShmUnitInfo()
{
    // Record the creation; the destructor reports the matching WORKER_SHM_UNIT_DESTROYED_TOTAL metric.
    METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_CREATED_TOTAL);
}

/**
 * @brief Construct a ShmUnit from a file descriptor and a mmap size.
 * @param[in] fd File descriptor forwarded to the ShmUnitInfo base.
 * @param[in] mmapSz Mmap size forwarded to the ShmUnitInfo base.
 */
ShmUnit::ShmUnit(int fd, uint64_t mmapSz) : ShmUnitInfo(fd, mmapSz)
{
    // Record the creation; the destructor reports the matching WORKER_SHM_UNIT_DESTROYED_TOTAL metric.
    METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_CREATED_TOTAL);
}

/**
 * @brief Construct a ShmUnit from a key, a shared memory view and a pointer.
 * @param[in] id Key of the unit; moved into the ShmUnitInfo base.
 * @param[in] shmView View forwarded to the ShmUnitInfo base.
 * @param[in] pointer Address forwarded to the ShmUnitInfo base.
 */
ShmUnit::ShmUnit(ShmKey id, ShmView shmView, void *pointer) : ShmUnitInfo(std::move(id), shmView, pointer)
{
    // Record the creation; the destructor reports the matching WORKER_SHM_UNIT_DESTROYED_TOTAL metric.
    METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_CREATED_TOTAL);
}

/**
 * @brief Free the memory referenced by this unit and record the destruction.
 */
ShmUnit::~ShmUnit()
{
    VLOG(1) << "Release memory of " << id << " Size: " << size << " Off: " << offset;
    // A destructor has no way to return the failure to a caller, so it is only logged.
    if (FreeMemory().IsError()) {
        LOG(WARNING) << "Destructor for a ShmUnit failed to free memory.";
    }
    METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_DESTROYED_TOTAL);
}

/**
 * @brief Get the tenant id stored in this unit.
 * @return A copy of tenantId_.
 */
std::string ShmUnit::GetTenantId()
{
    return tenantId_;
}

/**
 * @brief Build a ShmView describing this unit.
 * @return A ShmView initialized from fd, mmapSize, offset and size, in that order.
 */
ShmView ShmUnit::GetShmView()
{
    return ShmView{ fd, mmapSize, offset, size };
}

/**
 * @brief Request a hard free for this unit.
 * @details Sets needHardFree_, which makes FreeMemory() zero the memory before handing it to the allocator.
 */
void ShmUnit::SetHardFreeMemory()
{
    needHardFree_ = true;
}

/**
 * @brief Release the memory referenced by this unit.
 * @details Does nothing when pointer is already nullptr. If the unit was carved out of a ShmOwner, only the
 * reference to the owner is dropped and the pointer is cleared. Otherwise the memory is returned to the
 * allocator, after being zeroed with memset_s when a hard free was requested.
 * @return OK if there was nothing to free or the owner reference was dropped, otherwise the status returned by
 * the allocator.
 */
Status ShmUnit::FreeMemory()
{
    RETURN_OK_IF_TRUE(pointer == nullptr);
    // If shm owner exists, the memory will be freed together at shmOwner destruction.
    if (shmOwner_) {
        shmOwner_.reset();
        // This unit no longer references the owner's region. Clear the pointer so that a repeated call (for
        // example an explicit FreeMemory() followed by the destructor) returns early instead of zeroing the
        // region or passing an address inside the owner's block to the allocator.
        pointer = nullptr;
        return Status::OK();
    }
    VLOG(1) << "[ShmUnit] Arena FreeMemory, Tenant:" << (tenantId_.empty() ? "Default" : tenantId_)
            << ", needHardFree: " << needHardFree_;
#ifdef WITH_TESTS
    // Test hook: lets a test force the hard-free path.
    INJECT_POINT("ShmUnit.FreeMemory", [this]() {
        needHardFree_ = true;
        return Status::OK();
    });
#endif
    if (needHardFree_) {
        // Wipe the content before the memory goes back to the allocator; a failure here is logged and the
        // free still proceeds.
        int ret = memset_s(pointer, size, 0, size);
        if (ret != EOK) {
            LOG(WARNING) << FormatString("[ShmId %s] memset failed, error code: %d.", id, ret);
        }
    }
    // This call will set pointer to nullptr on success.
    return datasystem::memory::Allocator::Instance()->FreeMemory(tenantId_, pointer, serviceType_, cacheType_);
}

/**
 * @brief Allocate shared memory for this unit through the allocator.
 * @param[in] tenantId Tenant the memory is allocated for; an empty id is logged as "Default".
 * @param[in] needSize Number of bytes requested.
 * @param[in] populate Forwarded to the allocator.
 * @param[in] serviceType Service type stored in this unit and forwarded to the allocator.
 * @param[in] cacheType Cache type stored in this unit and forwarded to the allocator.
 * @return The allocator's error status if the allocation fails, otherwise OK.
 */
Status ShmUnit::AllocateMemory(const std::string &tenantId, uint64_t needSize, bool populate, ServiceType serviceType,
                               memory::CacheType cacheType)
{
    VLOG(1) << "[ShmUnit] AllocateMemory, Tenant: " << (tenantId.empty() ? "Default" : tenantId)
            << ", size: " << needSize << ", cachetype: " << static_cast<int>(cacheType);
    // Both types are stored before the allocator is called, so they are updated even if the allocation fails.
    cacheType_ = cacheType;
    serviceType_ = serviceType;
    RETURN_IF_NOT_OK(memory::Allocator::Instance()->AllocateMemory(tenantId, needSize, populate, pointer, fd, offset,
                                                                   mmapSize, numaId, serviceType_, cacheType_));
    // Size and tenant are only recorded once the allocation succeeded.
    tenantId_ = tenantId;
    size = needSize;
    return Status::OK();
}

/**
 * @brief Carve a sub-region out of this owner's memory and bind it to a ShmUnit.
 * @details The start position comes from AllocatePosition(), so concurrent callers receive distinct positions.
 * The cursor is advanced before the bounds check and is not rewound when the check fails.
 * @param[in] shmSize Number of bytes requested for the unit.
 * @param[out] shmUnit Unit that receives the region, this owner's metadata and a shared reference to this owner.
 * @return K_RUNTIME_ERROR if the requested region does not fit into the owner's memory, otherwise OK.
 */
Status ShmOwner::DistributeMemory(uint64_t shmSize, ShmUnit &shmUnit)
{
    // Distribute allocated memory to individual shmUnit.
    // Note: Parallel distribute memory is supported via atomic cursor.
    const uint64_t positionCursor = AllocatePosition(shmSize);
    // Equivalent to "positionCursor + shmSize <= size", written so that the unsigned addition cannot wrap
    // around and let an out-of-range request pass the check.
    CHECK_FAIL_RETURN_STATUS((shmSize <= size) && (positionCursor <= size - shmSize), K_RUNTIME_ERROR,
                             "Object needs more memory than available.");
    shmUnit.size = shmSize;
    shmUnit.pointer = reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(pointer) + positionCursor);
    shmUnit.fd = fd;
    shmUnit.offset = offset + positionCursor;
    shmUnit.mmapSize = mmapSize;
    shmUnit.serviceType_ = serviceType_;
    shmUnit.cacheType_ = cacheType_;
    shmUnit.tenantId_ = tenantId_;
    shmUnit.needHardFree_ = needHardFree_;
    // The unit shares ownership of this owner through shmOwner_.
    shmUnit.shmOwner_ = shared_from_this();
    shmUnit.numaId = numaId;
    return Status::OK();
}

/**
 * @brief Reserve a position in the owner's memory by advancing the cursor.
 * @param[in] shmSize Number of bytes requested; the cursor advances by Align4BitsCeiling(shmSize).
 * @return The value returned by cursor_.fetch_add, used by the caller as the start position.
 */
uint64_t ShmOwner::AllocatePosition(uint64_t shmSize)
{
    const uint64_t reservedBytes = Align4BitsCeiling(shmSize);
    return cursor_.fetch_add(reservedBytes, std::memory_order_acq_rel);
}

/**
 * @brief Round a size up to the next multiple of 4.
 * @param[in] size Value to align.
 * @return The smallest multiple of 4 that is not less than size; the result wraps around in uint64_t
 * arithmetic when size is within 3 of the maximum value.
 */
uint64_t Align4BitsCeiling(uint64_t size)
{
    const uint64_t alignment = 4;
    const uint64_t remainder = size % alignment;
    if (remainder == 0) {
        return size;
    }
    return size + (alignment - remainder);
}
}  // namespace datasystem