/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <config.h>
#if USE_ROCKSDB
#include "MetadataStorageFromRocksDBTransactionOperations.h"

#include <ranges>

namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_STATE;
}
}

namespace local_engine
{

void throwRockDBErrorNotOk(const rocksdb::Status & status)
{
    if (!status.ok())
        throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "Access rocksdb failed: {}", status.ToString());
}

bool exist(rocksdb::DB & db, const std::string & path)
{
    std::string data;
    return tryGetData(db, path, &data);
}

bool tryGetData(rocksdb::DB & db, const std::string & path, std::string * value)
{
    auto status = db.Get({}, path, value);
    if (status.IsNotFound())
        return false;
    throwRockDBErrorNotOk(status);
    return status.ok();
}

String getData(rocksdb::DB & db, const std::string & path)
{
    std::string data;
    throwRockDBErrorNotOk(db.Get({}, path, &data));
    return data;
}

std::vector<String> listKeys(rocksdb::DB & db, const std::string & path)
{
    std::vector<String> result;
    auto *it = db.NewIterator({});
    for (it->Seek(path); it->Valid() && it->key().starts_with(path); it->Next())
    {
        if (it->key() == path)
            continue;
        result.push_back(it->key().ToString());
    }
    return result;
}

void RocksDBWriteFileOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
    auto status = db.Get({}, path, &prev_data);
    if (status.IsNotFound())
        existed = false;
    else
        throwRockDBErrorNotOk(status);
    db.Put({}, path, data);
}

void RocksDBWriteFileOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
    if (existed)
        throwRockDBErrorNotOk(db.Put({}, path, prev_data));
    else
        throwRockDBErrorNotOk(db.Delete({}, path));
}

void RocksDBCreateDirectoryOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
    existed = exist(db, path);
    if (existed)
        return;
    throwRockDBErrorNotOk(db.Put({}, path, DIR_DATA));
}

void RocksDBCreateDirectoryOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
    if (existed) return;
    throwRockDBErrorNotOk(db.Delete({}, path));
}

void RocksDBCreateDirectoryRecursiveOperation::execute(std::unique_lock<DB::SharedMutex> & )
{
    namespace fs = std::filesystem;
    fs::path p(path);
    while (!exist(db, p.string()))
    {
        paths_created.push_back(p);
        if (!p.has_parent_path())
            break;
        p = p.parent_path();
    }
    for (const auto & path_to_create : paths_created | std::views::reverse)
        throwRockDBErrorNotOk(db.Put({}, path_to_create, RocksDBCreateDirectoryOperation::DIR_DATA));
}

void RocksDBCreateDirectoryRecursiveOperation::undo(std::unique_lock<DB::SharedMutex> & )
{
    for (const auto & path_created : paths_created)
        throwRockDBErrorNotOk(db.Delete({}, path_created));
}

void RocksDBRemoveDirectoryOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
    auto *it = db.NewIterator({});
    bool empty_dir = true;
    for (it->Seek(path); it->Valid() && it->key().starts_with(path); it->Next())
    {
        if (it->key() != path && it->key().starts_with(path))
        {
            empty_dir = false;
            break;
        }
        if (it->key() == path)
            existed = true;
    }
    if (!empty_dir)
    {
        throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "Directory {} is not empty", path);
    }
    if (existed)
        throwRockDBErrorNotOk(db.Delete({}, path));
}

void RocksDBRemoveDirectoryOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
    if (existed)
        throwRockDBErrorNotOk(db.Put({}, path, RocksDBCreateDirectoryOperation::DIR_DATA));
}

void RocksDBRemoveRecursiveOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
    auto *it = db.NewIterator({});
    for (it->Seek(path); it->Valid() && it->key().starts_with(path); it->Next())
    {
        files.emplace(it->key().ToString(), it->value().ToString());
        throwRockDBErrorNotOk(db.Delete({}, it->key()));
    }
}

void RocksDBRemoveRecursiveOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
    for (const auto & [key, value] : files)
        throwRockDBErrorNotOk(db.Put({}, key, value));
}

void RocksDBUnlinkFileOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
    prev_data = getData(db, path);
    throwRockDBErrorNotOk(db.Delete({}, path));
}

void RocksDBUnlinkFileOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
    throwRockDBErrorNotOk(db.Put({}, path, prev_data));
}
}
#endif