* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_FILESTATEHANDLE_H
#define OMNISTREAM_FILESTATEHANDLE_H
#include "core/fs/FSDataInputStream.h"
#include "runtime/state/StreamStateHandle.h"
#include "runtime/state/PhysicalStateHandleID.h"
#include "core/fs/Path.h"
#include <fstream>
#include <filesystem>
#include <nlohmann/json.hpp>
class FileStateHandle : public StreamStateHandle {
public:
FileStateHandle(const Path& filePath, long stateSize)
: filePath_(filePath), stateSize_(stateSize)
{
if (stateSize < -1) {
throw std::invalid_argument("stateSize must be >= -1");
}
}
FileStateHandle(const nlohmann::json& json)
: filePath_(json.contains("filePath") ? Path(json["filePath"].get<std::string>())
: throw std::invalid_argument("FileStateHandle 'filePath' field missing")),
stateSize_(json.contains("stateSize") ? json["stateSize"].get<long>()
: throw std::invalid_argument("FileStateHandle 'stateSize' field missing"))
{
if (stateSize_ < -1) {
throw std::invalid_argument("stateSize must be >= -1");
}
}
const Path& GetFilePath() const
{
return filePath_;
}
std::shared_ptr<FSDataInputStream> OpenInputStream() const override
{
return nullptr;
}
std::optional<std::vector<uint8_t>> AsBytesIfInMemory() const override
{
return std::nullopt;
}
PhysicalStateHandleID GetStreamStateHandleID() const override
{
return PhysicalStateHandleID(filePath_.toString());
}
void DiscardState() override
{
auto fsPath = filePath_.toFilesystemPath();
bool removed = std::filesystem::remove(fsPath);
if (!removed && std::filesystem::exists(fsPath)) {
throw std::runtime_error("Failed to delete file " + filePath_.toString());
}
auto parent = fsPath.parent_path();
if (!parent.empty() && parent != fsPath) {
try {
if (std::filesystem::is_empty(parent)) {
std::filesystem::remove(parent);
}
} catch (...) {
}
}
}
long GetStateSize() const override
{
return stateSize_;
}
std::string ToString() const override
{
nlohmann::json j;
j["@class"] = "org.apache.flink.runtime.state.filesystem.FileStateHandle";
j["filePath"] = filePath_.toString();
j["stateSize"] = stateSize_;
j["relativePath"] = filePath_.toString();
j["streamStateHandleID"] = nlohmann::json::parse(GetStreamStateHandleID().ToString());
return j.dump();
}
bool operator==(const StreamStateHandle& other) const override
{
auto* o = dynamic_cast<const FileStateHandle*>(&other);
if (!o) return false;
return filePath_ == o->filePath_ && stateSize_ == o->stateSize_;
}
private:
Path filePath_;
long stateSize_;
std::string relativePath;
};
#endif