* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "FsCheckpointStateOutputStream.h"
#include <fstream>
#include <memory>
#include <thread>
#include <common.h>
#include "FileStateHandle.h"
/**
 * Builds the temp/final file paths under @p basePath and opens the temp file
 * for binary writing.
 *
 * @param basePath            directory path; moved into basePath_ and used as the
 *                            parent of both tempPath_ and finalPath_.
 * @param fs                  stored in fs_; not interpreted here.
 * @param bufferSize          stored in bufferSize_; not interpreted here.
 * @param localStateThreshold stored in localStateThreshold_; not interpreted here.
 * @param allowRelativePaths  stored in allowRelativePaths_; not interpreted here.
 * @throws std::runtime_error if the temp file cannot be opened.
 *
 * NOTE(review): relativeStatePath_ is read below but is not assigned in this
 * constructor; it is presumably given a value by a default member initialiser
 * in the header — confirm.
 */
FsCheckpointStateOutputStream::FsCheckpointStateOutputStream(
    Path basePath, int fs,
    int bufferSize, int localStateThreshold, bool allowRelativePaths)
    : basePath_(std::move(basePath)),
      fs_(fs),
      bufferSize_(bufferSize),
      localStateThreshold_(localStateThreshold),
      allowRelativePaths_(allowRelativePaths),
      closed_(false) {
    tempPath_ = Path(basePath_, relativeStatePath_ + ".tmp").toString();
    finalPath_ = Path(basePath_, relativeStatePath_).toString();
    handle = nullptr;

    // Hold the stream in a unique_ptr until it is known to be usable: when a
    // constructor throws, the destructor does not run, so a raw `new` here
    // would leak the ofstream on the failure path.
    auto stream = std::make_unique<std::ofstream>(tempPath_, std::ios::binary);
    if (!stream->is_open()) {
        throw std::runtime_error("Failed to open temp file for checkpoint output: " + tempPath_);
    }
    outStream_ = stream.release();
}
void FsCheckpointStateOutputStream::Write(const void* data, size_t length)
{
static_cast<std::ofstream*>(outStream_)->write(reinterpret_cast<const char*>(data), length);
fileSize += length;
}
void FsCheckpointStateOutputStream::Flush()
{
if (isSync) {
static_cast<std::ofstream*>(this->outStream_)->flush();
} else {
std::thread([this]() {
static_cast<std::ofstream*>(this->outStream_)->flush();
}).detach();
}
}
/**
 * Reports the current write offset of the output stream.
 *
 * @return the offset obtained from the stream buffer, converted to long.
 * @throws std::runtime_error if the stream is not open (outStream_ is null) or
 *         the stream buffer reports a negative position.
 */
long FsCheckpointStateOutputStream::GetPos()
{
    if (outStream_ == nullptr) {
        throw std::runtime_error("Stream not open");
    }

    std::ofstream* file = static_cast<std::ofstream*>(outStream_);
    // A zero-offset seek relative to the current position yields the current
    // output offset of the underlying buffer.
    const std::streampos offset = file->rdbuf()->pubseekoff(0, std::ios::cur, std::ios::out);
    if (offset < 0) {
        throw std::runtime_error("Failed to get current position");
    }
    return static_cast<long>(offset);
}
/**
 * Sync is not implemented for this stream: the body consists solely of the
 * project macro NOT_IMPL_EXCEPTION.
 *
 * NOTE(review): the macro is defined outside this file; presumably it raises
 * or reports a "not implemented" error — confirm before relying on it.
 */
void FsCheckpointStateOutputStream::Sync()
{
NOT_IMPL_EXCEPTION
}
/**
 * Closes and destroys the underlying file stream and marks this object closed.
 *
 * Calling it when no stream is held only sets the closed flag. closed_ is set
 * to true on every path, including the failing one.
 *
 * @throws std::runtime_error if the ofstream is in a failed state after close().
 */
void FsCheckpointStateOutputStream::Close()
{
    if (outStream_ == nullptr) {
        closed_ = true;
        return;
    }

    // Take ownership so the ofstream is deleted on every exit path, including
    // the throw below; clear the member first so it never dangles.
    std::unique_ptr<std::ofstream> owned(static_cast<std::ofstream*>(outStream_));
    outStream_ = nullptr;

    bool closeFailed = false;
    if (owned->is_open()) {
        owned->close();
        closeFailed = owned->fail();
    }

    closed_ = true;
    if (closeFailed) {
        INFO_RELEASE("Exception: Failed to close checkpoint output stream.");
        throw std::runtime_error("Failed to close checkpoint output stream");
    }
}
std::shared_ptr<StreamStateHandle> FsCheckpointStateOutputStream::CloseAndGetHandle()
{
if (handle == nullptr) {
handle = std::make_shared<FileStateHandle>(tempPath_, fileSize);
}
return handle;
}
/**
 * Reports whether this stream has been marked closed.
 *
 * @return the current value of closed_.
 */
bool FsCheckpointStateOutputStream::IsClosed()
{
    const bool alreadyClosed = closed_;
    return alreadyClosed;
}