* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Single-slot storage manager.
*/
#include "datasystem/common/l2cache/slot_client/slot.h"
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <unordered_map>
#include <vector>
#include <fcntl.h>
#include <unistd.h>
#include "datasystem/common/flags/flags.h"
#include "datasystem/common/l2cache/slot_client/slot_compactor.h"
#include "datasystem/common/l2cache/slot_client/slot_file_util.h"
#include "datasystem/common/l2cache/slot_client/slot_internal_config.h"
#include "datasystem/common/l2cache/slot_client/slot_index_codec.h"
#include "datasystem/common/l2cache/slot_client/slot_takeover_planner.h"
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/util/file_util.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/net_util.h"
#include "datasystem/common/util/raii.h"
#include "datasystem/common/util/status_helper.h"
// Group-commit policy for slot writes: buffered writes are committed once either the interval or the byte budget
// below is exceeded (both are runtime flags).
constexpr uint32_t DEFAULT_SLOT_SYNC_INTERVAL_MS = 1000;
constexpr uint64_t DEFAULT_SLOT_SYNC_BATCH_BYTES = 32UL * 1024UL * 1024UL;
// Largest accepted value for distributed_disk_sync_interval_ms (one minute).
constexpr uint32_t MAX_SLOT_SYNC_INTERVAL_MS = 60000;

DS_DEFINE_uint32(distributed_disk_sync_interval_ms, DEFAULT_SLOT_SYNC_INTERVAL_MS,
                 "Max buffered write interval in millisecond before one slot group commit.");
DS_DEFINE_uint64(distributed_disk_sync_batch_bytes, DEFAULT_SLOT_SYNC_BATCH_BYTES,
                 "Max buffered write bytes before one slot group commit.");

// Interval may be 0 but must not exceed MAX_SLOT_SYNC_INTERVAL_MS.
DS_DEFINE_validator(distributed_disk_sync_interval_ms, [](const char *flagName, uint32_t value) {
    (void)flagName;
    return value <= MAX_SLOT_SYNC_INTERVAL_MS;
});
// A zero byte budget is rejected.
DS_DEFINE_validator(distributed_disk_sync_batch_bytes, [](const char *flagName, uint64_t value) {
    (void)flagName;
    return value > 0;
});
namespace datasystem {
namespace {
// Size of the bounce buffer used when copying a payload stream to disk (4 MiB).
constexpr size_t SLOT_IO_CHUNK_BYTES = size_t{4} * 1024 * 1024;
// Returns the component after the last '/' in `path`, or `path` itself when it contains no '/'.
// A path ending in '/' yields an empty string.
std::string BaseName(const std::string &path)
{
    const auto lastSlash = path.rfind('/');
    if (lastSlash == std::string::npos) {
        return path;
    }
    return path.substr(lastSlash + 1);
}
// True when `filename` is `prefix` + at least one character + `suffix`.
bool MatchesIndexFileName(const std::string &filename, const std::string &prefix, const std::string &suffix)
{
    if (filename.size() <= prefix.size() + suffix.size()) {
        return false;
    }
    if (filename.compare(0, prefix.size(), prefix) != 0) {
        return false;
    }
    return filename.compare(filename.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// True for index files produced by compaction: "index_compact_<id>.log".
bool IsCompactIndexFile(const std::string &filename)
{
    return MatchesIndexFileName(filename, "index_compact_", ".log");
}

// True for import index files: "index_import_<id>.log".
bool IsImportIndexFile(const std::string &filename)
{
    return MatchesIndexFileName(filename, "index_import_", ".log");
}

// Decides whether `candidateName` should replace `bestName` as the index used to rebuild a missing manifest.
// Ordering: no incumbent > more valid bytes > compact index over non-compact > lexicographically greater name.
bool ShouldUseAsBootstrapIndex(const std::string &candidateName, size_t candidateValidBytes,
                               const std::string &bestName, size_t bestValidBytes)
{
    if (bestName.empty()) {
        return true;
    }
    if (candidateValidBytes > bestValidBytes) {
        return true;
    }
    if (candidateValidBytes < bestValidBytes) {
        return false;
    }
    const bool candidateCompact = IsCompactIndexFile(candidateName);
    const bool bestCompact = IsCompactIndexFile(bestName);
    if (candidateCompact && !bestCompact) {
        return true;
    }
    if (!candidateCompact && bestCompact) {
        return false;
    }
    return candidateName > bestName;
}
// Builds a transaction id "<slotId>_<nanoseconds since the system_clock epoch>".
std::string GenerateTxnId(uint32_t slotId)
{
    const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
    const auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(sinceEpoch).count();
    std::string txnId = std::to_string(slotId);
    txnId += '_';
    txnId += std::to_string(nanos);
    return txnId;
}
// Path the source slot directory is moved to while a takeover transaction is in flight:
// "<sourceSlotPath>.recovery.<txnId>".
std::string BuildRecoverySlotPath(const std::string &sourceSlotPath, const std::string &txnId)
{
    std::string recoveryPath(sourceSlotPath);
    recoveryPath.append(".recovery.").append(txnId);
    return recoveryPath;
}
// True only when the manifest is NORMAL and carries no operation type, phase or role.
bool IsNormalManifest(const SlotManifestData &manifest)
{
    if (manifest.state != SlotState::NORMAL) {
        return false;
    }
    if (manifest.opType != SlotOperationType::NONE) {
        return false;
    }
    if (manifest.opPhase != SlotOperationPhase::NONE) {
        return false;
    }
    return manifest.role == SlotOperationRole::NONE;
}
// True when the manifest records an in-flight TRANSFER operation (any phase or role).
bool IsTransferManifest(const SlotManifestData &manifest)
{
    const bool inOperation = manifest.state == SlotState::IN_OPERATION;
    return inOperation && manifest.opType == SlotOperationType::TRANSFER;
}
// One-line summary of a manifest for logging: state/operation fields, index names, and the sizes (not contents)
// of the data file lists.
std::string ManifestDebugString(const SlotManifestData &manifest)
{
    const size_t activeDataCount = manifest.activeData.size();
    const size_t pendingDataCount = manifest.pendingData.size();
    const size_t obsoleteDataCount = manifest.obsoleteData.size();
    return FormatString("state=%s, opType=%s, opPhase=%s, role=%s, txnId=%s, activeIndex=%s, activeDataCount=%zu, "
                        "pendingIndex=%s, pendingDataCount=%zu, obsoleteIndex=%s, obsoleteDataCount=%zu, gcPending=%d",
                        ToString(manifest.state), ToString(manifest.opType), ToString(manifest.opPhase),
                        ToString(manifest.role), manifest.txnId,
                        manifest.activeIndex, activeDataCount,
                        manifest.pendingIndex, pendingDataCount,
                        manifest.obsoleteIndex, obsoleteDataCount,
                        manifest.gcPending);
}
// Serializes the plan's source->target data file id mappings as "src:dst,src:dst,...", the format parsed by
// DecodeTransferFileMap. Ids are formatted with std::to_string rather than an ostream so the persisted text is
// independent of the global C++ locale: a locale with digit grouping would otherwise emit "1,000", which collides
// with the ',' entry separator. An empty plan encodes to "".
std::string EncodeTransferFileMap(const SlotTakeoverPlan &plan)
{
    std::string encoded;
    for (const auto &mapping : plan.dataMappings) {
        if (!encoded.empty()) {
            encoded += ',';
        }
        encoded += std::to_string(mapping.sourceFileId);
        encoded += ':';
        encoded += std::to_string(mapping.targetFileId);
    }
    return encoded;
}
// Strictly parses a non-empty run of ASCII decimal digits into a uint32_t. Unlike std::stoul it rejects leading
// whitespace, signs ("-1" would otherwise wrap to 4294967295), trailing characters and values above UINT32_MAX
// (which a static_cast would silently truncate).
bool ParseTransferFileId(const std::string &text, uint32_t &fileId)
{
    constexpr uint64_t decimalBase = 10;
    if (text.empty()) {
        return false;
    }
    uint64_t value = 0;
    for (char ch : text) {
        if (ch < '0' || ch > '9') {
            return false;
        }
        // value <= UINT32_MAX here, so this cannot overflow uint64_t.
        value = value * decimalBase + static_cast<uint64_t>(ch - '0');
        if (value > UINT32_MAX) {
            return false;
        }
    }
    fileId = static_cast<uint32_t>(value);
    return true;
}

// Parses the "src:dst,src:dst,..." text written by EncodeTransferFileMap into `mapping` (cleared first).
// An empty string yields an empty mapping. A later entry for the same source id overwrites an earlier one.
// Returns K_INVALID on a malformed entry; entries parsed before the bad one remain in `mapping`.
Status DecodeTransferFileMap(const std::string &encoded, std::unordered_map<uint32_t, uint32_t> &mapping)
{
    mapping.clear();
    if (encoded.empty()) {
        return Status::OK();
    }
    for (const auto &item : Split(encoded, ",")) {
        auto fields = Split(item, ":");
        CHECK_FAIL_RETURN_STATUS(fields.size() == 2, StatusCode::K_INVALID, "Invalid transfer file map entry: " + item);
        uint32_t sourceFileId = 0;
        uint32_t targetFileId = 0;
        const bool parsed =
            ParseTransferFileId(fields[0], sourceFileId) && ParseTransferFileId(fields[1], targetFileId);
        CHECK_FAIL_RETURN_STATUS(parsed, StatusCode::K_INVALID, "Invalid transfer file map entry: " + item);
        mapping[sourceFileId] = targetFileId;
    }
    return Status::OK();
}
// Moves `sourcePath` to `targetPath` in a way that can be safely retried:
//  - target already present: any leftover source is deleted and the call succeeds;
//  - target absent: the source must exist (K_NOT_FOUND otherwise) and is renamed onto the target.
Status RenameDataFileIfNeeded(const std::string &sourcePath, const std::string &targetPath)
{
    const bool targetPresent = FileExist(targetPath);
    if (!targetPresent) {
        if (!FileExist(sourcePath)) {
            RETURN_STATUS(StatusCode::K_NOT_FOUND, "Source data file not found during takeover: " + sourcePath);
        }
        RETURN_IF_NOT_OK(RenameFile(sourcePath, targetPath));
        return Status::OK();
    }
    if (FileExist(sourcePath)) {
        RETURN_IF_NOT_OK(DeleteFile(sourcePath));
    }
    return Status::OK();
}
// True when `records` contain an IMPORT_BEGIN for `txnId` followed (later in the sequence) by an IMPORT_END for
// the same `txnId`. Records of other types or other transactions are ignored.
bool HasClosedImportBatch(const std::vector<SlotRecord> &records, const std::string &txnId)
{
    bool beginSeen = false;
    for (const auto &record : records) {
        const bool isBegin = record.type == SlotRecordType::IMPORT_BEGIN;
        const bool isEnd = record.type == SlotRecordType::IMPORT_END;
        if (!isBegin && !isEnd) {
            continue;
        }
        if (record.import.txnId != txnId) {
            continue;
        }
        if (isBegin) {
            beginSeen = true;
        } else if (beginSeen) {
            return true;
        }
    }
    return false;
}
// Reads the group-commit interval flag on every call, so a changed flag value is seen on the next call.
uint32_t GetCurrentSlotSyncIntervalMs()
{
    const uint32_t intervalMs = FLAGS_distributed_disk_sync_interval_ms;
    return intervalMs;
}
// Reads the group-commit byte budget flag on every call, so a changed flag value is seen on the next call.
uint64_t GetCurrentSlotSyncBatchBytes()
{
    const uint64_t batchBytes = FLAGS_distributed_disk_sync_batch_bytes;
    return batchBytes;
}
// Compaction catch-up stops once a round's index delta is at most this many bytes (see Slot::Compact).
// Value comes from DISTRIBUTED_DISK_COMPACT_CUTOVER_BYTES in the internal slot config.
uint64_t GetCurrentSlotCompactCutoverBytes()
{
    const uint64_t cutoverBytes = DISTRIBUTED_DISK_COMPACT_CUTOVER_BYTES;
    return cutoverBytes;
}
// Compaction catch-up stops once a round's index delta has at most this many records (see Slot::Compact).
// Value comes from DISTRIBUTED_DISK_COMPACT_CUTOVER_RECORDS in the internal slot config.
uint32_t GetCurrentSlotCompactCutoverRecords()
{
    const uint32_t cutoverRecords = DISTRIBUTED_DISK_COMPACT_CUTOVER_RECORDS;
    return cutoverRecords;
}
}
// Creates a manager for the slot directory `slotPath`. No disk access happens here: the runtime starts in the
// reset (uninitialized) state and is built lazily by EnsureRuntimeReadyLocked().
// `maxDataFileBytes` is the size limit used to rotate appendable data files and to decide when a payload gets an
// exclusive data file.
Slot::Slot(uint32_t slotId, std::string slotPath, uint64_t maxDataFileBytes)
    : slotId_(slotId),
      slotPath_(std::move(slotPath)),
      maxDataFileBytes_(maxDataFileBytes)
{
    runtime_.Reset();
}
// Caller holds mu_. Keeps the cached runtime when it is initialized and the on-disk manifest mtime still matches;
// otherwise the writer is closed, the cache dropped, and the runtime rebuilt from disk.
Status Slot::EnsureRuntimeReadyLocked()
{
    const bool needsRebuild = !runtime_.initialized || IsRuntimeStaleLocked();
    if (!needsRebuild) {
        return Status::OK();
    }
    ResetRuntimeLocked();
    RETURN_IF_NOT_OK(BuildRuntimeStateLocked());
    return Status::OK();
}
// Caller holds mu_. Loads (bootstrapping if absent) and recovers the manifest, replays the index into a snapshot,
// caches both together with the manifest mtime, marks the runtime initialized and opens the writer.
// If a step after the cache assignment fails, runtime_.initialized may remain false or the writer may be
// uninitialized; the next EnsureRuntimeReadyLocked() decides whether to rebuild.
Status Slot::BuildRuntimeStateLocked()
{
    VLOG(1) << "Building slot runtime state, slotId=" << slotId_ << ", slotPath=" << slotPath_;
    RETURN_IF_NOT_OK(BootstrapManifestIfNeed());
    SlotManifestData loadedManifest;
    RETURN_IF_NOT_OK(LoadManifest(loadedManifest));
    RETURN_IF_NOT_OK(RecoverManifestIfNeeded(loadedManifest));
    SlotSnapshot replayedSnapshot;
    RETURN_IF_NOT_OK(SlotSnapshot::Replay(slotPath_, loadedManifest, replayedSnapshot));

    runtime_.manifest = loadedManifest;
    runtime_.snapshot = replayedSnapshot;
    // The recorded mtime is what IsRuntimeStaleLocked() compares against.
    RETURN_IF_NOT_OK(GetFileModifiedTime(JoinPath(slotPath_, "manifest"), runtime_.manifestModifiedTimeUs));
    runtime_.initialized = true;
    RETURN_IF_NOT_OK(writer_.Init(slotPath_, loadedManifest));
    VLOG(1) << "Built slot runtime state successfully, slotId=" << slotId_ << ", slotPath=" << slotPath_
            << ", " << ManifestDebugString(loadedManifest);
    return Status::OK();
}
// Caller holds mu_. The runtime is stale when it was never built, when the manifest cannot be stat'ed, or when
// the manifest's modification time differs from the one recorded at build time.
bool Slot::IsRuntimeStaleLocked() const
{
    if (!runtime_.initialized) {
        return true;
    }
    int64_t onDiskMtimeUs = 0;
    if (GetFileModifiedTime(JoinPath(slotPath_, "manifest"), onDiskMtimeUs).IsError()) {
        return true;
    }
    return onDiskMtimeUs != runtime_.manifestModifiedTimeUs;
}
// Caller holds mu_. Flushes buffered writer state when `force` is set or when the writer's group-commit policy
// (current interval / batch-bytes flags) says a flush is due. No-op if the writer is not initialized.
Status Slot::FlushRuntimeLocked(bool force)
{
    if (!writer_.IsInitialized()) {
        return Status::OK();
    }
    const bool flushDue =
        force || writer_.ShouldFlush(GetCurrentSlotSyncIntervalMs(), GetCurrentSlotSyncBatchBytes());
    if (!flushDue) {
        return Status::OK();
    }
    // Captured before Flush() purely for the log line below.
    const auto opsBeingFlushed = writer_.GetPendingOps();
    RETURN_IF_NOT_OK(writer_.Flush());
    INJECT_POINT("slotstore.SlotWriter.Flush.After", []() { return Status::OK(); });
    if (opsBeingFlushed > 0) {
        VLOG(1) << "slot flush finished, slotId=" << slotId_ << ", pendingOps=" << opsBeingFlushed;
    }
    return Status::OK();
}
// Caller holds mu_. Closes the writer first, then drops all cached runtime state (back to uninitialized).
// Does not flush; callers that need buffered data persisted call FlushRuntimeLocked(true) beforehand.
void Slot::ResetRuntimeLocked()
{
    writer_.Close();
    runtime_.Reset();
}
// Caller holds mu_. Sets `fileId` to one past the highest id among the manifest's active data files (1 when the
// list is empty). Fails, leaving `fileId` untouched, if any active file name cannot be parsed.
Status Slot::AllocateNextDataFileIdLocked(uint32_t &fileId) const
{
    CHECK_FAIL_RETURN_STATUS(runtime_.initialized, StatusCode::K_RUNTIME_ERROR, "Slot runtime is not initialized");
    uint32_t highestId = 0;
    for (const auto &dataFileName : runtime_.manifest.activeData) {
        uint32_t parsedId = 0;
        RETURN_IF_NOT_OK(ParseDataFileId(dataFileName, parsedId));
        if (parsedId > highestId) {
            highestId = parsedId;
        }
    }
    fileId = highestId + 1;
    return Status::OK();
}
// Caller holds mu_. Ensures the writable (last) active data file can take `payloadSize` more bytes and reports
// its id in `fileId`:
//  - with no active data file yet, data file 1 is registered, persisted and opened by the writer;
//  - if the current file would exceed maxDataFileBytes_, buffered state is flushed and a new file (next id) is
//    appended to the active list, persisted, and the writer re-initialized on it.
// NOTE(review): PersistManifest() rewrites the manifest but runtime_.manifestModifiedTimeUs is not refreshed in
// this function; unless PersistManifest() updates it, the next EnsureRuntimeReadyLocked() will treat the runtime
// as stale and rebuild it from disk. Confirm this is intended.
Status Slot::RotateWritableDataFileLocked(uint64_t payloadSize, uint32_t &fileId)
{
    CHECK_FAIL_RETURN_STATUS(runtime_.initialized, StatusCode::K_RUNTIME_ERROR, "Slot runtime is not initialized");
    auto &manifest = runtime_.manifest;
    if (manifest.activeData.empty()) {
        manifest.activeData.emplace_back(FormatDataFileName(1));
        RETURN_IF_NOT_OK(PersistManifest(manifest));
        RETURN_IF_NOT_OK(writer_.Init(slotPath_, manifest));
        VLOG(1) << "Bootstrapped first active data file for slot, slotId=" << slotId_
                << ", dataFile=" << manifest.activeData.back();
    }

    fileId = writer_.GetActiveDataFileId();
    const bool fitsInActiveFile = writer_.GetActiveDataSize() + payloadSize <= maxDataFileBytes_;
    if (fitsInActiveFile) {
        return Status::OK();
    }

    RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
    RETURN_IF_NOT_OK(AllocateNextDataFileIdLocked(fileId));
    manifest.activeData.emplace_back(FormatDataFileName(fileId));
    RETURN_IF_NOT_OK(PersistManifest(manifest));
    RETURN_IF_NOT_OK(writer_.Init(slotPath_, manifest));
    VLOG(1) << "Rotated writable slot data file, slotId=" << slotId_ << ", payloadSize=" << payloadSize
            << ", newDataFile=" << manifest.activeData.back();
    return Status::OK();
}
// Caller holds mu_. Registers a brand-new data file (next id) to hold one oversized payload and persists the
// manifest. The exclusive file is inserted just before the last active entry so the writable (last) data file is
// unchanged.
// A slot may still have no active data file (RotateWritableDataFileLocked handles the same state); in that case
// the writable data file 1 is bootstrapped first, exactly as RotateWritableDataFileLocked does. Otherwise the
// first oversized Save on such a slot would fail while a small one succeeds.
// NOTE(review): runtime_.manifestModifiedTimeUs is not refreshed after PersistManifest() here; confirm whether
// PersistManifest() updates it, otherwise the next EnsureRuntimeReadyLocked() rebuilds the runtime.
Status Slot::AllocateExclusiveDataFileLocked(uint32_t &fileId)
{
    CHECK_FAIL_RETURN_STATUS(runtime_.initialized, StatusCode::K_RUNTIME_ERROR, "Slot runtime is not initialized");
    if (runtime_.manifest.activeData.empty()) {
        runtime_.manifest.activeData.emplace_back(FormatDataFileName(1));
        RETURN_IF_NOT_OK(PersistManifest(runtime_.manifest));
        RETURN_IF_NOT_OK(writer_.Init(slotPath_, runtime_.manifest));
        VLOG(1) << "Bootstrapped first active data file for slot, slotId=" << slotId_
                << ", dataFile=" << runtime_.manifest.activeData.back();
    }
    RETURN_IF_NOT_OK(AllocateNextDataFileIdLocked(fileId));
    runtime_.manifest.activeData.insert(runtime_.manifest.activeData.end() - 1, FormatDataFileName(fileId));
    RETURN_IF_NOT_OK(PersistManifest(runtime_.manifest));
    VLOG(1) << "Allocated exclusive slot data file, slotId=" << slotId_ << ", dataFile=" << FormatDataFileName(fileId);
    return Status::OK();
}
// Measures the whole stream (offset 0 to end, regardless of the current get position) into `payloadSize`, then
// clears stream flags and rewinds the get position to the beginning.
// Fails with K_INVALID for a null or non-seekable stream.
Status Slot::GetPayloadSize(const std::shared_ptr<std::iostream> &body, uint64_t &payloadSize) const
{
    CHECK_FAIL_RETURN_STATUS(body != nullptr, StatusCode::K_INVALID, "body is nullptr");
    const auto invalidPos = static_cast<std::streampos>(-1);
    auto &stream = *body;
    stream.clear();
    const auto startPos = stream.tellg();
    CHECK_FAIL_RETURN_STATUS(startPos != invalidPos, StatusCode::K_INVALID, "body stream is not seekable");
    (void)stream.seekg(0, std::ios::end);
    const auto endPos = stream.tellg();
    CHECK_FAIL_RETURN_STATUS(endPos != invalidPos, StatusCode::K_INVALID,
                             "body stream failed to determine payload size");
    payloadSize = static_cast<uint64_t>(endPos);
    stream.clear();
    (void)stream.seekg(0, std::ios::beg);
    return Status::OK();
}
// Copies the entire stream (from its beginning) into `fd` starting at file offset `startOffset`, in
// SLOT_IO_CHUNK_BYTES pieces, and reports the number of bytes copied in `writtenBytes`.
// The stream is rewound before and after. A read error other than end-of-stream yields K_RUNTIME_ERROR.
Status Slot::WriteStreamToFd(const std::shared_ptr<std::iostream> &body, int fd, uint64_t startOffset,
                             uint64_t &writtenBytes) const
{
    CHECK_FAIL_RETURN_STATUS(body != nullptr, StatusCode::K_INVALID, "body is nullptr");
    CHECK_FAIL_RETURN_STATUS(fd >= 0, StatusCode::K_INVALID, "fd is invalid");
    auto &stream = *body;
    stream.clear();
    (void)stream.seekg(0, std::ios::beg);
    std::vector<char> chunk(SLOT_IO_CHUNK_BYTES);
    uint64_t cursor = startOffset;
    for (;;) {
        stream.read(chunk.data(), static_cast<std::streamsize>(chunk.size()));
        const auto got = stream.gcount();
        if (got > 0) {
            RETURN_IF_NOT_OK(WriteFile(fd, chunk.data(), static_cast<size_t>(got), static_cast<off_t>(cursor)));
            cursor += static_cast<uint64_t>(got);
        }
        if (stream.eof()) {
            break;
        }
        CHECK_FAIL_RETURN_STATUS(!stream.fail(), StatusCode::K_RUNTIME_ERROR, "Failed to read payload stream");
    }
    writtenBytes = cursor - startOffset;
    stream.clear();
    (void)stream.seekg(0, std::ios::beg);
    return Status::OK();
}
// Caller holds mu_. Appends the whole stream to the writer's active data file in SLOT_IO_CHUNK_BYTES pieces.
// `offset` receives the file offset of the first appended chunk; it is left unchanged for an empty stream.
// Fails with K_RUNTIME_ERROR on a stream read error or if the number of bytes appended differs from
// `payloadSize`. The stream is rewound before and after.
Status Slot::AppendPayloadToActiveFileLocked(const std::shared_ptr<std::iostream> &body, uint64_t payloadSize,
                                             uint64_t &offset)
{
    auto &stream = *body;
    stream.clear();
    (void)stream.seekg(0, std::ios::beg);
    std::vector<char> chunk(SLOT_IO_CHUNK_BYTES);
    uint64_t appendedBytes = 0;
    bool offsetCaptured = false;
    for (;;) {
        stream.read(chunk.data(), static_cast<std::streamsize>(chunk.size()));
        const auto got = stream.gcount();
        if (got > 0) {
            uint64_t chunkOffset = 0;
            RETURN_IF_NOT_OK(writer_.AppendData(chunk.data(), static_cast<size_t>(got), chunkOffset));
            if (!offsetCaptured) {
                offset = chunkOffset;
                offsetCaptured = true;
            }
            appendedBytes += static_cast<uint64_t>(got);
        }
        if (stream.eof()) {
            break;
        }
        CHECK_FAIL_RETURN_STATUS(!stream.fail(), StatusCode::K_RUNTIME_ERROR, "Failed to read payload stream");
    }
    stream.clear();
    (void)stream.seekg(0, std::ios::beg);
    CHECK_FAIL_RETURN_STATUS(appendedBytes == payloadSize, StatusCode::K_RUNTIME_ERROR,
                             "Active data file payload size mismatch");
    return Status::OK();
}
// Writes the whole stream to the exclusive data file `fileId` starting at offset 0 (creating the file if needed),
// verifies the byte count against `payloadSize`, and fsyncs the file before returning.
// The descriptor is closed on every exit path.
Status Slot::WriteExclusivePayloadLocked(const std::shared_ptr<std::iostream> &body, uint32_t fileId,
                                         uint64_t payloadSize) const
{
    const auto exclusivePath = JoinPath(slotPath_, FormatDataFileName(fileId));
    RETURN_IF_NOT_OK(EnsureFile(exclusivePath));
    int fd = -1;
    RETURN_IF_NOT_OK(OpenFile(exclusivePath, O_RDWR, &fd));
    Raii fdCloser([&fd]() {
        if (fd >= 0) {
            close(fd);
            fd = -1;
        }
    });
    uint64_t bytesWritten = 0;
    RETURN_IF_NOT_OK(WriteStreamToFd(body, fd, 0, bytesWritten));
    CHECK_FAIL_RETURN_STATUS(bytesWritten == payloadSize, StatusCode::K_RUNTIME_ERROR,
                             "Exclusive data file payload size mismatch");
    RETURN_IF_NOT_OK(FsyncFd(fd));
    return Status::OK();
}
// Stores `body` as (key, version). Payloads of at least maxDataFileBytes_ go to a dedicated (exclusive) data file
// that is fsynced immediately. Smaller payloads are appended to the writable data file, which is rotated first
// when the payload would not fit. A PUT index record is then appended, applied to the in-memory snapshot, and
// flushed only if the group-commit policy says so. `asyncElapse` is not used.
Status Slot::Save(const std::string &key, uint64_t version, const std::shared_ptr<std::iostream> &body,
                  uint64_t asyncElapse, WriteMode writeMode, uint32_t ttlSecond)
{
    (void)asyncElapse;
    VLOG(1) << "Slot save begin, slotId=" << slotId_ << ", key=" << key << ", version=" << version
            << ", writeMode=" << static_cast<uint32_t>(writeMode);
    std::lock_guard<std::mutex> guard(mu_);
    RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
    RETURN_IF_NOT_OK(EnsureWritable(runtime_.manifest));
    uint64_t payloadSize = 0;
    RETURN_IF_NOT_OK(GetPayloadSize(body, payloadSize));

    const bool useExclusiveFile = payloadSize >= maxDataFileBytes_;
    uint32_t fileId = 0;
    uint64_t offset = 0;
    if (useExclusiveFile) {
        RETURN_IF_NOT_OK(AllocateExclusiveDataFileLocked(fileId));
        RETURN_IF_NOT_OK(WriteExclusivePayloadLocked(body, fileId, payloadSize));
    } else {
        RETURN_IF_NOT_OK(RotateWritableDataFileLocked(payloadSize, fileId));
        RETURN_IF_NOT_OK(AppendPayloadToActiveFileLocked(body, payloadSize, offset));
    }

    SlotPutRecord putRecord;
    putRecord.key = key;
    putRecord.fileId = fileId;
    putRecord.offset = offset;
    putRecord.size = payloadSize;
    putRecord.version = version;
    putRecord.writeMode = writeMode;
    putRecord.ttlSecond = ttlSecond;
    std::string encodedPut;
    RETURN_IF_NOT_OK(SlotIndexCodec::EncodePut(putRecord, encodedPut));
    RETURN_IF_NOT_OK(writer_.AppendIndexPayload(encodedPut));
    writer_.RecordOperation(payloadSize + encodedPut.size());
    runtime_.snapshot.ApplyPut(putRecord);
    RETURN_IF_NOT_OK(FlushRuntimeLocked(false));
    VLOG(1) << "Slot save end, slotId=" << slotId_ << ", key=" << key << ", version=" << version
            << ", payloadSize=" << payloadSize << ", fileId=" << fileId << ", offset=" << offset
            << ", exclusiveFile=" << useExclusiveFile;
    return Status::OK();
}
// Reads the payload stored for exactly (key, version). The record location is resolved from the snapshot under
// mu_; the payload itself is read after the lock is released.
Status Slot::Get(const std::string &key, uint64_t version, std::shared_ptr<std::stringstream> &content)
{
    VLOG(1) << "Slot get begin, slotId=" << slotId_ << ", key=" << key << ", version=" << version;
    SlotSnapshotValue located;
    {
        std::lock_guard<std::mutex> guard(mu_);
        RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
        RETURN_IF_NOT_OK(runtime_.snapshot.FindExact(key, version, located));
    }
    RETURN_IF_NOT_OK(ReadRecordData(located, content));
    VLOG(1) << "Slot get end, slotId=" << slotId_ << ", key=" << key << ", version=" << version
            << ", fileId=" << located.fileId << ", offset=" << located.offset << ", size=" << located.size;
    return Status::OK();
}
// Reads the payload of the record the snapshot's FindLatest() resolves for `key` given `minVersion`. Lookup
// happens under mu_; the payload read happens after the lock is released.
Status Slot::GetWithoutVersion(const std::string &key, uint64_t minVersion, std::shared_ptr<std::stringstream> &content)
{
    VLOG(1) << "Slot get-latest begin, slotId=" << slotId_ << ", key=" << key << ", minVersion=" << minVersion;
    SlotSnapshotValue located;
    {
        std::lock_guard<std::mutex> guard(mu_);
        RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
        RETURN_IF_NOT_OK(runtime_.snapshot.FindLatest(key, minVersion, located));
    }
    RETURN_IF_NOT_OK(ReadRecordData(located, content));
    VLOG(1) << "Slot get-latest end, slotId=" << slotId_ << ", key=" << key
            << ", resolvedVersion=" << located.version << ", fileId=" << located.fileId
            << ", offset=" << located.offset << ", size=" << located.size;
    return Status::OK();
}
// Appends a DELETE (tombstone) index record for `key` and applies it to the in-memory snapshot. The tombstone
// version is `maxVerToDelete`, or UINT64_MAX when `deleteAllVersion` is set. The index is flushed only if the
// group-commit policy says so.
Status Slot::Delete(const std::string &key, uint64_t maxVerToDelete, bool deleteAllVersion)
{
    VLOG(1) << "Slot delete begin, slotId=" << slotId_ << ", key=" << key
            << ", maxVerToDelete=" << maxVerToDelete << ", deleteAllVersion=" << deleteAllVersion;
    std::lock_guard<std::mutex> guard(mu_);
    RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
    RETURN_IF_NOT_OK(EnsureWritable(runtime_.manifest));
    SlotDeleteRecord tombstone;
    tombstone.key = key;
    if (deleteAllVersion) {
        tombstone.version = UINT64_MAX;
    } else {
        tombstone.version = maxVerToDelete;
    }
    std::string encodedDelete;
    RETURN_IF_NOT_OK(SlotIndexCodec::EncodeDelete(tombstone, encodedDelete));
    RETURN_IF_NOT_OK(writer_.AppendIndexPayload(encodedDelete));
    writer_.RecordOperation(encodedDelete.size());
    runtime_.snapshot.ApplyDelete(tombstone);
    RETURN_IF_NOT_OK(FlushRuntimeLocked(false));
    VLOG(1) << "Slot delete end, slotId=" << slotId_ << ", key=" << key << ", tombstoneVersion=" << tombstone.version;
    return Status::OK();
}
// Runs `callback` over every visible PUT in the slot (no-op for an empty callback). Buffered writes are flushed
// and the visible puts are collected under mu_; the callback runs after the lock is released.
Status Slot::PreloadLocal(const SlotPreloadCallback &callback)
{
    if (!callback) {
        return Status::OK();
    }
    std::vector<SlotPutRecord> puts;
    {
        std::lock_guard<std::mutex> guard(mu_);
        RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
        RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
        RETURN_IF_NOT_OK(runtime_.snapshot.CollectVisiblePuts(puts));
    }
    VLOG(1) << "Slot preload-local begin, slotId=" << slotId_ << ", visiblePutCount=" << puts.size();
    return RunPreloadCallback(puts, callback);
}
// Replays the slot's on-disk index into the caller-provided `snapshot` after flushing buffered writes. The
// manifest is bootstrapped/recovered as needed. The cached runtime (runtime_) is not modified.
Status Slot::ReplayIndex(SlotSnapshot &snapshot)
{
    std::lock_guard<std::mutex> guard(mu_);
    VLOG(1) << "Slot replay-index begin, slotId=" << slotId_ << ", slotPath=" << slotPath_;
    INJECT_POINT("slotstore.Slot.ReplayIndex.Enter", []() { return Status::OK(); });
    RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
    RETURN_IF_NOT_OK(BootstrapManifestIfNeed());
    SlotManifestData diskManifest;
    RETURN_IF_NOT_OK(LoadManifest(diskManifest));
    RETURN_IF_NOT_OK(RecoverManifestIfNeeded(diskManifest));
    RETURN_IF_NOT_OK(SlotSnapshot::Replay(slotPath_, diskManifest, snapshot));
    VLOG(1) << "Slot replay-index end, slotId=" << slotId_ << ", slotPath=" << slotPath_
            << ", " << ManifestDebugString(diskManifest);
    return Status::OK();
}
// Creates the slot directory if needed, then loads the manifest. When it does not exist (K_NOT_FOUND), one is
// rebuilt from the files on disk and persisted. Any other load error is returned. Finally EnsureActiveFiles() is
// run against the resulting manifest.
Status Slot::BootstrapManifestIfNeed()
{
    VLOG(1) << "Bootstrap slot manifest if needed, slotId=" << slotId_ << ", slotPath=" << slotPath_;
    RETURN_IF_NOT_OK(CreateDir(slotPath_, true));
    SlotManifestData manifest;
    auto loadRc = SlotManifest::Load(slotPath_, manifest);
    const bool manifestMissing = loadRc.GetCode() == StatusCode::K_NOT_FOUND;
    if (!manifestMissing) {
        RETURN_IF_NOT_OK(loadRc);
        VLOG(1) << "Slot manifest already exists, slotId=" << slotId_ << ", " << ManifestDebugString(manifest);
    } else {
        RETURN_IF_NOT_OK(BuildBootstrapManifestFromDisk(manifest));
        RETURN_IF_NOT_OK(PersistManifest(manifest));
        VLOG(1) << "Created bootstrap slot manifest, slotId=" << slotId_ << ", " << ManifestDebugString(manifest);
    }
    RETURN_IF_NOT_OK(EnsureActiveFiles(manifest));
    return Status::OK();
}
// Builds a manifest for a slot directory that has none. It starts from SlotManifest::Bootstrap() and then:
//  - active index: the best readable "index*.log" file (import index files excluded), chosen by
//    ShouldUseAsBootstrapIndex; unreadable candidates are logged and skipped;
//  - active data: every "data_*.bin" whose id parses, ordered by ascending id.
// Either part only overrides the bootstrap default when at least one matching file is found.
Status Slot::BuildBootstrapManifestFromDisk(SlotManifestData &manifest)
{
    manifest = SlotManifest::Bootstrap();
    bool rebuiltFromDisk = false;

    std::vector<std::string> indexCandidates;
    RETURN_IF_NOT_OK(Glob(JoinPath(slotPath_, "index*.log"), indexCandidates));
    std::string chosenIndex;
    size_t chosenValidBytes = 0;
    for (const auto &candidatePath : indexCandidates) {
        const auto candidateName = BaseName(candidatePath);
        if (candidateName.empty() || IsImportIndexFile(candidateName)) {
            continue;
        }
        std::vector<SlotRecord> candidateRecords;
        size_t candidateValidBytes = 0;
        auto readRc = SlotIndexCodec::ReadAllRecords(candidatePath, candidateRecords, candidateValidBytes);
        if (readRc.IsError()) {
            LOG(WARNING) << "Skip invalid slot index while rebuilding manifest, path=" << candidatePath
                         << ", err=" << readRc.ToString();
            continue;
        }
        if (ShouldUseAsBootstrapIndex(candidateName, candidateValidBytes, chosenIndex, chosenValidBytes)) {
            chosenIndex = candidateName;
            chosenValidBytes = candidateValidBytes;
        }
    }
    if (!chosenIndex.empty()) {
        manifest.activeIndex = chosenIndex;
        rebuiltFromDisk = true;
    }

    std::vector<std::string> dataCandidates;
    RETURN_IF_NOT_OK(Glob(JoinPath(slotPath_, "data_*.bin"), dataCandidates));
    std::vector<std::pair<uint32_t, std::string>> idAndName;
    for (const auto &candidatePath : dataCandidates) {
        auto candidateName = BaseName(candidatePath);
        uint32_t candidateId = 0;
        if (!ParseDataFileId(candidateName, candidateId).IsOk()) {
            continue;
        }
        idAndName.emplace_back(candidateId, candidateName);
    }
    if (!idAndName.empty()) {
        std::sort(idAndName.begin(), idAndName.end(),
                  [](const auto &lhs, const auto &rhs) { return lhs.first < rhs.first; });
        manifest.activeData.clear();
        for (const auto &entry : idAndName) {
            manifest.activeData.emplace_back(entry.second);
        }
        rebuiltFromDisk = true;
    }

    if (rebuiltFromDisk) {
        LOG(INFO) << "Rebuilt slot manifest from disk layout, slotId=" << slotId_
                  << ", activeIndex=" << manifest.activeIndex << ", activeDataCount=" << manifest.activeData.size();
    }
    return Status::OK();
}
// Flushes buffered writes, drops the cached runtime, and rebuilds it from disk. This covers manifest
// bootstrap/recovery, a full read of the active index (its failure aborts the repair; counts are logged), and
// snapshot replay. Then it caches the manifest, snapshot and manifest mtime and re-initializes the writer.
Status Slot::Repair()
{
    VLOG(1) << "Slot repair begin, slotId=" << slotId_ << ", slotPath=" << slotPath_;
    std::lock_guard<std::mutex> guard(mu_);
    RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
    ResetRuntimeLocked();
    RETURN_IF_NOT_OK(BootstrapManifestIfNeed());
    SlotManifestData repaired;
    RETURN_IF_NOT_OK(LoadManifest(repaired));
    RETURN_IF_NOT_OK(RecoverManifestIfNeeded(repaired));
    std::vector<SlotRecord> indexRecords;
    size_t indexValidBytes = 0;
    RETURN_IF_NOT_OK(
        SlotIndexCodec::ReadAllRecords(JoinPath(slotPath_, repaired.activeIndex), indexRecords, indexValidBytes));
    SlotSnapshot rebuilt;
    RETURN_IF_NOT_OK(SlotSnapshot::Replay(slotPath_, repaired, rebuilt));
    runtime_.manifest = repaired;
    runtime_.snapshot = rebuilt;
    RETURN_IF_NOT_OK(GetFileModifiedTime(JoinPath(slotPath_, "manifest"), runtime_.manifestModifiedTimeUs));
    runtime_.initialized = true;
    RETURN_IF_NOT_OK(writer_.Init(slotPath_, repaired));
    VLOG(1) << "Slot repair end, slotId=" << slotId_ << ", slotPath=" << slotPath_
            << ", recordCount=" << indexRecords.size() << ", validBytes=" << indexValidBytes
            << ", " << ManifestDebugString(repaired);
    return Status::OK();
}
// Compacts the slot into freshly built index/data artifacts and switches the manifest over to them.
//
// Phases:
//   1. Repair(), then under mu_ take a copy of the manifest and snapshot (base state) and record the flushed
//      active-index size as the "applied frontier".
//   2. Without holding mu_, build the compacted artifacts from the base snapshot.
//   3. Catch-up loop: read the index bytes appended since the applied frontier and apply them to the artifacts,
//      until one round's delta is within the cutover byte/record thresholds.
//   4. Under mu_: apply the remaining delta, persist a COMPACT_COMMITTING manifest (pending = new artifacts), then
//      a NORMAL manifest with the new artifacts active and the old ones listed as obsolete (gcPending), run
//      ContinueGc(), and rebuild the runtime from disk.
// The compaction aborts with K_TRY_AGAIN if the active index changes underneath it, or with whatever
// CheckCompactTransferPreemptionLocked() / EnsureWritable() return. Built artifacts are cleaned up on any early
// exit until keepCompactArtifacts is set, which happens just before the COMPACT_COMMITTING manifest is persisted.
Status Slot::Compact()
{
    VLOG(1) << "Slot compact begin, slotId=" << slotId_ << ", slotPath=" << slotPath_;
    RETURN_IF_NOT_OK(Repair());
    SlotManifestData baseManifest;
    SlotSnapshot snapshot;
    // Byte offset in the base active index up to which records are already reflected in the artifacts.
    size_t appliedFrontier = 0;
    {
        std::lock_guard<std::mutex> lock(mu_);
        RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
        RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
        RETURN_IF_NOT_OK(CheckCompactTransferPreemptionLocked());
        RETURN_IF_NOT_OK(EnsureWritable(runtime_.manifest));
        baseManifest = runtime_.manifest;
        snapshot = runtime_.snapshot;
        appliedFrontier = static_cast<size_t>(writer_.GetActiveIndexSize());
    }
    // Also used as the compaction txnId and lastCompactEpochMs below.
    const auto compactEpochMs = static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
            .count());
    SlotCompactor compactor(slotPath_, maxDataFileBytes_);
    SlotCompactBuildResult buildResult;
    RETURN_IF_NOT_OK(compactor.BuildArtifacts(baseManifest, snapshot, compactEpochMs, buildResult));
    bool keepCompactArtifacts = false;
    // Removes the built artifacts on every exit path unless the commit has started.
    Raii cleanupCompactArtifacts([&compactor, &buildResult, &keepCompactArtifacts]() {
        if (!keepCompactArtifacts) {
            (void)compactor.CleanupArtifacts(buildResult);
        }
    });
    // Test hooks: inject a concurrent Save / Delete / Takeover between artifact build and commit.
    INJECT_POINT("slotstore.Slot.Compact.BeforeCommit",
                 [this](std::string objectKey, std::string version, std::string payload) {
                     auto body = std::make_shared<std::stringstream>();
                     *body << payload;
                     return Save(objectKey, std::stoull(version), body);
                 });
    INJECT_POINT("slotstore.Slot.Compact.BeforeCommitDelete",
                 [this](std::string objectKey, std::string version, std::string deleteAllVersion) {
                     return Delete(objectKey, std::stoull(version), deleteAllVersion == "true");
                 });
    INJECT_POINT("slotstore.Slot.Compact.BeforeCommitTransfer",
                 [this](std::string sourceSlotPath, std::string preload) {
                     SlotTakeoverRequest request;
                     request.mode = preload == "true" ? SlotTakeoverMode::PRELOAD : SlotTakeoverMode::MERGE;
                     return Takeover(sourceSlotPath, request);
                 });
    // Catch-up rounds run without holding mu_ while reading/applying the delta, so writers keep progressing.
    // NOTE(review): this loop has no iteration bound. If writes keep producing a per-round delta above the cutover
    // thresholds it may not terminate; a round cap would be safe because the locked phase below applies any
    // remainder. Also, if a round yields deltaBytes above the byte threshold but no records, appliedFrontier is not
    // advanced. Confirm ReadIndexDeltaRecords cannot do that.
    for (;;) {
        SlotManifestData currentManifest;
        size_t catchupFrontier = 0;
        {
            std::lock_guard<std::mutex> lock(mu_);
            RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
            RETURN_IF_NOT_OK(CheckCompactTransferPreemptionLocked());
            RETURN_IF_NOT_OK(EnsureWritable(runtime_.manifest));
            currentManifest = runtime_.manifest;
            if (currentManifest.activeIndex != baseManifest.activeIndex) {
                RETURN_STATUS(StatusCode::K_TRY_AGAIN, "Slot active index changed during compact");
            }
            RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
            catchupFrontier = static_cast<size_t>(writer_.GetActiveIndexSize());
        }
        std::vector<SlotRecord> deltaRecords;
        size_t deltaBytes = 0;
        RETURN_IF_NOT_OK(ReadIndexDeltaRecords(JoinPath(slotPath_, baseManifest.activeIndex), appliedFrontier,
                                               catchupFrontier, deltaRecords, deltaBytes));
        if (!deltaRecords.empty()) {
            auto rc = compactor.ApplyDeltaRecords(deltaRecords, buildResult);
            RETURN_IF_NOT_OK(rc);
            appliedFrontier = catchupFrontier;
            VLOG(1) << "Slot compact applied catchup delta, slotId=" << slotId_
                    << ", deltaRecordCount=" << deltaRecords.size() << ", deltaBytes=" << deltaBytes
                    << ", frontier=" << appliedFrontier;
        }
        if (deltaBytes <= GetCurrentSlotCompactCutoverBytes()
            && deltaRecords.size() <= GetCurrentSlotCompactCutoverRecords()) {
            break;
        }
    }
    // Cutover: hold mu_ so no new writes land while the final delta is applied and the manifest is switched.
    {
        std::lock_guard<std::mutex> lock(mu_);
        RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
        RETURN_IF_NOT_OK(CheckCompactTransferPreemptionLocked());
        RETURN_IF_NOT_OK(EnsureWritable(runtime_.manifest));
        auto manifest = runtime_.manifest;
        if (manifest.activeIndex != baseManifest.activeIndex) {
            RETURN_STATUS(StatusCode::K_TRY_AGAIN, "Slot active index changed before compact cutover");
        }
        RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
        const auto finalFrontier = static_cast<size_t>(writer_.GetActiveIndexSize());
        std::vector<SlotRecord> finalDeltaRecords;
        size_t ignoredFinalDeltaBytes = 0;
        RETURN_IF_NOT_OK(ReadIndexDeltaRecords(JoinPath(slotPath_, baseManifest.activeIndex), appliedFrontier,
                                               finalFrontier, finalDeltaRecords, ignoredFinalDeltaBytes));
        if (!finalDeltaRecords.empty()) {
            auto rc = compactor.ApplyDeltaRecords(finalDeltaRecords, buildResult);
            RETURN_IF_NOT_OK(rc);
            VLOG(1) << "Slot compact applied final delta, slotId=" << slotId_
                    << ", deltaRecordCount=" << finalDeltaRecords.size();
        }
        // Step 1 of the switch: record the new artifacts as pending under a COMPACT_COMMITTING operation.
        SlotManifestData switching = manifest;
        switching.state = SlotState::IN_OPERATION;
        switching.opType = SlotOperationType::COMPACT;
        switching.opPhase = SlotOperationPhase::COMPACT_COMMITTING;
        switching.role = SlotOperationRole::LOCAL;
        switching.txnId = std::to_string(compactEpochMs);
        switching.pendingIndex = buildResult.indexFile;
        switching.pendingData = buildResult.dataFiles;
        switching.lastCompactEpochMs = compactEpochMs;
        // From here on the artifacts may be referenced by a persisted manifest, so they must not be cleaned up.
        keepCompactArtifacts = true;
        RETURN_IF_NOT_OK(PersistManifest(switching));
        // Step 2: make the new artifacts active and mark the previous index/data files obsolete for GC.
        SlotManifestData active = switching;
        active.state = SlotState::NORMAL;
        active.opType = SlotOperationType::NONE;
        active.opPhase = SlotOperationPhase::NONE;
        active.role = SlotOperationRole::NONE;
        active.txnId.clear();
        active.obsoleteIndex = manifest.activeIndex;
        active.obsoleteData = manifest.activeData;
        active.activeIndex = buildResult.indexFile;
        active.activeData = buildResult.dataFiles;
        active.gcPending = true;
        active.pendingIndex.clear();
        active.pendingData.clear();
        RETURN_IF_NOT_OK(PersistManifest(active));
        RETURN_IF_NOT_OK(ContinueGc(active));
        ResetRuntimeLocked();
        RETURN_IF_NOT_OK(BuildRuntimeStateLocked());
    }
    VLOG(1) << "Slot compact end, slotId=" << slotId_ << ", slotPath=" << slotPath_;
    return Status::OK();
}
// Forces all buffered writes out and then calls SyncActiveFiles() on the current manifest's active files.
// `sealReason` is not used.
Status Slot::Seal(const std::string &sealReason)
{
    (void)sealReason;
    std::lock_guard<std::mutex> guard(mu_);
    RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
    RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
    RETURN_IF_NOT_OK(SyncActiveFiles(runtime_.manifest));
    return Status::OK();
}
// Boolean convenience overload: `loadToMemory` selects PRELOAD mode, otherwise MERGE; forwards to the
// request-based Takeover().
Status Slot::Takeover(const std::string &sourceSlotPath, bool loadToMemory)
{
    SlotTakeoverRequest request;
    if (loadToMemory) {
        request.mode = SlotTakeoverMode::PRELOAD;
    } else {
        request.mode = SlotTakeoverMode::MERGE;
    }
    return Takeover(sourceSlotPath, request);
}
/**
 * Take over the slot at sourceSlotPath into this (target) slot.
 *
 * Visible flow: claim the transfer-intent flag, repair both slots, then under mu_ fence the target manifest
 * (TRANSFER_FENCING, ownerEpoch + 1), move the source directory to a txn-specific recovery path, build and dump a
 * takeover plan, advance both manifests to TRANSFER_PREPARED, and let RecoverTransfer() drive the remaining phases.
 * The runtime state is rebuilt afterwards. The preload callback, if any, runs after mu_ is released.
 * @param[in] sourceSlotPath Source slot directory; must be non-empty and differ from this slot's path.
 * @param[in] request Takeover mode and optional preload callback.
 * @return K_INVALID for bad paths; K_TRY_AGAIN if another transfer already targets this slot or the slot is not in a
 *         normal (writable) state; K_NOT_FOUND if the source directory is missing; otherwise the first failing step.
 */
Status Slot::Takeover(const std::string &sourceSlotPath, const SlotTakeoverRequest &request)
{
CHECK_FAIL_RETURN_STATUS(!sourceSlotPath.empty(), StatusCode::K_INVALID, "sourceSlotPath must not be empty");
CHECK_FAIL_RETURN_STATUS(sourceSlotPath != slotPath_, StatusCode::K_INVALID,
"sourceSlotPath must differ from target slot path");
VLOG(1) << "Slot takeover begin, slotId=" << slotId_ << ", targetSlot=" << slotPath_
<< ", sourceSlot=" << sourceSlotPath << ", loadToMemory=" << request.IsPreload();
// Only one transfer may target this slot at a time. The flag is also what CheckCompactTransferPreemptionLocked()
// uses to reject compaction. It is cleared on every exit path by the Raii guard below (only created if we won it).
bool expected = false;
CHECK_FAIL_RETURN_STATUS(transferIntentActive_.compare_exchange_strong(expected, true), StatusCode::K_TRY_AGAIN,
"Another transfer already owns this target slot");
Raii clearTransferIntent([this]() { transferIntentActive_.store(false); });
// Repair both sides before planning. The temporary source Slot is scoped, so it is destroyed before the source
// directory is renamed further below (presumably so it holds nothing open on it).
RETURN_IF_NOT_OK(Repair());
{
Slot sourceManager(0, sourceSlotPath, maxDataFileBytes_);
RETURN_IF_NOT_OK(sourceManager.Repair());
}
std::string txnId;
// Filled by RecoverTransfer() for preload requests; consumed after the lock is released.
std::vector<SlotPutRecord> preloadPuts;
{
std::lock_guard<std::mutex> lock(mu_);
RETURN_IF_NOT_OK(EnsureRuntimeReadyLocked());
RETURN_IF_NOT_OK(FlushRuntimeLocked(true));
SlotManifestData targetManifest = runtime_.manifest;
RETURN_IF_NOT_OK(EnsureWritable(targetManifest));
RETURN_IF_NOT_OK(CleanupStaleTakeoverArtifacts());
txnId = GenerateTxnId(slotId_);
const std::string recoverySlotPath = BuildRecoverySlotPath(sourceSlotPath, txnId);
// Phase TRANSFER_FENCING: durably record the transfer on the target (with a bumped ownerEpoch) before the source
// directory is touched.
targetManifest.state = SlotState::IN_OPERATION;
targetManifest.opType = SlotOperationType::TRANSFER;
targetManifest.opPhase = SlotOperationPhase::TRANSFER_FENCING;
targetManifest.role = SlotOperationRole::TARGET;
targetManifest.txnId = txnId;
targetManifest.ownerEpoch += 1;
targetManifest.peerSlotPath = sourceSlotPath;
targetManifest.recoverySlotPath = recoverySlotPath;
targetManifest.transferPlanPath.clear();
targetManifest.transferFileMap.clear();
RETURN_IF_NOT_OK(PersistManifest(targetManifest));
VLOG(1) << "Slot takeover fenced target manifest, slotId=" << slotId_ << ", txnId=" << txnId
<< ", recoverySlotPath=" << recoverySlotPath;
INJECT_POINT("slotstore.Slot.Takeover.AfterManifestFencing", []() { return Status::OK(); });
// Move the source directory aside to its per-txn recovery path, unless that path already exists.
if (!FileExist(recoverySlotPath)) {
CHECK_FAIL_RETURN_STATUS(FileExist(sourceSlotPath), StatusCode::K_NOT_FOUND,
"Source slot path not found for takeover: " + sourceSlotPath);
RETURN_IF_NOT_OK(RenameFile(sourceSlotPath, recoverySlotPath));
}
RETURN_IF_NOT_OK(PersistSourceTransferManifest(recoverySlotPath, targetManifest, sourceSlotPath,
SlotOperationPhase::TRANSFER_FENCING));
// Plan the data-file mapping from the replayed source snapshot and make the plan durable on the target.
SlotManifestData sourceManifest;
RETURN_IF_NOT_OK(SlotManifest::Load(recoverySlotPath, sourceManifest));
SlotSnapshot sourceSnapshot;
RETURN_IF_NOT_OK(SlotSnapshot::Replay(recoverySlotPath, sourceManifest, sourceSnapshot));
SlotTakeoverPlan plan;
RETURN_IF_NOT_OK(SlotTakeoverPlanner::BuildPlan(sourceSlotPath, recoverySlotPath, sourceManifest,
sourceSnapshot, slotPath_, targetManifest, request, txnId,
plan));
RETURN_IF_NOT_OK(SlotTakeoverPlanner::DumpPlan(slotPath_, plan));
VLOG(1) << "Slot takeover durable plan ready, slotId=" << slotId_ << ", txnId=" << txnId
<< ", dataMappingCount=" << plan.dataMappings.size();
INJECT_POINT("slotstore.Slot.Takeover.AfterPlanDurable", []() { return Status::OK(); });
// Phase TRANSFER_PREPARED: record the plan file and encoded file-id map in both manifests.
targetManifest.transferPlanPath = FormatTakeoverPlanFileName(plan.txnId);
targetManifest.transferFileMap = EncodeTransferFileMap(plan);
targetManifest.opPhase = SlotOperationPhase::TRANSFER_PREPARED;
RETURN_IF_NOT_OK(PersistManifest(targetManifest));
RETURN_IF_NOT_OK(PersistSourceTransferManifest(recoverySlotPath, targetManifest, sourceSlotPath,
SlotOperationPhase::TRANSFER_PREPARED));
INJECT_POINT("slotstore.Slot.Takeover.AfterManifestImporting", []() { return Status::OK(); });
// The remaining phases are driven by the same state machine RecoverManifestIfNeeded() uses after a crash.
RETURN_IF_NOT_OK(RecoverTransfer(targetManifest, &request, &preloadPuts));
// Reload in-memory state from the updated on-disk manifest.
ResetRuntimeLocked();
RETURN_IF_NOT_OK(BuildRuntimeStateLocked());
}
// The preload callback runs outside mu_; data is read back through ReadRecordData().
RETURN_IF_NOT_OK(RunPreloadCallback(preloadPuts, request.callback));
VLOG(1) << "Slot takeover end, slotId=" << slotId_ << ", targetSlot=" << slotPath_ << ", txnId=" << txnId;
return Status::OK();
}
/**
 * Finish any operation left in flight by the manifest, then complete pending garbage collection.
 *
 * A COMPACT manifest stuck in COMPACT_COMMITTING is resolved by RecoverCompactCommitting(). Otherwise a TRANSFER
 * manifest is driven forward by RecoverTransfer() without an original request. If gcPending is set afterwards,
 * ContinueGc() runs.
 * @param[in,out] manifest Manifest to recover; updated in place.
 * @return Status of the call.
 */
Status Slot::RecoverManifestIfNeeded(SlotManifestData &manifest)
{
    VLOG(1) << "Checking slot manifest recovery, slotId=" << slotId_ << ", slotPath=" << slotPath_
            << ", " << ManifestDebugString(manifest);
    const bool compactCommitting = manifest.state == SlotState::IN_OPERATION
                                   && manifest.opType == SlotOperationType::COMPACT
                                   && manifest.opPhase == SlotOperationPhase::COMPACT_COMMITTING;
    if (compactCommitting) {
        RETURN_IF_NOT_OK(RecoverCompactCommitting(manifest));
    } else if (IsTransferManifest(manifest)) {
        // Crash recovery has no original takeover request, hence nullptr.
        RETURN_IF_NOT_OK(RecoverTransfer(manifest, nullptr));
    }
    if (manifest.gcPending) {
        RETURN_IF_NOT_OK(ContinueGc(manifest));
    }
    VLOG(1) << "Finished slot manifest recovery check, slotId=" << slotId_ << ", "
            << ManifestDebugString(manifest);
    return Status::OK();
}
/**
 * Resolve a compaction interrupted in COMPACT_COMMITTING.
 *
 * If PendingArtifactsReady() accepts the pending index/data, they are promoted to active. The previous active
 * files become obsolete and gcPending is set. Otherwise the pending files are deleted. Either way the manifest
 * returns to NORMAL with the operation and pending fields cleared, and is persisted.
 * @param[in,out] manifest Manifest in COMPACT_COMMITTING; updated in place.
 * @return Status of the call.
 */
Status Slot::RecoverCompactCommitting(SlotManifestData &manifest)
{
    VLOG(1) << "Recovering compact-committing slot manifest, slotId=" << slotId_ << ", "
            << ManifestDebugString(manifest);
    const bool promotePending = PendingArtifactsReady(manifest);
    if (!promotePending) {
        // Roll back: discard whatever pending artifacts were written.
        std::vector<std::string> discarded = manifest.pendingData;
        if (!manifest.pendingIndex.empty()) {
            discarded.emplace_back(manifest.pendingIndex);
        }
        RETURN_IF_NOT_OK(CleanupArtifactFiles(discarded));
    } else {
        // Roll forward: the old active set becomes garbage for ContinueGc().
        manifest.obsoleteIndex = manifest.activeIndex;
        manifest.obsoleteData = manifest.activeData;
        manifest.activeIndex = manifest.pendingIndex;
        manifest.activeData = manifest.pendingData;
        manifest.gcPending = true;
    }
    manifest.state = SlotState::NORMAL;
    manifest.opType = SlotOperationType::NONE;
    manifest.opPhase = SlotOperationPhase::NONE;
    manifest.role = SlotOperationRole::NONE;
    manifest.txnId.clear();
    manifest.pendingIndex.clear();
    manifest.pendingData.clear();
    RETURN_IF_NOT_OK(PersistManifest(manifest));
    VLOG(1) << "Recovered compact-committing slot manifest, slotId=" << slotId_ << ", "
            << ManifestDebugString(manifest);
    return Status::OK();
}
/**
 * Drive a target-side TRANSFER manifest forward, phase by phase, until it is no longer a transfer manifest.
 *
 * Phase order: TRANSFER_FENCING -> TRANSFER_PREPARED -> TRANSFER_INDEX_PUBLISHED -> TRANSFER_SOURCE_RETIRED -> NORMAL.
 * Each step persists the target manifest (and, where applicable, the source recovery manifest) before moving on.
 * This appears intended to let a later call resume at the persisted phase. Some unrecoverable states abort instead:
 * transfer artifacts are cleaned up and the target manifest is reset to NORMAL, returning OK.
 * @param[in,out] manifest Target transfer manifest; updated in place as phases advance.
 * @param[in] request Original takeover request, or nullptr during crash recovery.
 * @param[out] preloadPuts When non-null and the request asks for preload with a callback, receives imported puts.
 * @return K_INVALID for malformed manifests or an unexpected phase; otherwise the first failing step's status.
 */
Status Slot::RecoverTransfer(SlotManifestData &manifest, const SlotTakeoverRequest *request,
std::vector<SlotPutRecord> *preloadPuts)
{
CHECK_FAIL_RETURN_STATUS(IsTransferManifest(manifest), StatusCode::K_INVALID,
"RecoverTransfer requires transfer manifest");
CHECK_FAIL_RETURN_STATUS(!manifest.recoverySlotPath.empty(), StatusCode::K_INVALID,
"TRANSFER manifest missing recovery slot path");
CHECK_FAIL_RETURN_STATUS(!manifest.txnId.empty(), StatusCode::K_INVALID, "TRANSFER manifest missing txn_id");
VLOG(1) << "Recovering slot transfer, slotId=" << slotId_ << ", targetSlot=" << slotPath_
<< ", " << ManifestDebugString(manifest) << ", hasRequest=" << (request != nullptr);
while (IsTransferManifest(manifest)) {
VLOG(1) << "Slot transfer phase begin, slotId=" << slotId_ << ", txnId=" << manifest.txnId
<< ", phase=" << ToString(manifest.opPhase);
switch (manifest.opPhase) {
case SlotOperationPhase::TRANSFER_FENCING: {
// Ensure the source sits at the recovery path. If neither the source home nor the recovery path exists, there
// is nothing to take over, so abort the transfer.
if (!FileExist(manifest.recoverySlotPath)) {
CHECK_FAIL_RETURN_STATUS(!manifest.peerSlotPath.empty(), StatusCode::K_INVALID,
"TRANSFER_FENCING missing source home slot path");
if (!FileExist(manifest.peerSlotPath)) {
LOG(WARNING) << "Abort transfer fencing because neither source home nor recovery slot exists, "
<< "targetSlot=" << slotPath_ << ", txnId=" << manifest.txnId;
RETURN_IF_NOT_OK(CleanupTransferArtifacts(manifest));
RETURN_IF_NOT_OK(ResetTransferTargetManifest(manifest));
return Status::OK();
}
RETURN_IF_NOT_OK(RenameFile(manifest.peerSlotPath, manifest.recoverySlotPath));
}
RETURN_IF_NOT_OK(PersistSourceTransferManifest(
manifest.recoverySlotPath, manifest, manifest.peerSlotPath, SlotOperationPhase::TRANSFER_FENCING));
// Rebuild the plan from the recovery slot. The caller's request is not consulted here; MERGE mode is used.
SlotManifestData sourceManifest;
SlotSnapshot sourceSnapshot;
RETURN_IF_NOT_OK(LoadRecoveryTransferState(manifest.recoverySlotPath, sourceManifest, sourceSnapshot));
SlotTakeoverPlan plan;
SlotTakeoverRequest rebuildRequest;
rebuildRequest.mode = SlotTakeoverMode::MERGE;
RETURN_IF_NOT_OK(SlotTakeoverPlanner::BuildPlan(manifest.peerSlotPath, manifest.recoverySlotPath,
sourceManifest, sourceSnapshot, slotPath_, manifest,
rebuildRequest, manifest.txnId, plan));
RETURN_IF_NOT_OK(SlotTakeoverPlanner::DumpPlan(slotPath_, plan));
manifest.transferPlanPath = FormatTakeoverPlanFileName(plan.txnId);
manifest.transferFileMap = EncodeTransferFileMap(plan);
manifest.opPhase = SlotOperationPhase::TRANSFER_PREPARED;
RETURN_IF_NOT_OK(PersistManifest(manifest));
RETURN_IF_NOT_OK(PersistSourceTransferManifest(
manifest.recoverySlotPath, manifest, manifest.peerSlotPath, SlotOperationPhase::TRANSFER_PREPARED));
VLOG(1) << "Slot transfer moved to PREPARED, slotId=" << slotId_ << ", txnId=" << manifest.txnId
<< ", dataMappingCount=" << plan.dataMappings.size();
break;
}
case SlotOperationPhase::TRANSFER_PREPARED: {
// Reuse the durable plan if it loads cleanly and the import index exists; otherwise rebuild it.
SlotTakeoverPlan plan;
auto needRebuildPlan =
manifest.transferPlanPath.empty() || !FileExist(JoinPath(slotPath_, manifest.transferPlanPath));
if (!needRebuildPlan) {
auto rc = SlotTakeoverPlanner::LoadPlan(JoinPath(slotPath_, manifest.transferPlanPath), plan);
if (rc.IsError()) {
needRebuildPlan = true;
}
}
const auto importIndexPath = JoinPath(slotPath_, FormatImportIndexFileName(manifest.txnId));
if (!FileExist(importIndexPath)) {
needRebuildPlan = true;
}
if (needRebuildPlan) {
// A rebuild needs either the source recovery slot or the import index; with neither, abort.
if (!FileExist(manifest.recoverySlotPath) && !FileExist(importIndexPath)) {
LOG(ERROR) << "Abort prepared transfer because plan/import artifacts are missing and source "
<< "recovery is unavailable, targetSlot=" << slotPath_
<< ", txnId=" << manifest.txnId;
RETURN_IF_NOT_OK(CleanupTransferArtifacts(manifest));
RETURN_IF_NOT_OK(ResetTransferTargetManifest(manifest));
return Status::OK();
}
RETURN_IF_NOT_OK(RebuildPreparedTransferPlan(manifest, plan));
}
// Without the recovery slot, every planned data file must already be on the target; otherwise abort.
if (!FileExist(plan.sourceRecoverySlotPath) && !AreTransferTargetDataReady(plan)) {
LOG(ERROR) << "Abort prepared transfer because source recovery slot is missing before all data "
<< "files are durable on target, targetSlot=" << slotPath_
<< ", txnId=" << manifest.txnId;
RETURN_IF_NOT_OK(CleanupTransferArtifacts(manifest, &plan));
RETURN_IF_NOT_OK(ResetTransferTargetManifest(manifest));
return Status::OK();
}
// Move each planned data file from the recovery slot into the target directory.
for (size_t i = 0; i < plan.dataMappings.size(); ++i) {
const auto &mapping = plan.dataMappings[i];
RETURN_IF_NOT_OK(
RenameDataFileIfNeeded(JoinPath(plan.sourceRecoverySlotPath, mapping.sourceDataFile),
JoinPath(slotPath_, mapping.targetDataFile)));
if (i == 0) {
INJECT_POINT("slotstore.Slot.Takeover.AfterFirstMove", []() { return Status::OK(); });
}
}
// Persist the directory entries on both sides after the renames.
if (FileExist(plan.sourceRecoverySlotPath)) {
RETURN_IF_NOT_OK(FsyncDir(plan.sourceRecoverySlotPath));
}
RETURN_IF_NOT_OK(FsyncDir(slotPath_));
// Preload records are only collected on the live takeover path (non-null request and output vector).
if (preloadPuts != nullptr && request != nullptr && request->IsPreload() && request->callback) {
RETURN_IF_NOT_OK(CollectPreloadPuts(plan, *request, *preloadPuts));
}
// Append the import batch to the active index, then register the new data files as active.
bool published = false;
RETURN_IF_NOT_OK(PublishImportBatch(JoinPath(slotPath_, manifest.activeIndex), plan.txnId,
plan.importIndexFile, published));
INJECT_POINT("slotstore.Slot.Takeover.AfterImportPublished", []() { return Status::OK(); });
for (const auto &mapping : plan.dataMappings) {
if (std::find(manifest.activeData.begin(), manifest.activeData.end(), mapping.targetDataFile)
== manifest.activeData.end()) {
manifest.activeData.emplace_back(mapping.targetDataFile);
}
}
manifest.opPhase = SlotOperationPhase::TRANSFER_INDEX_PUBLISHED;
RETURN_IF_NOT_OK(PersistManifest(manifest));
if (FileExist(plan.sourceRecoverySlotPath)) {
RETURN_IF_NOT_OK(PersistSourceTransferManifest(plan.sourceRecoverySlotPath, manifest,
plan.sourceHomeSlotPath,
SlotOperationPhase::TRANSFER_INDEX_PUBLISHED));
}
VLOG(1) << "Slot transfer published import batch, slotId=" << slotId_ << ", txnId=" << manifest.txnId
<< ", dataMappingCount=" << plan.dataMappings.size() << ", publishedActiveDataCount="
<< manifest.activeData.size();
break;
}
case SlotOperationPhase::TRANSFER_INDEX_PUBLISHED: {
// Retire and remove the source recovery slot, then record that on the target.
RETURN_IF_NOT_OK(FinalizeSourceAfterTakeover(manifest.recoverySlotPath, manifest));
manifest.opPhase = SlotOperationPhase::TRANSFER_SOURCE_RETIRED;
RETURN_IF_NOT_OK(PersistManifest(manifest));
VLOG(1) << "Slot transfer source retired, slotId=" << slotId_ << ", txnId=" << manifest.txnId;
break;
}
case SlotOperationPhase::TRANSFER_SOURCE_RETIRED: {
// Return the manifest to NORMAL first, then delete the import index and plan file on a best-effort basis
// (their errors are deliberately ignored).
const auto completedTxnId = manifest.txnId;
const auto transferPlanFile = manifest.transferPlanPath;
manifest.state = SlotState::NORMAL;
manifest.opType = SlotOperationType::NONE;
manifest.opPhase = SlotOperationPhase::NONE;
manifest.role = SlotOperationRole::NONE;
manifest.txnId.clear();
manifest.peerSlotPath.clear();
manifest.recoverySlotPath.clear();
manifest.transferFileMap.clear();
manifest.transferPlanPath.clear();
RETURN_IF_NOT_OK(PersistManifest(manifest));
bool deletedImportIndex = false;
(void)DeleteFileIfExists(FormatImportIndexFileName(completedTxnId), deletedImportIndex);
if (!transferPlanFile.empty()) {
bool deletedPlan = false;
(void)DeleteFileIfExists(BaseName(transferPlanFile), deletedPlan);
}
VLOG(1) << "Slot transfer completed, slotId=" << slotId_ << ", completedTxnId=" << completedTxnId;
return Status::OK();
}
default:
RETURN_STATUS(StatusCode::K_INVALID, "Unexpected TRANSFER op_phase");
}
}
return Status::OK();
}
/**
 * Delete the obsolete data files and obsolete index recorded in the manifest.
 *
 * If anything was deleted, or gcPending was set, the obsolete lists and the gcPending flag are cleared and the
 * manifest is persisted.
 * @param[in,out] manifest Manifest whose obsolete entries are collected.
 * @return Status of the call.
 */
Status Slot::ContinueGc(SlotManifestData &manifest)
{
    VLOG(1) << "Slot GC begin, slotId=" << slotId_ << ", obsoleteDataCount=" << manifest.obsoleteData.size()
            << ", obsoleteIndex=" << manifest.obsoleteIndex << ", gcPending=" << manifest.gcPending;
    bool removedSomething = false;
    for (const auto &obsoleteFile : manifest.obsoleteData) {
        RETURN_IF_NOT_OK(DeleteFileIfExists(obsoleteFile, removedSomething));
    }
    RETURN_IF_NOT_OK(DeleteFileIfExists(manifest.obsoleteIndex, removedSomething));
    const bool needPersist = removedSomething || manifest.gcPending;
    if (needPersist) {
        manifest.obsoleteData.clear();
        manifest.obsoleteIndex.clear();
        manifest.gcPending = false;
        RETURN_IF_NOT_OK(PersistManifest(manifest));
    }
    VLOG(1) << "Slot GC end, slotId=" << slotId_ << ", deletedAnything=" << removedSomething
            << ", gcPending=" << manifest.gcPending;
    return Status::OK();
}
/**
 * Unlink a file relative to the slot directory, treating "already gone" as success.
 * @param[in] relativePath Path relative to slotPath_; an empty path is a no-op.
 * @param[in,out] deletedAnything Set to true when a file was actually removed; never reset to false.
 * @return OK if the file was removed or did not exist; K_RUNTIME_ERROR with errno details otherwise.
 */
Status Slot::DeleteFileIfExists(const std::string &relativePath, bool &deletedAnything)
{
    if (relativePath.empty()) {
        return Status::OK();
    }
    auto absPath = JoinPath(slotPath_, relativePath);
    if (unlink(absPath.c_str()) != 0) {
        // Capture errno immediately. The stream insertions and StrErr() below are library calls that may
        // allocate and overwrite errno, which would corrupt the reported code and message.
        const int unlinkErrno = errno;
        if (unlinkErrno == ENOENT) {
            return Status::OK();
        }
        std::stringstream ss;
        ss << "Delete file " << absPath << " failed with errno: " << unlinkErrno
           << ", errmsg: " << StrErr(unlinkErrno);
        RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, ss.str());
    }
    deletedAnything = true;
    return Status::OK();
}
/**
 * Delete each slot-relative path in order, ignoring paths that do not exist.
 * @param[in] relativePaths Paths relative to slotPath_; empty entries are skipped by DeleteFileIfExists().
 * @return OK, or the first deletion failure (remaining paths are then left untouched).
 */
Status Slot::CleanupArtifactFiles(const std::vector<std::string> &relativePaths)
{
    // DeleteFileIfExists() requires an out-flag; whether anything was removed does not matter here.
    bool unusedDeleted = false;
    for (const std::string &artifact : relativePaths) {
        RETURN_IF_NOT_OK(DeleteFileIfExists(artifact, unusedDeleted));
    }
    return Status::OK();
}
/**
 * Remove leftover takeover plan files ("takeover_*.plan") and import indexes ("index_import_*.log") from the slot
 * directory. Plan files are listed first.
 * @return Status of the call.
 */
Status Slot::CleanupStaleTakeoverArtifacts()
{
    const char *const stalePatterns[] = { "takeover_*.plan", "index_import_*.log" };
    std::vector<std::string> staleRelativePaths;
    for (const char *pattern : stalePatterns) {
        std::vector<std::string> matches;
        RETURN_IF_NOT_OK(Glob(JoinPath(slotPath_, pattern), matches));
        // Glob yields full paths; DeleteFileIfExists() expects names relative to the slot directory.
        for (const auto &match : matches) {
            staleRelativePaths.emplace_back(BaseName(match));
        }
    }
    if (!staleRelativePaths.empty()) {
        VLOG(1) << "Cleaning stale slot takeover artifacts, slotId=" << slotId_
                << ", artifactCount=" << staleRelativePaths.size();
    }
    RETURN_IF_NOT_OK(CleanupArtifactFiles(staleRelativePaths));
    return Status::OK();
}
bool Slot::PendingArtifactsReady(const SlotManifestData &manifest) const
{
if (manifest.pendingIndex.empty() || manifest.pendingData.empty()) {
return false;
}
auto indexPath = JoinPath(slotPath_, manifest.pendingIndex);
if (!FileExist(indexPath) || FileSize(indexPath, false) < static_cast<off_t>(SlotIndexCodec::HEADER_SIZE)) {
return false;
}
for (const auto &dataFile : manifest.pendingData) {
if (!FileExist(JoinPath(slotPath_, dataFile))) {
return false;
}
}
return true;
}
/**
 * Load this slot's manifest from its own directory.
 * @param[out] manifest Loaded manifest.
 * @return Status of SlotManifest::Load().
 */
Status Slot::LoadManifest(SlotManifestData &manifest)
{
    const std::string &slotDir = slotPath_;
    RETURN_IF_NOT_OK(SlotManifest::Load(slotDir, manifest));
    return Status::OK();
}
/**
 * Make sure the manifest's active index (if named) and every active data file exist under the slot directory.
 * @param[in] manifest Manifest listing the active files.
 * @return Status of the call.
 */
Status Slot::EnsureActiveFiles(const SlotManifestData &manifest)
{
    const bool hasActiveIndex = !manifest.activeIndex.empty();
    if (hasActiveIndex) {
        const auto indexPath = JoinPath(slotPath_, manifest.activeIndex);
        RETURN_IF_NOT_OK(SlotIndexCodec::EnsureIndexFile(indexPath));
    }
    for (const auto &dataName : manifest.activeData) {
        const auto dataPath = JoinPath(slotPath_, dataName);
        RETURN_IF_NOT_OK(::datasystem::EnsureFile(dataPath));
    }
    return Status::OK();
}
/**
 * Atomically store a manifest for this slot and make sure its active files exist.
 *
 * An empty homeSlotPath defaults to this slot's path. When the runtime is initialized, the in-memory manifest copy
 * and the cached manifest modification time are refreshed as well.
 * @param[in] manifest Manifest to persist (copied; the argument is not modified).
 * @return Status of the call.
 */
Status Slot::PersistManifest(const SlotManifestData &manifest)
{
    SlotManifestData toStore = manifest;
    if (toStore.homeSlotPath.empty()) {
        toStore.homeSlotPath = slotPath_;
    }
    RETURN_IF_NOT_OK(SlotManifest::StoreAtomic(slotPath_, toStore));
    RETURN_IF_NOT_OK(EnsureActiveFiles(toStore));
    if (runtime_.initialized) {
        runtime_.manifest = toStore;
        const auto manifestFilePath = JoinPath(slotPath_, "manifest");
        RETURN_IF_NOT_OK(GetFileModifiedTime(manifestFilePath, runtime_.manifestModifiedTimeUs));
    }
    VLOG(1) << "Slot persisted manifest, slotId=" << slotId_ << ", slotPath=" << slotPath_
            << ", " << ManifestDebugString(toStore);
    return Status::OK();
}
/**
 * Read one record's payload from its data file into content.
 *
 * content is reset (buffer and stream state) and then filled with value.size bytes starting at value.offset in the
 * data file for value.fileId. The read uses chunks of at most SLOT_IO_CHUNK_BYTES.
 * @param[in] value Location (file id, offset, size) of the record.
 * @param[in,out] content Destination stream; must not be null.
 * @return K_INVALID if content is null; otherwise the open/read status.
 */
Status Slot::ReadRecordData(const SlotSnapshotValue &value, std::shared_ptr<std::stringstream> &content) const
{
    CHECK_FAIL_RETURN_STATUS(content != nullptr, StatusCode::K_INVALID, "content is nullptr");
    int dataFd = -1;
    const auto dataPath = JoinPath(slotPath_, FormatDataFileName(value.fileId));
    RETURN_IF_NOT_OK(OpenFile(dataPath, O_RDONLY, &dataFd));
    Raii closeDataFd([dataFd]() {
        if (dataFd >= 0) {
            close(dataFd);
        }
    });
    content->str("");
    content->clear();
    std::vector<char> chunk(std::min<uint64_t>(value.size, SLOT_IO_CHUNK_BYTES));
    for (uint64_t copied = 0; copied < value.size;) {
        const auto want = static_cast<size_t>(std::min<uint64_t>(value.size - copied, chunk.size()));
        const uint64_t readOffset = value.offset + copied;
        RETURN_IF_NOT_OK(ReadFile(dataFd, chunk.data(), want, static_cast<off_t>(readOffset)));
        content->write(chunk.data(), static_cast<std::streamsize>(want));
        copied += want;
    }
    return Status::OK();
}
/**
 * Reject writes unless IsNormalManifest() accepts the manifest.
 * @param[in] manifest Manifest to check.
 * @return OK, or K_TRY_AGAIN when the slot is in a non-normal state.
 */
Status Slot::EnsureWritable(const SlotManifestData &manifest) const
{
    const bool writable = IsNormalManifest(manifest);
    CHECK_FAIL_RETURN_STATUS(writable, StatusCode::K_TRY_AGAIN, "Slot is not writable in current state");
    return Status::OK();
}
/**
 * fsync the active index (if named), every active data file, and finally the slot directory.
 * @param[in] manifest Manifest listing the active files.
 * @return Status of the first failing open/fsync, or OK.
 */
Status Slot::SyncActiveFiles(const SlotManifestData &manifest) const
{
    // Open one slot-relative file read-write, fsync it, and close the descriptor when the lambda returns.
    auto fsyncRelative = [this](const std::string &relativePath) -> Status {
        int fd = -1;
        RETURN_IF_NOT_OK(OpenFile(JoinPath(slotPath_, relativePath), O_RDWR, &fd));
        Raii closeFd([fd]() {
            if (fd >= 0) {
                close(fd);
            }
        });
        RETURN_IF_NOT_OK(FsyncFd(fd));
        return Status::OK();
    };
    if (!manifest.activeIndex.empty()) {
        RETURN_IF_NOT_OK(fsyncRelative(manifest.activeIndex));
    }
    for (const auto &dataFile : manifest.activeData) {
        RETURN_IF_NOT_OK(fsyncRelative(dataFile));
    }
    RETURN_IF_NOT_OK(FsyncDir(slotPath_));
    return Status::OK();
}
/**
 * Gather the PUT records of the plan's import index for preloading.
 * @param[in] plan Takeover plan naming the import index file (relative to slotPath_).
 * @param[in] request Takeover request; nothing is collected unless it is a preload with a callback.
 * @param[out] visiblePuts Cleared, then filled with the PUT records in index order.
 * @return Status of reading the import index.
 */
Status Slot::CollectPreloadPuts(const SlotTakeoverPlan &plan, const SlotTakeoverRequest &request,
                                std::vector<SlotPutRecord> &visiblePuts) const
{
    visiblePuts.clear();
    const bool wantsPreload = request.IsPreload() && request.callback;
    if (!wantsPreload) {
        return Status::OK();
    }
    std::vector<SlotRecord> records;
    size_t validBytes = 0;
    const auto importPath = JoinPath(slotPath_, plan.importIndexFile);
    RETURN_IF_NOT_OK(SlotIndexCodec::ReadAllRecords(importPath, records, validBytes));
    visiblePuts.reserve(records.size());
    for (const auto &record : records) {
        if (record.type == SlotRecordType::PUT) {
            visiblePuts.emplace_back(record.put);
        }
    }
    return Status::OK();
}
/**
 * Collect the plan's preloadable puts and feed them to the request's callback.
 * @param[in] plan Takeover plan naming the import index.
 * @param[in] request Takeover request carrying mode and callback.
 * @return Status of collection or of the callback loop.
 */
Status Slot::RunPreloadCallback(const SlotTakeoverPlan &plan, const SlotTakeoverRequest &request) const
{
    std::vector<SlotPutRecord> collected;
    RETURN_IF_NOT_OK(CollectPreloadPuts(plan, request, collected));
    return RunPreloadCallback(collected, request.callback);
}
/**
 * Read each put's payload and hand it to the preload callback.
 *
 * Stops (and still returns OK) at the first callback error; that error is only logged. A read failure, by
 * contrast, is returned to the caller.
 * @param[in] visiblePuts Records to preload, in order.
 * @param[in] callback Preload callback; an empty callback makes this a no-op.
 * @return OK, or the first ReadRecordData() failure.
 */
Status Slot::RunPreloadCallback(const std::vector<SlotPutRecord> &visiblePuts,
                                const SlotPreloadCallback &callback) const
{
    if (!callback) {
        return Status::OK();
    }
    for (const auto &put : visiblePuts) {
        SlotSnapshotValue location{ put.key, put.fileId, put.offset, put.size, put.version, put.writeMode, false };
        auto payload = std::make_shared<std::stringstream>();
        RETURN_IF_NOT_OK(ReadRecordData(location, payload));
        SlotPreloadMeta meta;
        meta.objectKey = put.key;
        meta.version = put.version;
        meta.writeMode = put.writeMode;
        meta.size = put.size;
        meta.ttlSecond = put.ttlSecond;
        auto rc = callback(meta, payload);
        if (!rc.IsError()) {
            continue;
        }
        // A failing callback ends preloading but never fails the (already completed) takeover.
        LOG(WARNING) << "Preload callback stopped early, slotPath=" << slotPath_ << ", objectKey=" << meta.objectKey
                     << ", version=" << meta.version << ", code=" << Status::StatusCodeName(rc.GetCode())
                     << ", msg=" << rc.GetMsg();
        if (rc.GetCode() == StatusCode::K_OUT_OF_MEMORY) {
            LOG(WARNING) << "Preload callback stopped by out-of-memory, disk takeover continues.";
        }
        return Status::OK();
    }
    return Status::OK();
}
/**
 * Abort compaction when a transfer has claimed this slot (transferIntentActive_ is set).
 * @return OK, or K_TRY_AGAIN while a transfer intent is active.
 */
Status Slot::CheckCompactTransferPreemptionLocked() const
{
    const bool transferPending = transferIntentActive_.load();
    CHECK_FAIL_RETURN_STATUS(!transferPending, StatusCode::K_TRY_AGAIN, "Compact preempted by transfer intent");
    return Status::OK();
}
/**
 * Collect the index records whose frames lie in the byte window [startOffset, endOffset).
 * @param[in] indexPath Index file to read.
 * @param[in] startOffset Window start (inclusive byte offset).
 * @param[in] endOffset Window end (exclusive byte offset); must be >= startOffset and within the validated bytes.
 * @param[out] records Records of frames inside the window, in file order.
 * @param[out] deltaBytes endOffset - startOffset on success, 0 otherwise.
 * @return K_INVALID for an inverted window; K_RUNTIME_ERROR if the window exceeds validated bytes or cuts a frame.
 */
Status Slot::ReadIndexDeltaRecords(const std::string &indexPath, size_t startOffset, size_t endOffset,
                                   std::vector<SlotRecord> &records, size_t &deltaBytes) const
{
    records.clear();
    deltaBytes = 0;
    CHECK_FAIL_RETURN_STATUS(endOffset >= startOffset, StatusCode::K_INVALID, "Invalid compact delta frontier range");
    if (startOffset == endOffset) {
        return Status::OK();
    }
    std::vector<SlotRecordFrame> allFrames;
    size_t validatedBytes = 0;
    RETURN_IF_NOT_OK(SlotIndexCodec::ReadAllRecordFrames(indexPath, allFrames, validatedBytes));
    CHECK_FAIL_RETURN_STATUS(endOffset <= validatedBytes, StatusCode::K_RUNTIME_ERROR,
                             "Compact frontier exceeds validated active index bytes");
    for (const auto &frame : allFrames) {
        const bool beforeWindow = frame.endOffset <= startOffset;
        if (beforeWindow) {
            continue;
        }
        // Frames are visited in file order, so the first frame past the window ends the scan.
        const bool pastWindow = frame.startOffset >= endOffset;
        if (pastWindow) {
            break;
        }
        const bool fullyInside = frame.startOffset >= startOffset && frame.endOffset <= endOffset;
        CHECK_FAIL_RETURN_STATUS(fullyInside, StatusCode::K_RUNTIME_ERROR,
                                 "Compact delta frontier split one index record");
        records.emplace_back(frame.record);
    }
    deltaBytes = endOffset - startOffset;
    return Status::OK();
}
/**
 * Abandon a target-side transfer: clear all transfer bookkeeping, return to NORMAL, and persist.
 * @param[in,out] manifest Transfer manifest with role TARGET; reset in place.
 * @return K_INVALID if the manifest is not a target transfer manifest; otherwise PersistManifest()'s status.
 */
Status Slot::ResetTransferTargetManifest(SlotManifestData &manifest)
{
    CHECK_FAIL_RETURN_STATUS(manifest.role == SlotOperationRole::TARGET, StatusCode::K_INVALID,
                             "ResetTransferTargetManifest requires target transfer manifest");
    // Drop every transfer-specific field ...
    manifest.txnId.clear();
    manifest.peerSlotPath.clear();
    manifest.recoverySlotPath.clear();
    manifest.transferPlanPath.clear();
    manifest.transferFileMap.clear();
    // ... and mark the slot idle again.
    manifest.state = SlotState::NORMAL;
    manifest.opType = SlotOperationType::NONE;
    manifest.opPhase = SlotOperationPhase::NONE;
    manifest.role = SlotOperationRole::NONE;
    return PersistManifest(manifest);
}
/**
 * Delete a transfer's target-side artifacts: the import index, the plan file, and target data files not yet listed
 * in activeData.
 * @param[in] manifest Target transfer manifest.
 * @param[in] plan If non-null, its data mappings name the target data files; otherwise they are decoded from
 *                 manifest.transferFileMap.
 * @return Status of decoding or deletion.
 */
Status Slot::CleanupTransferArtifacts(const SlotManifestData &manifest, const SlotTakeoverPlan *plan)
{
    std::vector<std::string> doomed;
    if (!manifest.txnId.empty()) {
        doomed.emplace_back(FormatImportIndexFileName(manifest.txnId));
    }
    if (!manifest.transferPlanPath.empty()) {
        doomed.emplace_back(BaseName(manifest.transferPlanPath));
    }
    // Data files already published into activeData belong to the slot and must survive.
    auto addUnlessActive = [&manifest, &doomed](const std::string &dataFile) {
        const auto &active = manifest.activeData;
        if (std::find(active.begin(), active.end(), dataFile) == active.end()) {
            doomed.emplace_back(dataFile);
        }
    };
    if (plan != nullptr) {
        for (const auto &mapping : plan->dataMappings) {
            addUnlessActive(mapping.targetDataFile);
        }
    } else {
        std::unordered_map<uint32_t, uint32_t> fileIdMap;
        RETURN_IF_NOT_OK(DecodeTransferFileMap(manifest.transferFileMap, fileIdMap));
        for (const auto &entry : fileIdMap) {
            addUnlessActive(FormatDataFileName(entry.second));
        }
    }
    return CleanupArtifactFiles(doomed);
}
/**
 * Load the source recovery slot's manifest and replay its snapshot.
 * @param[in] sourceRecoveryPath Source recovery slot directory.
 * @param[out] sourceManifest Manifest loaded from that directory.
 * @param[out] sourceSnapshot Snapshot replayed using sourceManifest.
 * @return Status of the call.
 */
Status Slot::LoadRecoveryTransferState(const std::string &sourceRecoveryPath, SlotManifestData &sourceManifest,
                                       SlotSnapshot &sourceSnapshot)
{
    const std::string &recoveryDir = sourceRecoveryPath;
    RETURN_IF_NOT_OK(SlotManifest::Load(recoveryDir, sourceManifest));
    // Replay takes the manifest just loaded from the same directory.
    RETURN_IF_NOT_OK(SlotSnapshot::Replay(recoveryDir, sourceManifest, sourceSnapshot));
    return Status::OK();
}
/**
 * Rebuild the takeover plan for a TRANSFER_PREPARED manifest whose plan or import index is missing or unreadable.
 *
 * Branch 1, where the source recovery slot exists: build a fresh MERGE plan from the replayed recovery snapshot.
 * If the manifest already records a file-id map, force the rebuilt mappings onto those target ids and regenerate
 * the import index from the snapshot's visible puts. Then dump the plan and persist transferPlanPath and
 * transferFileMap.
 * Branch 2, where there is no recovery slot: reconstruct the plan from the existing import index file and the
 * recorded file-id map only. Mappings are sorted by source file id; then dump the plan and persist transferPlanPath.
 * @param[in,out] manifest Target transfer manifest; plan-related fields are updated and persisted.
 * @param[out] plan Rebuilt plan.
 * @return K_NOT_FOUND if neither the recovery slot nor the import index exists; K_INVALID on file-map gaps.
 */
Status Slot::RebuildPreparedTransferPlan(SlotManifestData &manifest, SlotTakeoverPlan &plan)
{
plan = SlotTakeoverPlan{};
if (FileExist(manifest.recoverySlotPath)) {
SlotManifestData sourceManifest;
SlotSnapshot sourceSnapshot;
RETURN_IF_NOT_OK(LoadRecoveryTransferState(manifest.recoverySlotPath, sourceManifest, sourceSnapshot));
SlotTakeoverRequest rebuildRequest;
rebuildRequest.mode = SlotTakeoverMode::MERGE;
RETURN_IF_NOT_OK(SlotTakeoverPlanner::BuildPlan(manifest.peerSlotPath, manifest.recoverySlotPath,
sourceManifest, sourceSnapshot, slotPath_, manifest,
rebuildRequest, manifest.txnId, plan));
std::unordered_map<uint32_t, uint32_t> fileIdMap;
RETURN_IF_NOT_OK(DecodeTransferFileMap(manifest.transferFileMap, fileIdMap));
if (!fileIdMap.empty()) {
// Keep the target file ids recorded earlier (presumably because some data files may already have been moved
// under those names), then regenerate the import index to reference them.
for (auto &mapping : plan.dataMappings) {
auto it = fileIdMap.find(mapping.sourceFileId);
CHECK_FAIL_RETURN_STATUS(it != fileIdMap.end(), StatusCode::K_INVALID,
"Missing source file mapping while rebuilding transfer plan");
mapping.targetFileId = it->second;
mapping.targetDataFile = FormatDataFileName(mapping.targetFileId);
}
auto importIndexPath = JoinPath(slotPath_, plan.importIndexFile);
RETURN_IF_NOT_OK(SlotIndexCodec::EnsureIndexFile(importIndexPath));
// Drop any previous records, keeping only the index header.
RETURN_IF_NOT_OK(SlotIndexCodec::TruncateTail(importIndexPath, SlotIndexCodec::HEADER_SIZE));
std::vector<SlotPutRecord> visiblePuts;
RETURN_IF_NOT_OK(sourceSnapshot.CollectVisiblePuts(visiblePuts));
std::string importPayload;
// 64 bytes per record is only a reserve() estimate.
importPayload.reserve(visiblePuts.size() * 64);
for (const auto &record : visiblePuts) {
auto it = fileIdMap.find(record.fileId);
CHECK_FAIL_RETURN_STATUS(it != fileIdMap.end(), StatusCode::K_INVALID,
"Missing target file mapping for rebuilt visible put");
auto imported = record;
imported.fileId = it->second;
std::string encoded;
RETURN_IF_NOT_OK(SlotIndexCodec::EncodePut(imported, encoded));
importPayload.append(encoded);
}
RETURN_IF_NOT_OK(SlotIndexCodec::AppendEncodedRecords(importIndexPath, importPayload, true));
}
RETURN_IF_NOT_OK(SlotTakeoverPlanner::DumpPlan(slotPath_, plan));
manifest.transferPlanPath = FormatTakeoverPlanFileName(plan.txnId);
manifest.transferFileMap = EncodeTransferFileMap(plan);
RETURN_IF_NOT_OK(PersistManifest(manifest));
return Status::OK();
}
// No source recovery slot: the import index and the recorded file-id map are the only remaining sources.
auto importIndexFile = FormatImportIndexFileName(manifest.txnId);
CHECK_FAIL_RETURN_STATUS(FileExist(JoinPath(slotPath_, importIndexFile)), StatusCode::K_NOT_FOUND,
"Cannot rebuild transfer plan without source recovery or import index");
std::unordered_map<uint32_t, uint32_t> fileIdMap;
RETURN_IF_NOT_OK(DecodeTransferFileMap(manifest.transferFileMap, fileIdMap));
plan.txnId = manifest.txnId;
plan.sourceHomeSlotPath = manifest.peerSlotPath;
plan.sourceRecoverySlotPath = manifest.recoverySlotPath;
plan.targetSlotPath = slotPath_;
plan.importIndexFile = importIndexFile;
for (const auto &entry : fileIdMap) {
SlotTakeoverDataFileMapping mapping;
mapping.sourceFileId = entry.first;
mapping.sourceDataFile = FormatDataFileName(entry.first);
mapping.targetFileId = entry.second;
mapping.targetDataFile = FormatDataFileName(entry.second);
plan.dataMappings.emplace_back(std::move(mapping));
}
// unordered_map iteration order is unspecified; sort for a stable plan layout.
std::sort(plan.dataMappings.begin(), plan.dataMappings.end(),
[](const SlotTakeoverDataFileMapping &lhs, const SlotTakeoverDataFileMapping &rhs) {
return lhs.sourceFileId < rhs.sourceFileId;
});
RETURN_IF_NOT_OK(SlotTakeoverPlanner::DumpPlan(slotPath_, plan));
manifest.transferPlanPath = FormatTakeoverPlanFileName(plan.txnId);
RETURN_IF_NOT_OK(PersistManifest(manifest));
return Status::OK();
}
bool Slot::AreTransferTargetDataReady(const SlotTakeoverPlan &plan) const
{
return std::all_of(plan.dataMappings.begin(), plan.dataMappings.end(),
[this](const auto &mapping) { return FileExist(JoinPath(slotPath_, mapping.targetDataFile)); });
}
/**
 * Append the takeover's import records to the active index as a single IMPORT_BEGIN ... IMPORT_END batch.
 *
 * If HasClosedImportBatch() reports a closed batch for txnId among the current active-index records, nothing is
 * appended and published is set to true. Otherwise the import index body (everything after its header) is wrapped
 * in begin/end records and appended with one AppendEncodedRecords() call, after which the slot directory is fsynced.
 * @param[in] indexPath Full path of the active index file.
 * @param[in] txnId Transfer transaction id stamped on the begin/end records.
 * @param[in] importIndexFile Import index file name, relative to slotPath_.
 * @param[out] published true once the batch is (or already was) in the active index.
 * @return K_RUNTIME_ERROR if the import index is shorter than its header; otherwise the first failing step.
 */
Status Slot::PublishImportBatch(const std::string &indexPath, const std::string &txnId,
const std::string &importIndexFile, bool &published)
{
published = false;
VLOG(1) << "Publishing slot import batch, slotId=" << slotId_ << ", txnId=" << txnId
<< ", activeIndexPath=" << indexPath << ", importIndexFile=" << importIndexFile;
std::vector<SlotRecord> currentRecords;
size_t validBytes = 0;
RETURN_IF_NOT_OK(SlotIndexCodec::ReadAllRecords(indexPath, currentRecords, validBytes));
if (HasClosedImportBatch(currentRecords, txnId)) {
published = true;
return Status::OK();
}
std::string payload;
RETURN_IF_NOT_OK(SlotIndexCodec::EncodeImportBegin(SlotImportRecord{ txnId }, payload));
// NOTE: at this inject point the BEGIN record exists only in memory; nothing has been written yet.
INJECT_POINT("slotstore.Slot.Takeover.AfterImportBegin", []() { return Status::OK(); });
std::string importContent;
RETURN_IF_NOT_OK(ReadWholeFile(JoinPath(slotPath_, importIndexFile), importContent));
CHECK_FAIL_RETURN_STATUS(importContent.size() >= SlotIndexCodec::HEADER_SIZE, StatusCode::K_RUNTIME_ERROR,
"Import index content is shorter than header");
// Skip the import index's own header; only its encoded records go into the batch.
payload.append(importContent.data() + SlotIndexCodec::HEADER_SIZE,
importContent.size() - SlotIndexCodec::HEADER_SIZE);
INJECT_POINT("slotstore.Slot.Takeover.BeforeImportEnd", []() { return Status::OK(); });
std::string importEndPayload;
RETURN_IF_NOT_OK(SlotIndexCodec::EncodeImportEnd(SlotImportRecord{ txnId }, importEndPayload));
payload.append(importEndPayload);
// Whole batch goes in one append. NOTE(review): the `true` flag presumably requests a durable (synced) append.
RETURN_IF_NOT_OK(SlotIndexCodec::AppendEncodedRecords(indexPath, payload, true));
RETURN_IF_NOT_OK(FsyncDir(slotPath_));
published = true;
VLOG(1) << "Published slot import batch successfully, slotId=" << slotId_ << ", txnId=" << txnId
<< ", payloadBytes=" << payload.size();
return Status::OK();
}
/**
 * Write the source-side view of a transfer into the source recovery slot's manifest.
 *
 * If the recovery slot has no manifest (K_NOT_FOUND), a bootstrap manifest with the default active index and first
 * data file is used as the base. The transfer fields mirror the target manifest with role SOURCE and the given
 * phase. The manifest is stored atomically and the recovery directory is fsynced.
 * @param[in] sourceRecoveryPath Source recovery slot directory; must be non-empty.
 * @param[in] targetManifest Target manifest supplying txnId, ownerEpoch, plan path and file map.
 * @param[in] sourceHomePath Original home path of the source slot.
 * @param[in] phase Transfer phase to record.
 * @return Status of the call.
 */
Status Slot::PersistSourceTransferManifest(const std::string &sourceRecoveryPath,
                                           const SlotManifestData &targetManifest, const std::string &sourceHomePath,
                                           SlotOperationPhase phase) const
{
    CHECK_FAIL_RETURN_STATUS(!sourceRecoveryPath.empty(), StatusCode::K_INVALID,
                             "sourceRecoveryPath must not be empty");
    SlotManifestData srcManifest;
    auto loadRc = SlotManifest::Load(sourceRecoveryPath, srcManifest);
    if (loadRc.GetCode() != StatusCode::K_NOT_FOUND) {
        RETURN_IF_NOT_OK(loadRc);
    } else {
        srcManifest = SlotManifest::Bootstrap();
        srcManifest.homeSlotPath = sourceHomePath;
        srcManifest.activeIndex = "index_active.log";
        srcManifest.activeData = { FormatDataFileName(1) };
    }
    // Operation state for the source role.
    srcManifest.state = SlotState::IN_OPERATION;
    srcManifest.opType = SlotOperationType::TRANSFER;
    srcManifest.opPhase = phase;
    srcManifest.role = SlotOperationRole::SOURCE;
    // Transaction identity and ownership copied from the target.
    srcManifest.txnId = targetManifest.txnId;
    srcManifest.ownerEpoch = targetManifest.ownerEpoch;
    // Cross-references: home/recovery of the source, with the target as peer.
    srcManifest.homeSlotPath = sourceHomePath;
    srcManifest.recoverySlotPath = sourceRecoveryPath;
    srcManifest.peerSlotPath = slotPath_;
    srcManifest.transferPlanPath = targetManifest.transferPlanPath;
    srcManifest.transferFileMap = targetManifest.transferFileMap;
    RETURN_IF_NOT_OK(SlotManifest::StoreAtomic(sourceRecoveryPath, srcManifest));
    RETURN_IF_NOT_OK(FsyncDir(sourceRecoveryPath));
    return Status::OK();
}
/**
 * Mark the source recovery slot as TRANSFER_SOURCE_RETIRED and then remove it.
 * @param[in] sourceRecoveryPath Source recovery slot directory; a missing directory makes this a no-op.
 * @param[in] targetManifest Target transfer manifest (txnId, peer/home path, plan data).
 * @return Status of the call.
 */
Status Slot::FinalizeSourceAfterTakeover(const std::string &sourceRecoveryPath, const SlotManifestData &targetManifest)
{
    if (!FileExist(sourceRecoveryPath)) {
        return Status::OK();
    }
    VLOG(1) << "Finalizing slot source after takeover, slotId=" << slotId_
            << ", sourceRecoveryPath=" << sourceRecoveryPath << ", txnId=" << targetManifest.txnId;
    RETURN_IF_NOT_OK(PersistSourceTransferManifest(sourceRecoveryPath, targetManifest, targetManifest.peerSlotPath,
                                                   SlotOperationPhase::TRANSFER_SOURCE_RETIRED));
    INJECT_POINT("slotstore.Slot.Takeover.BeforeSourceFinalize", []() { return Status::OK(); });
    // Re-checked after the inject point before removing the directory tree.
    const bool recoveryStillPresent = FileExist(sourceRecoveryPath);
    if (recoveryStillPresent) {
        RETURN_IF_NOT_OK(RemoveAll(sourceRecoveryPath));
    }
    return Status::OK();
}
}