* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Object spilling implementation.
*/
#include "datasystem/worker/object_cache/worker_oc_spill.h"
#include <iterator>
#include <limits>
#include <shared_mutex>
#include <string>
#include <utility>
#include <dirent.h>
#include <fcntl.h>
#include <linux/falloc.h>
#include "datasystem/common/flags/flags.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/iam/tenant_auth_manager.h"
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/perf/perf_manager.h"
#include "datasystem/common/constants.h"
#include "datasystem/common/util/file_util.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/memory.h"
#include "datasystem/common/util/net_util.h"
#include "datasystem/common/util/raii.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/util/strings_util.h"
#include "datasystem/common/util/thread_local.h"
#include "datasystem/common/util/uuid_generator.h"
#include "datasystem/common/util/gflag/eviction_watermark.h"
#include "datasystem/common/util/validator.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/utils/status.h"
DS_DEFINE_string(spill_directory, "",
"The path and file name prefix of the spilling, empty means spill disabled. The path length must less "
"than 4095 characters. If spill_type is local_disk, this para must be set.");
DS_DEFINE_validator(spill_directory, &Validator::ValidatePathString);
DS_DEFINE_uint64(spill_size_limit, 0,
"Maximum amount of spilled data that can be stored in the spill directory. If spill is enable and "
"spill_size_limit is 0, spill_size_limit will be set to 95% of the spill directory.");
DS_DEFINE_uint64(
spill_file_max_size_mb, 200,
"The size limit of single spill file, spilling objects which lager than that value with one object per "
"file. If there are some big objects, you can increase this value to avoid run out of inodes quickly. "
"The valid range is 200-10240.");
DS_DEFINE_validator(spill_file_max_size_mb, &Validator::ValidateSpillFileMaxSize);
DS_DEFINE_uint32(
spill_thread_num, 8,
"It represents the maximum parallelism of writing files, more threads will consume more CPU and I/O resources.");
DS_DEFINE_validator(spill_thread_num, &Validator::ValidateThreadNum);
DS_DEFINE_uint64(
spill_file_open_limit, 512,
"The maximum number of open file descriptors about spill. If opened file exceed this value, some files "
"will be temporarily closed to prevent exceeding the maximum system limit. You need reduce this value if "
"your system resources are limited. The valid range is greater than or equal to 8.");
DS_DEFINE_validator(spill_file_open_limit, &Validator::ValidateSpillOpenFileLimit);
DS_DEFINE_bool(spill_enable_readahead, true,
"Disable readahead can mitigate the read amplification problem for offset read, default is true");
namespace datasystem {
namespace object_cache {
constexpr int SPILL_FILE_MODE = 0600;
constexpr size_t MB = 1024 * 1024;
constexpr int MAX_OPEN_RETRY = 10;
constexpr int BACKOFF_TIME_MS = 200;
constexpr int INVALID_FD = -1;
constexpr int PERMISSION = 0700;
std::atomic<uint64_t> totalSpillFileDiskSize{ 0 };
std::atomic<int64_t> openFdCount{ 0 };
SpillBuffer::SpillBuffer()
{
data_.reserve(SpillFileManager::LARGE_OBJ_SIZE_THRESHOLD);
}
void SpillBuffer::Append(const std::string &objectKey, const char *input, size_t size)
{
PerfPoint point(PerfKey::WORKER_SPILL_APPEND_BUFFER);
size_t offset = data_.size();
(void)index_.emplace(objectKey, std::pair<size_t, size_t>{ offset, size });
(void)data_.append(input, size);
VLOG(1) << "Append buffer " << objectKey << ", offset " << offset << ", size " << size;
}
void SpillBuffer::Remove(const std::string &objectKey)
{
(void)index_.erase(objectKey);
}
Status SpillBuffer::CopyTo(const std::string &objectKey, const char *output, size_t size, size_t offset)
{
PerfPoint point(PerfKey::WORKER_SPILL_COPY_FROM_BUFFER);
const auto &offSize = index_[objectKey];
size_t offsetInBuffer = offSize.first;
size_t sz = offSize.second;
CHECK_FAIL_RETURN_STATUS(
size + offset <= sz && size > 0 && offsetInBuffer + sz <= data_.size(), StatusCode::K_RUNTIME_ERROR,
FormatString(
"Invalid size, objectKey=%s, need size=%ld, real size=%ld, offset=%ld, offsetInBuffer=%ld, bufferSize=%ld",
objectKey, size, sz, offset, offsetInBuffer, data_.size()));
LOG(INFO) << FormatString(
"Copy data from spill buffer, objectKey=%s, need size=%ld, real size=%ld, offset=%ld, offsetInBuffer=%ld, "
"bufferSize=%ld",
objectKey, size, sz, offset, offsetInBuffer, data_.size());
int ret = memcpy_s((void *)output, size, data_.data() + offset + offsetInBuffer, size);
CHECK_FAIL_RETURN_STATUS(ret == EOK, StatusCode::K_RUNTIME_ERROR,
FormatString("Copy data failed, memcpy_s return %d, oobjectKey=%s, need size=%ld, real "
"size=%ld, offset=%ld, offsetInBuffer=%ld, "
"bufferSize=%ld",
objectKey, size, sz, offset, offsetInBuffer, data_.size()));
return Status::OK();
}
Status SpillBuffer::CopyTo(const std::string &objectKey, std::vector<RpcMessage> &messages, size_t size, size_t offset)
{
PerfPoint point(PerfKey::WORKER_SPILL_COPY_BUFFER_TO_MESSAGE);
const auto &offSize = index_[objectKey];
size_t offsetInBuffer = offSize.first;
size_t sz = offSize.second;
CHECK_FAIL_RETURN_STATUS(
size + offset <= sz && size > 0 && offsetInBuffer + sz <= data_.size(), StatusCode::K_RUNTIME_ERROR,
FormatString(
"Invalid size, objectKey=%s, need size=%ld, real size=%ld, offset=%ld, offsetInBuffer=%ld, bufferSize=%ld",
objectKey, size, sz, offset, offsetInBuffer, data_.size()));
LOG(INFO) << FormatString(
"Copy data from spill buffer, objectKey=%s, need size=%ld, real size=%ld, offset=%ld, offsetInBuffer=%ld, "
"bufferSize=%ld",
objectKey, size, sz, offset, offsetInBuffer, data_.size());
return CopyAndSplitBuffer(TenantAuthManager::ExtractTenantId(objectKey), data_.data() + offset + offsetInBuffer,
size, messages);
}
bool SpillBuffer::Exist(const std::string &objectKey, uint64_t &size)
{
auto iter = index_.find(objectKey);
if (iter == index_.end()) {
return false;
}
size = iter->second.second;
return true;
}
bool SpillBuffer::UniqObject(const std::string &objectKey)
{
return index_.count(objectKey) > 0 && index_.size() == 1;
}
size_t SpillBuffer::Size()
{
return data_.size();
}
const char *SpillBuffer::GetData()
{
return data_.data();
}
const std::unordered_map<std::string, std::pair<size_t, size_t>> &SpillBuffer::GetIndex()
{
return index_;
}
void SpillBuffer::Reset()
{
data_.clear();
index_.clear();
}
void SpillBuffer::CloneTo(SpillBuffer &buffer)
{
for (const auto &kv : index_) {
const auto &objectKey = kv.first;
auto offset = kv.second.first;
auto size = kv.second.second;
buffer.Append(objectKey, data_.data() + offset, size);
}
}
Status ActiveSpillFile::Read(void *buffer, size_t count, off_t offset)
{
return ReadFile(fd_, buffer, count, offset);
}
Status ActiveSpillFile::ReadToRpcMessage(size_t size, size_t offset, std::vector<RpcMessage> &messages)
{
const size_t maxInt = std::numeric_limits<int32_t>::max();
while (size > 0) {
size_t bufSize = std::min(size, maxInt);
messages.emplace_back();
RETURN_IF_NOT_OK(messages.back().Resize(bufSize));
PerfPoint p(PerfKey::WORKER_SPILL_READ_FILE);
RETURN_IF_NOT_OK(ReadFile(fd_, messages.back().Data(), bufSize, offset));
p.Record();
size -= bufSize;
offset += bufSize;
}
return Status::OK();
}
Status ActiveSpillFile::Write(const void *buffer, size_t count, off_t offset)
{
INJECT_POINT("worker.Spill.Write");
return WriteFile(fd_, buffer, count, offset);
}
void ActiveSpillFile::Sync()
{
INJECT_POINT("worker.Spill.Sync", [] {});
(void)syncfs(fd_);
}
int ActiveSpillFile::GetFd()
{
return fd_;
}
ActiveSpillFile::~ActiveSpillFile()
{
if (fd_ == INVALID_FD) {
return;
}
RETRY_ON_EINTR(close(fd_));
openFdCount--;
if (openFdCount < 0) {
openFdCount = 0;
}
VLOG(1) << FormatString("Close fd %d success, openFdCount: %ld", fd_, openFdCount.load());
fd_ = INVALID_FD;
}
SpillFileManager::~SpillFileManager()
{
for (auto &tenantInfo : tenant2FileInfo_) {
for (auto &fileInfo : tenantInfo.second) {
if (fileInfo.second.file != nullptr) {
fileInfo.second.file = nullptr;
}
VLOG(1) << "Delete file in deconstruct: " << fileInfo.first << ", id:" << id_;
auto status = DeleteFile(fileInfo.first);
if (status.IsError()) {
LOG(ERROR) << "Delete file " << fileInfo.first << " failed, status: " << status.ToString();
}
}
}
}
void SpillFileManager::Init(const std::string &directory)
{
spillDir_ = directory;
tenant2FileInfo_[DEFAULT_TENANT_ID] = {};
}
Status SpillFileManager::SpillSmallObject(const std::string &objectKey, const void *buffer, size_t size)
{
PerfPoint p(PerfKey::WORKER_SPILL_TO_BUFFER);
std::unique_lock<std::shared_timed_mutex> lock(fileInfoMutex_);
buffer_.Append(objectKey, static_cast<const char *>(buffer), size);
if (buffer_.Size() < SpillFileManager::LARGE_OBJ_SIZE_THRESHOLD) {
return Status::OK();
}
PerfPoint point(PerfKey::WORKER_SPILL_FLUSH_BUFFER);
SpillBuffer spillBuffer;
buffer_.CloneTo(spillBuffer);
const auto &index = spillBuffer.GetIndex();
ObjectLocation flushLoc;
std::shared_ptr<ActiveSpillFile> file;
RETURN_IF_NOT_OK(FindBestWriteFile(DEFAULT_TENANT_ID, spillBuffer.Size(), flushLoc, file));
Timer timer;
RETURN_IF_NOT_OK(file->Write(spillBuffer.GetData(), flushLoc.size, flushLoc.offset));
auto writeElapsed = timer.ElapsedMilliSecond();
for (const auto &kv : index) {
const auto &objectKey = kv.first;
const auto &offset = kv.second.first;
const auto &size = kv.second.second;
ObjectLocation loc;
loc.path = flushLoc.path;
loc.offset = flushLoc.offset + offset;
loc.size = size;
UpdateSpillInfo(DEFAULT_TENANT_ID, objectKey, loc);
}
buffer_.Reset();
lock.unlock();
timer.Reset();
file->Sync();
auto syncElapsed = timer.ElapsedMilliSecond();
LOG(INFO) << FormatString(
"Id: %d, Flush %d objects and %d bytes from buffer to file, fd: %d, write: %fms, sync: %fms.", id_,
index.size(), spillBuffer.Size(), file->GetFd(), writeElapsed, syncElapsed);
return Status::OK();
}
Status SpillFileManager::Spill(const std::string &objectKey,
const std::vector<std::pair<const uint8_t *, uint64_t>> &payloads, uint64_t size)
{
PerfPoint point(PerfKey::WORKER_SPILL);
auto tenantId = TenantAuthManager::ExtractTenantId(objectKey);
std::replace(tenantId.begin(), tenantId.end(), '/', '_');
if (size <= sizeThreshold_ && tenantId == DEFAULT_TENANT_ID) {
CHECK_FAIL_RETURN_STATUS(
payloads.size() == 1, StatusCode::K_RUNTIME_ERROR,
FormatString("Spill small object %s[size: %ld] but payload size is %ld", objectKey, size, payloads.size()));
return SpillSmallObject(objectKey, payloads[0].first, payloads[0].second);
}
INJECT_POINT("worker.Spill", [](int waitTimeMs) {
std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeMs));
return Status::OK();
});
Timer timer;
std::unique_lock<std::shared_timed_mutex> lock(fileInfoMutex_);
auto waitLockElapsed = timer.ElapsedMilliSecond();
PerfPoint p(PerfKey::WORKER_SPILL_WRITE_FILE);
ObjectLocation loc;
std::shared_ptr<ActiveSpillFile> file;
RETURN_IF_NOT_OK(FindBestWriteFile(tenantId, size, loc, file));
tenant2FileInfo_[tenantId][loc.path].objectKeys.insert(objectKey);
lock.unlock();
timer.Reset();
Status rc = WriteFile(file, payloads, loc.offset);
if (rc.IsError()) {
std::unique_lock<std::shared_timed_mutex> lock(fileInfoMutex_);
tenant2FileInfo_[tenantId][loc.path].objectKeys.erase(objectKey);
return rc;
}
totalSpillFileDiskSize += size;
auto writeElapsed = timer.ElapsedMilliSecond();
timer.Reset();
file->Sync();
p.Record();
auto syncElapsed = timer.ElapsedMilliSecond();
{
std::unique_lock<std::shared_timed_mutex> lock(fileInfoMutex_);
objLocations_[objectKey] = loc;
}
LOG(INFO) << FormatString(
"Id: %d, Spill object [%s], fd: %d, path: %s, offset: %ld, size: %ld, wait: %fms, write: %fms, syncfs: %fms",
id_, objectKey, file->GetFd(), loc.path, loc.offset, loc.size, waitLockElapsed, writeElapsed, syncElapsed);
return Status::OK();
}
Status SpillFileManager::Spill(const std::string &objectKey, void *buffer, uint64_t size)
{
std::vector<std::pair<const uint8_t *, uint64_t>> payloads{ { static_cast<const uint8_t *>(buffer), size } };
return Spill(objectKey, payloads, size);
}
Status SpillFileManager::GetObjectFileLocation(const std::string &objectKey, ObjectLocation &loc)
{
auto it = objLocations_.find(objectKey);
if (it != objLocations_.end()) {
loc = it->second;
} else {
RETURN_STATUS_LOG_ERROR(StatusCode::K_NOT_FOUND, "Object " + objectKey + " is not found in the spilled file");
}
return Status::OK();
}
Status SpillFileManager::GetFileInfoPtr(const std::string &objectKey, const std::string &path, FileInfo *&outFileInfo)
{
auto tenantId = TenantAuthManager::ExtractTenantId(objectKey);
std::replace(tenantId.begin(), tenantId.end(), '/', '_');
CHECK_FAIL_RETURN_STATUS(tenant2FileInfo_.count(tenantId) > 0, StatusCode::K_NOT_FOUND,
FormatString("Cannot find tenant %s in the tenantInfoMap", tenantId));
auto &fileInfoMap = tenant2FileInfo_[tenantId];
CHECK_FAIL_RETURN_STATUS(fileInfoMap.count(path) > 0, StatusCode::K_NOT_FOUND,
FormatString("Cannot find the file %s in the fileInfoMap", path));
outFileInfo = &fileInfoMap[path];
return Status::OK();
}
Status SpillFileManager::GetFileInfoAndReopenFile(const std::string &objectKey, const std::string &path,
FileInfo *&outFileInfo, bool &isReopen)
{
RETURN_IF_NOT_OK(GetFileInfoPtr(objectKey, path, outFileInfo));
isReopen = false;
if (outFileInfo->file != nullptr) {
return Status::OK();
}
int fd;
RETURN_IF_NOT_OK(RetryFileOperation(MAX_OPEN_RETRY, BACKOFF_TIME_MS, [&path, &fd]() {
return OpenFile(path, O_RDWR | O_CREAT, SPILL_FILE_MODE, &fd);
}));
DisableReadAheadIfNeed(fd);
openFdCount++;
isReopen = true;
outFileInfo->file = std::make_shared<ActiveSpillFile>(fd);
VLOG(1) << FormatString("Reopen file: %s, objectKey: %s, openFdCount: %ld", path, objectKey, openFdCount.load());
return Status::OK();
}
void SpillFileManager::CloseFileIfExceedLimit(FileInfo *fileInfo)
{
if (FdCountExceedLimit() && fileInfo->file != nullptr) {
fileInfo->file = nullptr;
VLOG(1) << "Close exceed file: " << fileInfo->path;
}
}
bool SpillFileManager::FdCountExceedLimit()
{
return static_cast<uint64_t>(openFdCount) >= FLAGS_spill_file_open_limit;
}
Status SpillFileManager::CreateSpillFile(const std::string &directory, SpillFileType spillFileType,
std::string &outFileURL, int *outFd)
{
std::string fileUrl;
do {
fileUrl = directory + "/" + GetStringUuid();
} while (FileExist(fileUrl));
RETURN_IF_NOT_OK(RetryFileOperation(MAX_OPEN_RETRY, BACKOFF_TIME_MS, [&fileUrl, &directory, outFd]() {
RETURN_IF_NOT_OK(CreateDir(directory, true, PERMISSION));
return OpenFile(fileUrl, O_RDWR | O_CREAT, SPILL_FILE_MODE, outFd);
}));
DisableReadAheadIfNeed(*outFd);
outFileURL = fileUrl;
openFdCount++;
LOG(INFO) << "[OCCreateSpillFile] FileType(0:large/1:small): " << static_cast<unsigned int>(spillFileType)
<< " path: " << outFileURL << ", openFdCount: " << openFdCount;
return Status::OK();
}
Status SpillFileManager::FindBestWriteFile(const std::string &tenantId, size_t size, ObjectLocation &outLocation,
std::shared_ptr<ActiveSpillFile> &file)
{
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(size < SIZE_MAX, K_RUNTIME_ERROR,
FormatString("The size is overflow, size:%d > SIZE_MAX:%d", size, SIZE_MAX));
SpillFileType spillFileType =
size >= LARGE_OBJ_SIZE_THRESHOLD ? SpillFileType::LARGE_OBJ_FILE : SpillFileType::SMALL_OBJ_FILE;
auto &fileInfoMap = tenant2FileInfo_[tenantId];
auto iter = fileInfoMap.begin();
while (iter != fileInfoMap.end()) {
if (iter->second.full || iter->second.file == nullptr) {
CloseFileIfExceedLimit(&iter->second);
++iter;
continue;
}
if (iter->second.spillFileType != spillFileType) {
++iter;
continue;
}
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
std::numeric_limits<size_t>::max() - iter->second.size > size, K_RUNTIME_ERROR,
FormatString("The size is overflow, size:%d + add:%d > SIZE_MAX:%d", iter->second.size, size, SIZE_MAX));
if (iter->second.size + size > FLAGS_spill_file_max_size_mb * MB) {
iter->second.full = true;
if (iter->second.objectKeys.empty() && spillFileType == SpillFileType::LARGE_OBJ_FILE) {
iter->second.file = nullptr;
LOG_IF_ERROR(DeleteFile(iter->second.path), "[Spill] Delete Empty file Fail");
iter = fileInfoMap.erase(iter);
} else {
CloseFileIfExceedLimit(&iter->second);
++iter;
}
continue;
}
outLocation = ObjectLocation{ .path = iter->first, .offset = iter->second.size, .size = size };
RETURN_RUNTIME_ERROR_IF_NULL(iter->second.file);
file = iter->second.file;
iter->second.size += size;
return Status::OK();
}
CHECK_FAIL_RETURN_STATUS(!spillDir_.empty(), K_RUNTIME_ERROR, "Variable directory should not be empty.");
auto dir = spillDir_ + "/" + tenantId;
std::string newFilePath;
int newFd;
RETURN_IF_NOT_OK(CreateSpillFile(dir, spillFileType, newFilePath, &newFd));
auto isFull = size >= FLAGS_spill_file_max_size_mb * MB;
file = std::make_shared<ActiveSpillFile>(newFd);
fileInfoMap[newFilePath] = FileInfo{
.file = file, .size = size, .holesSize = 0, .full = isFull, .spillFileType = spillFileType, .path = newFilePath
};
outLocation = ObjectLocation{ .path = newFilePath, .offset = 0, .size = size };
return Status::OK();
}
Status SpillFileManager::WriteFile(const std::shared_ptr<ActiveSpillFile> &file,
const std::vector<std::pair<const uint8_t *, uint64_t>> &payloads, uint64_t offset)
{
Status rc;
CHECK_FAIL_RETURN_STATUS(file != nullptr, K_RUNTIME_ERROR, "file is null");
for (const auto &payload : payloads) {
rc = file->Write(payload.first, payload.second, offset);
if (rc.IsOk()) {
offset += payload.second;
} else {
break;
}
}
return rc;
}
void SpillFileManager::UpdateSpillInfo(const std::string &tenantId, const std::string &objectKey,
const ObjectLocation &location)
{
(void)tenant2FileInfo_[tenantId][location.path].objectKeys.insert(objectKey);
objLocations_[objectKey] = location;
}
Status SpillFileManager::LoadFromDisk(const std::string &objectKey, void *buffer, size_t size, size_t offset)
{
PerfPoint point(PerfKey::WORKER_SPILL_GET);
auto readBufferFunc = [&objectKey, buffer, offset, size](SpillBuffer &spillBuffer) {
return spillBuffer.CopyTo(objectKey, static_cast<const char *>(buffer), size, offset);
};
auto readFileFunc = [buffer, offset, size](std::shared_ptr<ActiveSpillFile> &file, uint64_t objectOffset,
uint64_t objectSize) {
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(SIZE_MAX - size >= offset, K_RUNTIME_ERROR,
FormatString("size: %zu + offset: %zu > SIZE_MAX", size, offset));
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
size + offset <= objectSize && size > 0, K_RUNTIME_ERROR,
FormatString("Invalid size: %zu, offset: %zu, objectSize: %zu", size, offset, objectSize));
return file->Read(buffer, size, objectOffset + offset);
};
return LoadFromDiskImpl(objectKey, readBufferFunc, readFileFunc);
}
Status SpillFileManager::LoadFromDisk(const std::string &objectKey, std::vector<RpcMessage> &messages, size_t size,
size_t offset)
{
PerfPoint point(PerfKey::WORKER_SPILL_GET_TO_MESSAGE);
auto readBufferFunc = [&objectKey, &messages, size, offset](SpillBuffer &spillBuffer) {
return spillBuffer.CopyTo(objectKey, messages, size, offset);
};
auto readFileFunc = [&messages, size, offset](std::shared_ptr<ActiveSpillFile> &file, uint64_t objectOffset,
uint64_t objectSize) {
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(SIZE_MAX - size >= offset, K_RUNTIME_ERROR,
FormatString("size: %zu + offset: %zu > SIZE_MAX", size, offset));
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
size + offset <= objectSize && size > 0, K_RUNTIME_ERROR,
FormatString("Invalid size: %zu, offset: %zu, objectSize: %zu", size, offset, objectSize));
return file->ReadToRpcMessage(size, objectOffset + offset, messages);
};
return LoadFromDiskImpl(objectKey, readBufferFunc, readFileFunc);
}
Status SpillFileManager::DeleteFromDisk(const std::string &objectKey, uint64_t &decSize)
{
PerfPoint point(PerfKey::WORKER_SPILL_DELETE);
Timer timer;
std::lock_guard<std::shared_timed_mutex> fileLock(fileInfoMutex_);
auto waitElapsed = timer.ElapsedMilliSecond();
workerOperationTimeCost.Append("Delete From Disk Get lock", timer.ElapsedMilliSecond());
if (buffer_.Exist(objectKey, decSize)) {
if (buffer_.UniqObject(objectKey)) {
buffer_.Reset();
} else {
buffer_.Remove(objectKey);
}
return Status::OK();
}
ObjectLocation loc;
RETURN_IF_NOT_OK(GetObjectFileLocation(objectKey, loc));
const auto &filename = loc.path;
LOG(INFO) << FormatString("Id: %d, Delete object [%s], path: %s, offset: %ld, size: %ld, wait lock: %fms", id_,
objectKey, filename, loc.offset, loc.size, waitElapsed);
decSize = loc.size;
auto tenantId = TenantAuthManager::ExtractTenantId(objectKey);
std::replace(tenantId.begin(), tenantId.end(), '/', '_');
CHECK_FAIL_RETURN_STATUS(tenant2FileInfo_.count(tenantId) > 0, StatusCode::K_NOT_FOUND,
FormatString("Cannot find tenant %s in the tenantInfoMap", tenantId));
auto &fileInfoMap = tenant2FileInfo_[tenantId];
CHECK_FAIL_RETURN_STATUS(fileInfoMap.count(filename) > 0, StatusCode::K_NOT_FOUND,
FormatString("Cannot find the file %s in the fileInfoMap", filename));
auto &fileInfo = fileInfoMap[filename];
CHECK_FAIL_RETURN_STATUS(fileInfo.objectKeys.erase(objectKey) == 1, StatusCode::K_RUNTIME_ERROR,
FormatString("Failed to erase object %s from fileInfoMap", objectKey));
(void)objLocations_.erase(objectKey);
fileInfo.holesSize += decSize;
DeleteLargeObj(objectKey, loc, fileInfoMap);
return Status::OK();
}
void SpillFileManager::DeleteLargeObj(const std::string &objectKey, const ObjectLocation &loc,
std::unordered_map<std::string, FileInfo> &fileInfoMap)
{
if (loc.size >= LARGE_OBJ_SIZE_THRESHOLD) {
auto &fileInfo = fileInfoMap[loc.path];
if (fileInfo.objectKeys.empty() && fileInfo.full) {
LOG(INFO) << FormatString("Delete empty spill file %s immediately", loc.path);
if (fileInfo.file != nullptr) {
fileInfo.file = nullptr;
}
Status rc = DeleteFile(loc.path);
LOG_IF_ERROR(rc, "failed to delete file");
(void)fileInfoMap.erase(loc.path);
if (totalSpillFileDiskSize >= loc.size) {
totalSpillFileDiskSize -= loc.size;
} else {
LOG(WARNING) << FormatString("Computation overflow: totalSpillFileDiskSize=%llu, loc.size=%llu",
totalSpillFileDiskSize.load(), loc.size);
totalSpillFileDiskSize = 0;
}
} else {
std::lock_guard<std::shared_timed_mutex> queueLock(fallocateQueueMutex_);
fallocateQueue_.emplace_back(objectKey, loc);
}
}
}
Status SpillFileManager::FallocateInplace(const ObjectLocation &loc)
{
int tmpFd;
Status rc = RetryFileOperation(MAX_OPEN_RETRY, BACKOFF_TIME_MS, [&loc, &tmpFd]() {
return OpenFile(loc.path, O_RDWR | O_CREAT, SPILL_FILE_MODE, &tmpFd);
});
if (rc.IsError()) {
LOG(ERROR) << FormatString("[ProcessFallocateQueue] Failed to open file %s with error:%d", loc.path, errno);
return rc;
}
int ret = fallocate(tmpFd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, loc.offset, loc.size);
if (ret != 0) {
LOG(ERROR) << FormatString("[ProcessFallocateQueue] Failed to fallocate file with error:%d", errno);
}
RETRY_ON_EINTR(close(tmpFd));
return Status::OK();
}
void SpillFileManager::ProcessFallocateQueue()
{
std::vector<std::pair<std::string, ObjectLocation>> fallocateQueueCopy;
{
std::lock_guard<std::shared_timed_mutex> lock(fallocateQueueMutex_);
fallocateQueueCopy.swap(fallocateQueue_);
}
for (const auto &objectLocationIter : fallocateQueueCopy) {
const auto &objectKey = objectLocationIter.first;
const auto &loc = objectLocationIter.second;
VLOG(1) << "Process Fallocate task"
<< ", objectKey: " << objectKey << ", path: " << loc.path << ", offset: " << loc.offset
<< ", size: " << loc.size;
ProcessFallocateOneTask(objectKey, loc);
}
}
void SpillFileManager::ProcessFallocateOneTask(const std::string &objectKey, const ObjectLocation &loc)
{
if (totalSpillFileDiskSize >= loc.size) {
totalSpillFileDiskSize -= loc.size;
} else {
LOG(WARNING) << FormatString("Computation overflow: totalSpillFileDiskSize=%llu, loc.size=%llu",
totalSpillFileDiskSize.load(), loc.size);
totalSpillFileDiskSize = 0;
}
auto tenantId = TenantAuthManager::ExtractTenantId(objectKey);
std::replace(tenantId.begin(), tenantId.end(), '/', '_');
{
std::lock_guard<std::shared_timed_mutex> lock(fileInfoMutex_);
auto &fileInfoMap = tenant2FileInfo_[tenantId];
auto iter = fileInfoMap.find(loc.path);
if (iter == fileInfoMap.end()) {
VLOG(1) << "file has been deleted, skip it.";
return;
}
auto &fileInfo = iter->second;
if (fileInfo.objectKeys.empty() && fileInfo.full && fileInfo.spillFileType == SpillFileType::LARGE_OBJ_FILE) {
if (fileInfo.file != nullptr) {
fileInfo.file = nullptr;
}
Status rc = DeleteFile(loc.path);
if (rc.IsError()) {
LOG_IF_ERROR(rc, "[ProcessFallocateQueue] failed to delete file");
return;
}
LOG(INFO) << FormatString("Spill file %s has been removed as all objs expired.", loc.path);
if (fileInfoMap.erase(loc.path) != 1) {
LOG(ERROR) << "[ProcessFallocateQueue] failed to erase file from fileInfoMap: " << loc.path;
}
return;
}
}
Status rc = FallocateInplace(loc);
LOG_IF_ERROR(rc, FormatString("FallocateInplace failed, objectKey: %s", objectKey));
}
Status SpillFileManager::CompactFiles(float holeSizeRatioThreshold)
{
ProcessFallocateQueue();
std::vector<HoleFileInfo> holeFileArray;
GetHoleFiles(holeSizeRatioThreshold, holeFileArray);
throttle_.Reset();
for (auto &iter : holeFileArray) {
const std::string &tenantId = iter.tenantId;
const std::string &oldSpillFilename = iter.spillFilename;
FileInfo &fileinfo = *(iter.fileinfo);
size_t oldFileSize = fileinfo.size;
RETURN_IF_NOT_OK(CompactFile(tenantId, oldSpillFilename, fileinfo));
RETURN_IF_NOT_OK(DeleteFile(oldSpillFilename));
if (totalSpillFileDiskSize >= oldFileSize) {
totalSpillFileDiskSize -= oldFileSize;
} else {
LOG(WARNING) << FormatString("Computation overflow: totalSpillFileDiskSize=%llu, oldFileSize=%llu",
totalSpillFileDiskSize.load(), oldFileSize);
totalSpillFileDiskSize = 0;
}
LOG(INFO) << FormatString("[Compact] old Spill file %s remove.", oldSpillFilename);
}
return Status::OK();
}
void SpillFileManager::GetHoleFiles(float holeSizeRatioThreshold, std::vector<HoleFileInfo> &holeFileArray)
{
std::shared_lock<std::shared_timed_mutex> l(fileInfoMutex_);
for (const auto &tenantInfo : tenant2FileInfo_) {
for (const auto &fileinfo : tenantInfo.second) {
if (!fileinfo.second.full || fileinfo.second.spillFileType == SpillFileType::LARGE_OBJ_FILE) {
VLOG(1) << FormatString("[Compact] File %s not full or not large obj file.", fileinfo.first);
continue;
}
if (fileinfo.second.size == 0) {
LOG(WARNING) << FormatString("[Compact] File %s is empty.", fileinfo.first);
continue;
}
float holeSizeRatio =
static_cast<float>(fileinfo.second.holesSize) / static_cast<float>(fileinfo.second.size);
VLOG(1) << FormatString("[Compact] File %s hole ratio %d, threshold %d.", fileinfo.first, holeSizeRatio,
holeSizeRatioThreshold);
if (holeSizeRatio > holeSizeRatioThreshold) {
holeFileArray.emplace_back(HoleFileInfo{ holeSizeRatio, tenantInfo.first, fileinfo.first,
const_cast<FileInfo *>(&fileinfo.second) });
}
}
}
std::sort(holeFileArray.begin(), holeFileArray.end(),
[](HoleFileInfo &a, HoleFileInfo &b) { return a.holeSizeRatio > b.holeSizeRatio; });
}
Status SpillFileManager::CompactFile(const std::string &tenantId, const std::string &oldSpillFilename,
FileInfo &fileinfo)
{
LOG(INFO) << FormatString("[Compact] start to Compact file %s, holesSize:%d, size:%d.", oldSpillFilename,
fileinfo.holesSize, fileinfo.size);
std::string newFilePath;
int newFd = -1;
auto dir = spillDir_ + "/" + tenantId;
RETURN_IF_NOT_OK(CreateSpillFile(dir, fileinfo.spillFileType, newFilePath, &newFd));
FileInfo newFileInfo = { .file = std::make_shared<ActiveSpillFile>(newFd),
.size = 0,
.holesSize = 0,
.full = false,
.spillFileType = fileinfo.spillFileType,
.path = newFilePath };
std::unordered_map<std::string, ObjectLocation> newObjectLocationsMap;
Status rc = CopyObjects(fileinfo, newFilePath, newFileInfo, newObjectLocationsMap);
if (rc.IsError()) {
LOG(ERROR) << "[Compact] failed to CopyObjects: " << rc.ToString();
newFileInfo.file = nullptr;
Status ret = DeleteFile(newFilePath);
LOG_IF_ERROR(ret, "[Compact] failed to delete new file");
return rc;
}
totalSpillFileDiskSize += newFileInfo.size;
uint64_t usedSize = 0;
std::lock_guard<std::shared_timed_mutex> lock(fileInfoMutex_);
auto &fileInfoMap = tenant2FileInfo_[tenantId];
fileInfoMap[newFilePath] = newFileInfo;
for (const auto &objectKey : fileinfo.objectKeys) {
auto &loc = newObjectLocationsMap[objectKey];
UpdateSpillInfo(tenantId, objectKey, loc);
usedSize += loc.size;
VLOG(1) << "[Compact] update object meta: " << objectKey;
}
FileInfo &newFileinfo = fileInfoMap[newFilePath];
newFileinfo.holesSize = newFileinfo.size - usedSize;
LOG(INFO) << FormatString("[Compact] update %d objects meta info, newFile holesize:%d", fileinfo.objectKeys.size(),
newFileinfo.holesSize);
CHECK_FAIL_RETURN_STATUS(fileInfoMap.erase(oldSpillFilename) == 1, K_RUNTIME_ERROR,
FormatString("[Compact] Fail to erase file from fileInfoMap %s", oldSpillFilename));
LOG(INFO) << FormatString("[Compact] old fileInfo remove from map, Compact file success %s -> %s.",
oldSpillFilename, newFilePath);
if (fileinfo.file != nullptr) {
fileinfo.file = nullptr;
}
return Status::OK();
}
Status SpillFileManager::CopyObjects(const FileInfo &fileinfo, const std::string &newFilePath, FileInfo &newFileinfo,
std::unordered_map<std::string, ObjectLocation> &newObjectLocationsMap)
{
ObjectLocation oldLoc;
std::unordered_map<std::string, ObjectLocation> oldObjectLocationsMap;
{
std::shared_lock<std::shared_timed_mutex> lock(fileInfoMutex_);
for (auto &objectKey : fileinfo.objectKeys) {
RETURN_IF_NOT_OK(GetObjectFileLocation(objectKey, oldLoc));
oldObjectLocationsMap[objectKey] = oldLoc;
}
}
int tmpOldFileFd;
RETURN_IF_NOT_OK(RetryFileOperation(MAX_OPEN_RETRY, BACKOFF_TIME_MS, [&fileinfo, &tmpOldFileFd]() {
return OpenFile(fileinfo.path, O_RDWR | O_CREAT, SPILL_FILE_MODE, &tmpOldFileFd);
}));
Raii raii([&tmpOldFileFd]() { RETRY_ON_EINTR(close(tmpOldFileFd)); });
ObjectLocation newLocation;
auto compactionBuffer = std::make_unique<uint8_t[]>(LARGE_OBJ_SIZE_THRESHOLD);
for (auto &objectLocation : oldObjectLocationsMap) {
const auto &objectKey = objectLocation.first;
oldLoc = objectLocation.second;
RETURN_IF_NOT_OK(ReadFile(tmpOldFileFd, compactionBuffer.get(), oldLoc.size, oldLoc.offset));
newLocation.path = newFilePath;
newLocation.size = oldLoc.size;
newLocation.offset = newFileinfo.size;
newFileinfo.size += oldLoc.size;
newObjectLocationsMap[objectKey] = newLocation;
RETURN_IF_NOT_OK(newFileinfo.file->Write(compactionBuffer.get(), newLocation.size, newLocation.offset));
VLOG(1) << "[Compact] copy object: " << objectKey << ", path: " << newLocation.path
<< ", offset: " << newLocation.offset << ", size: " << newLocation.size;
throttle_.LimitIORate(newLocation.size);
}
newFileinfo.file->Sync();
LOG(INFO) << FormatString("[Compact] copy %d objects from %s to new file %s success.", oldObjectLocationsMap.size(),
fileinfo.path, newFileinfo.path);
return Status::OK();
}
std::string SpillFileManager::GetObjectLocation(const std::string &objectKey)
{
uint64_t size = 0;
if (buffer_.Exist(objectKey, size)) {
return SPILL_BUFFER;
}
ObjectLocation loc;
Status rc = GetObjectFileLocation(objectKey, loc);
if (rc.IsError()) {
return "";
}
return loc.path;
}
void SpillFileManager::DisableReadAheadIfNeed(int fd)
{
if (!FLAGS_spill_enable_readahead) {
int ret = posix_fadvise(fd, 0, 0, MADV_RANDOM);
if (ret != 0) {
LOG(WARNING) << "posix_fadvise MADV_RANDOM failed: " << StrErr(errno);
}
}
}
WorkerOcSpill *WorkerOcSpill::Instance()
{
static WorkerOcSpill instance;
return &instance;
}
WorkerOcSpill::~WorkerOcSpill()
{
LOG(INFO) << "WorkerOcSpill exit";
stopCompaction_ = true;
waitPost_.Set();
if (spillCompactionThread_.joinable()) {
spillCompactionThread_.join();
}
}
Status WorkerOcSpill::Init()
{
CHECK_FAIL_RETURN_STATUS(fileMgr_.empty(), K_RUNTIME_ERROR, "The Spill File Manager has already been Initialized.");
std::string realSpillDirectory = FLAGS_spill_directory + SPILL_PATH_PREFIX;
if (!FLAGS_spill_directory.empty()) {
RETURN_IF_NOT_OK_PRINT_ERROR_MSG(RemoveAll(realSpillDirectory), "[Spill] RemoveAll failed");
RETURN_IF_NOT_OK_PRINT_ERROR_MSG(CreateDir(realSpillDirectory, true, PERMISSION),
"create spill directory failed");
freeSpaceInRealTime_ = GetFreeSpaceBytes(realSpillDirectory);
initial95SpillFreeSpace_ = GetFreeSpaceBytes(realSpillDirectory) / 100u * 95u;
LOG(INFO) << FormatString("[SPill] spill_size_limit automatically set to %llu bytes.",
initial95SpillFreeSpace_);
}
for (uint32_t i = 0; i < FLAGS_spill_thread_num; i++) {
auto tmpMgr = std::make_unique<SpillFileManager>(i);
tmpMgr->Init(realSpillDirectory);
fileMgr_.emplace_back(std::move(tmpMgr));
}
totalActiveSpilledSize_ = 0;
stopCompaction_ = false;
spillCompactionThread_ = Thread(&WorkerOcSpill::Compact, this);
LOG(INFO) << "WorkerOcSpill init success, spill_directory: " << realSpillDirectory
<< ", spill_size_limit: " << GetSpillLimitSize();
return Status::OK();
}
void WorkerOcSpill::Compact()
{
TraceGuard traceGuard = Trace::Instance().SetTraceUUID();
LOG(INFO) << "[Compact] compact spill files thread start to run.";
uint64_t compactIntervalMs = 10 * 1000;
if (!IsEnabled()) {
LOG(INFO) << "[Compact] spill is disabled, exit.";
return;
}
uint64_t diskSpaceThreshold = GetSpillLimitSize() / 100u * 90u;
LOG(INFO) << "[Compact] diskSpaceThreshold is: " << diskSpaceThreshold;
std::string realSpillDirectory = FLAGS_spill_directory + SPILL_PATH_PREFIX;
while (!stopCompaction_) {
auto diskSpaceThreshold = GetSpillLimitSize() / 100u * 90u;
freeSpaceInRealTime_ = GetFreeSpaceBytes(realSpillDirectory);
LOG(INFO) << "[Spill] freeSpaceInRealTime: " << freeSpaceInRealTime_;
waitPost_.WaitForAndClear(compactIntervalMs);
for (uint32_t mgrIndex = 0; mgrIndex < FLAGS_spill_thread_num && !stopCompaction_; mgrIndex++) {
Status rc = fileMgr_[mgrIndex]->CompactFiles(SpillFileManager::HOLE_SIZE_RATIO_THRESHOLD_50);
LOG_IF_ERROR(rc, "failed to CompactFiles");
}
if (totalSpillFileDiskSize >= diskSpaceThreshold) {
VLOG(1) << "[Compact] start to compact spill files as disk usage is:" << totalSpillFileDiskSize;
for (uint32_t mgrIndex = 0; mgrIndex < FLAGS_spill_thread_num && !stopCompaction_; mgrIndex++) {
Status rc = fileMgr_[mgrIndex]->CompactFiles(SpillFileManager::HOLE_SIZE_RATIO_THRESHOLD_30);
LOG_IF_ERROR(rc, "failed to CompactFiles");
}
}
}
LOG(INFO) << "[Compact] compact spill files thread end.";
}
Status WorkerOcSpill::Spill(const std::string &objectKey, const void *buffer, size_t size, bool evictable)
{
std::vector<std::pair<const uint8_t *, uint64_t>> payloads{ { static_cast<const uint8_t *>(buffer), size } };
return Spill(objectKey, payloads, evictable);
}
Status WorkerOcSpill::Spill(const std::string &objectKey,
const std::vector<std::pair<const uint8_t *, uint64_t>> &payloads, bool evictable)
{
uint64_t size = 0;
for (const auto &payload : payloads) {
size += payload.second;
}
if (IsSpaceFull(size)) {
LOG(INFO) << FormatString("[ObjectKey %s] The spill space is full.", objectKey);
RETURN_STATUS(K_NO_SPACE, "No space when WorkerOcSpill::Spill");
}
size_t mgrIndex = GetMgrIndex(objectKey);
VLOG(1) << FormatString("[ObjectKey %s] SpillFileManager: %d", objectKey, mgrIndex);
RETURN_IF_NOT_OK(fileMgr_[mgrIndex]->Spill(objectKey, payloads, size));
totalActiveSpilledSize_ += size;
if (evictable) {
spillEvictionList_.Add(objectKey, size >= SpillFileManager::LARGE_OBJ_SIZE_THRESHOLD ? Q2 : Q1);
}
return Status::OK();
}
Status WorkerOcSpill::Get(const std::string &objectKey, void *buffer, size_t size, size_t offset)
{
size_t mgrIndex = GetMgrIndex(objectKey);
Status status = fileMgr_[mgrIndex]->LoadFromDisk(objectKey, buffer, size, offset);
if (status.IsOk() && spillEvictionList_.Exist(objectKey)) {
spillEvictionList_.Add(objectKey, Q1);
}
return status;
}
Status WorkerOcSpill::Get(const std::string &objectKey, std::vector<RpcMessage> &message, size_t size, size_t offset)
{
size_t mgrIndex = GetMgrIndex(objectKey);
Status status = fileMgr_[mgrIndex]->LoadFromDisk(objectKey, message, size, offset);
if (status.IsOk() && spillEvictionList_.Exist(objectKey)) {
spillEvictionList_.Add(objectKey, Q1);
}
return status;
}
Status WorkerOcSpill::Delete(const std::string &objectKey)
{
size_t mgrIndex = GetMgrIndex(objectKey);
uint64_t decSize = 0;
RETURN_IF_NOT_OK(fileMgr_[mgrIndex]->DeleteFromDisk(objectKey, decSize));
CHECK_FAIL_RETURN_STATUS(totalActiveSpilledSize_ >= decSize, K_RUNTIME_ERROR,
FormatString("Computation overflow: totalActiveSpilledSize_=%llu, decSize=%llu",
totalActiveSpilledSize_.load(), decSize));
totalActiveSpilledSize_ -= decSize;
spillEvictionList_.Erase(objectKey);
return Status::OK();
}
bool WorkerOcSpill::IsEnabled() const
{
return !FLAGS_spill_directory.empty();
}
size_t WorkerOcSpill::GetMgrIndex(const std::string &objectKey)
{
std::hash<std::string> hash;
return hash(objectKey) % FLAGS_spill_thread_num;
}
std::string WorkerOcSpill::GetObjectLocation(const std::string &objectKey)
{
size_t mgrIndex = GetMgrIndex(objectKey);
return fileMgr_[mgrIndex]->GetObjectLocation(objectKey);
}
std::vector<std::string> WorkerOcSpill::ParseGFlagDirectories()
{
if (FLAGS_spill_directory.empty()) {
return {};
}
return Split(FLAGS_spill_directory, ",");
}
std::vector<std::string> WorkerOcSpill::GetSpilledFileName(const std::string &path)
{
bool isDir;
std::vector<std::string> files = {};
if (!FileExist(path)) {
return files;
}
DIR *dir = opendir(path.c_str());
if (dir == nullptr) {
LOG(ERROR) << "GetSpilledFileName open dir failed with " << path;
return files;
}
Raii releaseDir([dir] { closedir(dir); });
struct dirent *ent;
std::string filePath;
while ((ent = readdir(dir)) != nullptr) {
if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) {
continue;
}
filePath = path + "/" + ent->d_name;
if (IsDirectory(filePath, isDir).IsOk() && isDir) {
auto tmpFiles = GetSpilledFileName(filePath);
files.insert(files.end(), tmpFiles.begin(), tmpFiles.end());
}
files.emplace_back(filePath);
}
return files;
}
uint64_t WorkerOcSpill::GetSpilledSize()
{
return totalActiveSpilledSize_;
}
bool WorkerOcSpill::IsSpaceFull(size_t size)
{
return IsSpaceExceed(1.0, size);
}
double WorkerOcSpill::LowWaterFactor() const
{
return GetSpillLowWaterFactor();
}
double WorkerOcSpill::HighWaterFactor() const
{
return GetSpillHighWaterFactor();
}
bool WorkerOcSpill::IsSpaceExceedLWM(size_t size)
{
return IsSpaceExceed(LowWaterFactor(), size);
}
bool WorkerOcSpill::IsSpaceExceedHWM(size_t size)
{
return IsSpaceExceed(HighWaterFactor(), size);
}
bool WorkerOcSpill::IsSpaceExceed(double ratio, size_t size)
{
auto spillSizeLimit = GetSpillLimitSize();
return (totalSpillFileDiskSize + size >= spillSizeLimit * ratio);
}
bool WorkerOcSpill::IsActiveSpillSizeExceedLWM(size_t size)
{
return IsActiveSpillSizeExceed(LowWaterFactor(), size);
}
bool WorkerOcSpill::IsActiveSpillSizeExceedHWM(size_t size)
{
return IsActiveSpillSizeExceed(HighWaterFactor(), size);
}
bool WorkerOcSpill::IsActiveSpillSizeExceed(double ratio, size_t size)
{
auto spillSizeLimit = GetSpillLimitSize();
if (size > std::numeric_limits<size_t>::max() - totalActiveSpilledSize_) {
return true;
}
return (totalActiveSpilledSize_ + size >= spillSizeLimit * ratio);
}
uint64_t WorkerOcSpill::GetRemainActiveSpillSize()
{
return GetSpillLimitSize() - totalActiveSpilledSize_;
}
std::string WorkerOcSpill::GetSpillUsage()
{
auto spillSizeLimit = GetSpillLimitSize();
if (spillSizeLimit == 0) {
return "0/0/0/0";
}
auto workerSpillHardDiskUsage = totalActiveSpilledSize_.load() / static_cast<float>(spillSizeLimit);
return FormatString("%lu/%lu/%lu/%.3f", totalActiveSpilledSize_.load(), totalSpillFileDiskSize.load(),
spillSizeLimit, workerSpillHardDiskUsage);
}
uint64_t WorkerOcSpill::GetSpillLimitSize()
{
return FLAGS_spill_size_limit == 0 ? initial95SpillFreeSpace_ : FLAGS_spill_size_limit;
}
Throttle::Throttle(uint64_t desiredRatePerSec, uint64_t checkIntervalMs)
: desiredRatePerSec_(desiredRatePerSec), checkIntervalMs_(checkIntervalMs), totalBytes_(0)
{
desiredRatePerMs_ = static_cast<float>(desiredRatePerSec_) / TIME_UNIT_CONVERSION;
}
void Throttle::LimitIORate(size_t dataSize)
{
totalBytes_ += dataSize;
uint64_t costTimeMs = static_cast<uint64_t>(timer_.ElapsedMilliSecond());
if (costTimeMs < checkIntervalMs_) {
return;
}
float currentRatePerSec = static_cast<float>(totalBytes_) * TIME_UNIT_CONVERSION / costTimeMs;
VLOG(1) << FormatString("[Throttle] current rate:%f B/s, desired rate:%f B/s, cost:%d.", currentRatePerSec,
desiredRatePerSec_, costTimeMs);
if (currentRatePerSec > desiredRatePerSec_) {
if (desiredRatePerMs_ <= std::numeric_limits<float>::epsilon()) {
LOG(INFO) << FormatString("[Throttle] desired rate(%f) is invalid.", desiredRatePerMs_);
} else {
int64_t sleepTime = static_cast<int64_t>(static_cast<float>(totalBytes_) / desiredRatePerMs_ - costTimeMs);
if (sleepTime > 0) {
LOG(INFO) << FormatString("[Throttle] sleep %d ms to Limit IO Rate.", sleepTime);
std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime));
}
}
}
totalBytes_ = 0;
timer_.Reset();
}
void Throttle::Reset()
{
totalBytes_ = 0;
timer_.Reset();
}
Status Throttle::UpdateDesiredRatePerSec(uint64_t desiredRatePerSec)
{
CHECK_FAIL_RETURN_STATUS(desiredRatePerSec > 0, K_INVALID, "desiredRatePerSec must be greater than 0");
desiredRatePerSec_ = desiredRatePerSec;
desiredRatePerMs_ = static_cast<float>(desiredRatePerSec_) / TIME_UNIT_CONVERSION;
return Status::OK();
}
}
}