/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

# pragma once

#include <emhash7.hpp>
#include "core/typeutils/TypeSerializer.h"
#include "core/api/common/state/StateDescriptor.h"
#include "table/data/vectorbatch/VectorBatch.h"
#include "runtime/state/RocksDbKvStateInfo.h"
#include "RocksdbMapStateTable.h"
#include "state/RocksIteratorWrapper.h"
#include "state/RocksDBWriteBatchWrapper.h"
#include "RocksDbOperationUtils.h"

// Map state whose operations are forwarded to a RocksdbMapStateTable.
// The state is a map; as an InternalKvState, the state value is exposed as a pointer
// to an emhash7::HashMap<UK, UV>.
//
// Template parameters:
//   K  - type of the state key
//   N  - type of the namespace
//   UK - type of the user keys stored in the map
//   UV - type of the user values stored in the map
//
// The raw pointers held by this class (state table and serializers) are not deleted
// by it: the destructor is defaulted.
template<typename K, typename N, typename UK, typename UV>
class RocksdbMapState : public MapState<UK, UV>, public InternalKvState<K, N, emhash7::HashMap<UK, UV> *> {
public:
    // Stores the given table and serializers; the current namespace starts as N().
    RocksdbMapState(RocksdbMapStateTable<K, N, UK, UV> *stateTable, TypeSerializer *keySerializer,
            TypeSerializer *valueSerializer, TypeSerializer *namespaceSerializer);

    ~RocksdbMapState() = default;

    // Accessors for the serializers handed to this state.
    TypeSerializer *getKeySerializer() const { return keySerializer; }

    TypeSerializer *getNamespaceSerializer() const { return namespaceSerializer; }

    TypeSerializer *getValueSerializer() const { return valueSerializer; }

    void setNamespaceSerializer(TypeSerializer *serializer) { namespaceSerializer = serializer; }

    void setValueSerializer(TypeSerializer *serializer) { valueSerializer = serializer; }

    // Looks up userKey in the current namespace; std::nullopt means "no value".
    std::optional<UV> get(const UK &userKey) override;

    // for DataStream used
    Object* Get(Object* userKey) override;
    // Batched lookup; results are written into `result` by the state table.
    void GetByBatch(std::unordered_map<K,std::unordered_set<XXH128_hash_t>> &dataToGet,std::unordered_map<std::pair<K,XXH128_hash_t>,UV> &result);


    // Single and batched writes; all of them are forwarded to the state table
    // together with the current namespace.
    void put(const UK &userKey, const UV &userValue) override;
    void putByBatch(const K &key,const std::unordered_map<UK,UV> &dataToAdd);
    void putByBatch(std::unordered_map<K,std::unordered_map<UK,UV>> &dataToAdd);
    void putByBatch(std::unordered_map<K, std::vector<std::tuple<UK, UV>>> &dataToAdd);
    void putByBatch(std::vector<std::shared_ptr<std::tuple<K,UK,std::shared_ptr<std::string>>>> &dataToAdd);
    void putByBatch(std::vector<std::shared_ptr<std::tuple<K,UK,UV>>> &dataToAdd);
    void putByBatch(std::vector<std::tuple<K,UK,UV>> &dataToAdd);

    // Single and batched removals, forwarded to the state table.
    void remove(const UK &userKey) override;
    void removeByBatch(std::unordered_map<K,std::unordered_set<UK>> &dataToRemove);

    bool contains(const UK &userKey) override;

    // Same effect as put(): forwards to the state table's put().
    void update(const UK &key, const UV &value) override;

    // Selects the namespace used by all subsequent operations.
    void setCurrentNamespace(N nameSpace) override;

    // Deletes the RocksDB entries whose key starts with the prefix built for the
    // current namespace.
    void clear() override;
    // Clears the state table's entriesCache.
    void clearEntriesCache();

    // Factory: builds a new state using the table's state and namespace serializers.
    static RocksdbMapState<K, N, UK, UV> *
    create(StateDescriptor *stateDesc, RocksdbMapStateTable<K, N, UK, UV> *stateTable,
    TypeSerializer *keySerializer);

    // Refreshes the serializers of existingState from stateTable and returns existingState.
    static RocksdbMapState<K, N, UK, UV> *
    update(StateDescriptor *stateDesc, RocksdbMapStateTable<K, N, UK, UV> *stateTable,
    RocksdbMapState<K, N, UK, UV> *existingState);

    // This gets the pointer to the actual map (a value for state),
    // like Join's emhash<RowData*, int> with currentNamespace and currentKey
    emhash7::HashMap<UK, UV> *entries() override;

    // for DataStream used.
    java_util_Iterator* iterator() override;

    // for common scenario
    std::unique_ptr<typename MapState<UK, UV>::IteratorV2> iteratorV2() override;

    // Vector batch bookkeeping, forwarded to the state table.
    void addVectorBatch(omnistream::VectorBatch* vectorBatch) override;
    omnistream::VectorBatch *getVectorBatch(int batchId) override;
    long getVectorBatchesSize() override;
    void clearVectors(int64_t currentTimestamp) override;
    void clearVectors(std::vector<size_t>& indicesToDelete) override;

    // Forwards table creation to the state table.
    void createTable(ROCKSDB_NAMESPACE::DB* db, std::string cfName,
                     std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>> *kvStateInformation);
    // Returns the raw bytes the state table holds for `uk` in the current namespace.
    std::shared_ptr<std::string> getRawBytes(UK& uk);

    void setMaxParallelism(const int32_t maxParallelism) {
        maxParallelism_ = maxParallelism;
    }

private:
    RocksdbMapStateTable<K, N, UK, UV> *stateTable;
    TypeSerializer *keySerializer;
    TypeSerializer *valueSerializer;
    TypeSerializer *namespaceSerializer;
    N currentNamespace;
    // Initialized here because the constructor takes no parallelism argument; without
    // this the member would be indeterminate until setMaxParallelism() is called.
    int32_t maxParallelism_ = 0;
};

// Deletes every RocksDB entry whose key starts with the prefix that the state table
// builds for the current namespace.
//
// Throws std::runtime_error when the write-batch-size option is not present in the
// task manager configuration (previously this case dereferenced a null pointer).
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::clear() {
    // Read the configured write batch size first. The returned object is owned here
    // (the original code deleted it), so hold it in a unique_ptr: it is released even
    // if parseBytes throws, and it is checked for null BEFORE being dereferenced.
    std::unique_ptr<String> batchSizeObj(
            reinterpret_cast<String*>(Configuration::TM_CONFIG->getValue(RocksDBConfigurableOptions::WRITE_BATCH_SIZE)));
    if (batchSizeObj == nullptr) {
        throw std::runtime_error("RocksDB write batch size is not configured.");
    }
    auto batchSize = MemorySize::parseBytes(batchSizeObj->getData());
    batchSizeObj.reset();

    std::unique_ptr<RocksIteratorWrapper> iterator = RocksDbOperationUtils::getRocksIterator(
                    stateTable->getRocksDB(), stateTable->getColumnFamily(), stateTable->getReadOptions());

    // NOTE(review): 500 is presumably the batch capacity (number of entries) — confirm
    // against RocksDBWriteBatchWrapper's constructor.
    // NOTE(review): no explicit flush is issued below; presumably the wrapper writes any
    // pending deletes when it is destroyed — confirm.
    RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper(
            stateTable->getRocksDB(),
            std::shared_ptr<rocksdb::WriteOptions>(stateTable->getWriteOptions()),
            500,
            batchSize);

    // The serializer and its backing buffer are declared before the prefix slice and
    // stay alive for the whole scan.
    DataOutputSerializer outputSerializer;
    OutputBufferStatus outputBufferStatus;
    outputSerializer.setBackendBuffer(&outputBufferStatus);
    ROCKSDB_NAMESPACE::Slice keyPrefixBytes = stateTable->GetKeyNameSpaceSlice(outputSerializer, currentNamespace);

    // Scan forward from the prefix and stop at the first key that no longer matches it.
    for (iterator->seek(keyPrefixBytes); iterator->isValid(); iterator->next()) {
        ROCKSDB_NAMESPACE::Slice keyBytes = iterator->key();
        if (!keyBytes.starts_with(keyPrefixBytes)) {
            break;
        }
        rocksDBWriteBatchWrapper.Delete(stateTable->getColumnFamily(), keyBytes);
    }
}

// Returns the map pointer the state table provides for the current namespace.
template<typename K, typename N, typename UK, typename UV>
emhash7::HashMap<UK, UV> *RocksdbMapState<K, N, UK, UV>::entries()
{
    emhash7::HashMap<UK, UV> *currentEntries = stateTable->entries(currentNamespace);
    return currentEntries;
}

// Empties the entriesCache member of the underlying state table.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::clearEntriesCache()
{
    auto &cache = stateTable->entriesCache;
    cache.clear();
}

// DataStream-style iterator over the current namespace.
// Only available when both UK and UV are Object*; any other instantiation raises a
// logic exception at call time.
template<typename K, typename N, typename UK, typename UV>
java_util_Iterator* RocksdbMapState<K, N, UK, UV>::iterator() {
    constexpr bool keysAndValuesAreObjects = std::is_same_v<UK, Object*> && std::is_same_v<UV, Object*>;
    if constexpr (!keysAndValuesAreObjects) {
        THROW_LOGIC_EXCEPTION("type is not Object in RocksdbMapState::iterator()")
    } else {
        return stateTable->iterator(currentNamespace);
    }
}

// Returns the state table's IteratorV2 for the current namespace.
template<typename K, typename N, typename UK, typename UV>
std::unique_ptr<typename MapState<UK, UV>::IteratorV2> RocksdbMapState<K, N, UK, UV>::iteratorV2()
{
    return this->stateTable->iteratorV2(this->currentNamespace);
}

// Looks up userKey in the current namespace.
//
// The state table signals "no value" with a sentinel: nullptr when UV is a pointer
// type, std::numeric_limits<UV>::max() otherwise. Either sentinel is mapped to
// std::nullopt; any other result is returned wrapped in an optional.
//
// Throws std::runtime_error when no state table is attached.
template<typename K, typename N, typename UK, typename UV>
std::optional<UV> RocksdbMapState<K, N, UK, UV>::get(const UK &userKey)
{
    if (stateTable == nullptr) {
        throw std::runtime_error("RocksdbMapStateTable is not initialized.");
    }

    UV stored = stateTable->get(currentNamespace, userKey);

    bool isMissing = false;
    if constexpr (std::is_pointer_v<UV>) {
        isMissing = (stored == nullptr);
    } else {
        isMissing = (stored == std::numeric_limits<UV>::max());
    }

    if (isMissing) {
        return std::nullopt;
    }
    return std::make_optional<UV>(stored);
}

// DataStream-style lookup of userKey in the current namespace.
// Only available when UV is Object*; any other instantiation raises a logic exception.
//
// Throws std::runtime_error when no state table is attached.
template<typename K, typename N, typename UK, typename UV>
Object* RocksdbMapState<K, N, UK, UV>::Get(Object* userKey)
{
    if (stateTable == nullptr) {
        throw std::runtime_error("RocksdbMapStateTable is not initialized.");
    }

    if constexpr (!std::is_same_v<UV, Object*>) {
        THROW_LOGIC_EXCEPTION("type is not Object in RocksdbMapState::get()")
    } else {
        UV found = stateTable->get(currentNamespace, userKey);
        return found;
    }
}

// Returns what the state table's getRawBytes yields for `uk` in the current namespace.
template<typename K, typename N, typename UK, typename UV>
std::shared_ptr<std::string> RocksdbMapState<K, N, UK, UV>::getRawBytes(UK& uk)
{
    return this->stateTable->getRawBytes(this->currentNamespace, uk);
}


// Batched lookup in the current namespace: hands dataToGet and the output map
// `result` to the state table.
//
// Throws std::runtime_error when no state table is attached.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::GetByBatch(
    std::unordered_map<K,std::unordered_set<XXH128_hash_t>> &dataToGet,
    std::unordered_map<std::pair<K,XXH128_hash_t>,UV> &result)
{
    if (!stateTable) {
        throw std::runtime_error("RocksdbMapStateTable is not initialized.");
    }
    this->stateTable->GetByBatch(this->currentNamespace, dataToGet, result);
}

// Builds a map state on top of the given state table.
//
// stateTable          - table every operation is forwarded to
// keySerializer       - serializer returned by getKeySerializer()
// valueSerializer     - serializer returned by getValueSerializer()
// namespaceSerializer - serializer returned by getNamespaceSerializer()
//
// The current namespace starts as a value-initialized N. maxParallelism_ is set to 0
// so it is never read in an indeterminate state before setMaxParallelism() is called.
// Members are listed in declaration order.
template<typename K, typename N, typename UK, typename UV>
RocksdbMapState<K, N, UK, UV>::RocksdbMapState(RocksdbMapStateTable<K, N, UK, UV> *stateTable,
    TypeSerializer *keySerializer, TypeSerializer *valueSerializer, TypeSerializer *namespaceSerializer)
    : stateTable(stateTable),
      keySerializer(keySerializer),
      valueSerializer(valueSerializer),
      namespaceSerializer(namespaceSerializer),
      currentNamespace(),
      maxParallelism_(0)
{
}

// Stores userValue under userKey in the current namespace via the state table.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::put(const UK &userKey, const UV &userValue)
{
    this->stateTable->put(this->currentNamespace, userKey, userValue);
}

// Batched write for one state key: forwards `key` and its user key/value map to the
// state table together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::putByBatch(const K &key, const std::unordered_map<UK,UV> &dataToAdd)
{
    this->stateTable->putByBatch(this->currentNamespace, key, dataToAdd);
}


// Batched write for many state keys: forwards the per-key maps to the state table
// together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::putByBatch(std::unordered_map<K,std::unordered_map<UK,UV>> &dataToAdd)
{
    this->stateTable->putByBatch(this->currentNamespace, dataToAdd);
}

// Batched write for many state keys, each with a vector of (user key, user value)
// tuples: forwards the data to the state table together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::putByBatch(std::unordered_map<K, std::vector<std::tuple<UK, UV>>> &dataToAdd)
{
    this->stateTable->putByBatch(this->currentNamespace, dataToAdd);
}

// Batched write of shared (state key, user key, string) tuples: forwards the vector
// to the state table together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::putByBatch(
    std::vector<std::shared_ptr<std::tuple<K,UK,std::shared_ptr<std::string>>>> &dataToAdd)
{
    this->stateTable->putByBatch(this->currentNamespace, dataToAdd);
}

// Batched write of shared (state key, user key, user value) tuples: forwards the
// vector to the state table together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::putByBatch(std::vector<std::shared_ptr<std::tuple<K,UK,UV>>> &dataToAdd)
{
    this->stateTable->putByBatch(this->currentNamespace, dataToAdd);
}

// Batched write of (state key, user key, user value) tuples: forwards the vector to
// the state table together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::putByBatch(std::vector<std::tuple<K,UK,UV>> &dataToAdd)
{
    this->stateTable->putByBatch(this->currentNamespace, dataToAdd);
}

// Removes userKey from the current namespace via the state table.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::remove(const UK &userKey)
{
    this->stateTable->remove(this->currentNamespace, userKey);
}
// Batched removal: forwards the per-state-key sets of user keys to the state table
// together with the current namespace.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::removeByBatch(std::unordered_map<K,std::unordered_set<UK>> &dataToRemove)
{
    this->stateTable->removeByBatch(this->currentNamespace, dataToRemove);
}
// Returns the state table's answer to whether userKey exists in the current namespace.
template<typename K, typename N, typename UK, typename UV>
bool RocksdbMapState<K, N, UK, UV>::contains(const UK &userKey)
{
    const bool present = stateTable->contains(currentNamespace, userKey);
    return present;
}

// Writes userValue under userKey in the current namespace. Same call as put():
// both forward to the state table's put().
template <typename K, typename N, typename UK, typename UV>
inline void RocksdbMapState<K, N, UK, UV>::update(const UK &userKey, const UV &userValue)
{
    this->stateTable->put(this->currentNamespace, userKey, userValue);
}

// Selects the namespace that subsequent operations on this state pass to the table.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::setCurrentNamespace(N nameSpace)
{
    this->currentNamespace = nameSpace;
}

// Factory: allocates a new RocksdbMapState on top of stateTable.
// The value and namespace serializers are taken from the table; keySerializer is
// passed through. stateDesc is not used here.
// The returned object is allocated with `new`; this function does not free it.
template<typename K, typename N, typename UK, typename UV>
RocksdbMapState<K, N, UK, UV> *RocksdbMapState<K, N, UK, UV>::create(StateDescriptor *stateDesc,
                                                                     RocksdbMapStateTable<K, N, UK, UV> *stateTable,
                                                                     TypeSerializer *keySerializer)
{
    TypeSerializer *valueSerializer = stateTable->getStateSerializer();
    TypeSerializer *namespaceSerializer = stateTable->getNamespaceSerializer();
    return new RocksdbMapState<K, N, UK, UV>(stateTable, keySerializer, valueSerializer, namespaceSerializer);
}

// Refreshes an already-created state: copies the table's namespace serializer and
// state serializer into existingState, then returns the same pointer.
// stateDesc is not used here.
template<typename K, typename N, typename UK, typename UV>
RocksdbMapState<K, N, UK, UV> *RocksdbMapState<K, N, UK, UV>::update(StateDescriptor *stateDesc,
    RocksdbMapStateTable<K, N, UK, UV> *stateTable, RocksdbMapState<K, N, UK, UV> *existingState)
{
    RocksdbMapState<K, N, UK, UV> *refreshed = existingState;
    refreshed->setNamespaceSerializer(stateTable->getNamespaceSerializer());
    refreshed->setValueSerializer(stateTable->getStateSerializer());
    return refreshed;
}

// Hands vectorBatch to the state table's addVectorBatch.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::addVectorBatch(omnistream::VectorBatch *vectorBatch)
{
    this->stateTable->addVectorBatch(vectorBatch);
}

// Returns the vector batch the state table holds for batchId.
template<typename K, typename N, typename UK, typename UV>
omnistream::VectorBatch *RocksdbMapState<K, N, UK, UV>::getVectorBatch(int batchId)
{
    omnistream::VectorBatch *batch = stateTable->getVectorBatch(batchId);
    return batch;
}

// Forwards table creation (database handle, column family name and the kv-state
// information map) to the state table.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::createTable(ROCKSDB_NAMESPACE::DB* db, std::string cfName,
    std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>> *kvStateInformation)
{
    this->stateTable->createTable(db, cfName, kvStateInformation);
}

// Returns the value reported by the state table's getVectorBatchesSize.
template<typename K, typename N, typename UK, typename UV>
long RocksdbMapState<K, N, UK, UV>::getVectorBatchesSize()
{
    const long batchCount = stateTable->getVectorBatchesSize();
    return batchCount;
}

// Forwards currentTimestamp to the state table's clearVectors.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::clearVectors(int64_t currentTimestamp)
{
    this->stateTable->clearVectors(currentTimestamp);
}

// Forwards the list of indices to delete to the state table's clearVectors.
template<typename K, typename N, typename UK, typename UV>
void RocksdbMapState<K, N, UK, UV>::clearVectors(std::vector<size_t>& indicesToDelete)
{
    this->stateTable->clearVectors(indicesToDelete);
}