* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Monitor logger, flush current node's res information and operation time cost
* information, there is two Bounded-buffer to support async write log messages.
*/
#include "datasystem/common/log/access_recorder.h"
#include <algorithm>
#include <cstddef>
#include <sstream>
#include <string>
#include <utility>
#include <unistd.h>
#include "datasystem/common/constants.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/log/log_sampler.h"
#include "datasystem/common/log/logging.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/common/metrics/hard_disk_exporter/hard_disk_exporter.h"
#include "datasystem/common/util/file_util.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/utils/status.h"
DS_DECLARE_bool(log_monitor);
DS_DECLARE_string(log_dir);
namespace datasystem {
namespace {
constexpr char LOG_SAMPLED_MARK[] = "logSampled:true";
constexpr size_t EMPTY_REQ_MSG_SIZE = 2;
#define ACCESS_RECORDER_KEY_DEF(keyEnum, keyType) #keyEnum,
static const char *g_keyNames[] = {
#include "datasystem/common/log/access_point.def"
};
#undef ACCESS_RECORDER_KEY_DEF
#define ACCESS_RECORDER_KEY_DEF(keyEnum, keyType) AccessKeyType::keyType,
static AccessKeyType gKeyTypes[] = {
#include "datasystem/common/log/access_point.def"
};
#undef ACCESS_RECORDER_KEY_DEF
}
bool IsCurrentRequestLogSampled(AccessKeyType type)
{
if (type == AccessKeyType::REQUEST_OUT || !Trace::Instance().IsRequestLogTrace()) {
return false;
}
if (LogSampler::Instance().IsSamplerEnabledFast()) {
return LogSampler::Instance().IsCurrentRequestSampledIn();
}
return true;
}
std::string FormatAccessReqMsg(AccessKeyType type, const std::string &reqMsg)
{
std::ostringstream logStream;
if (!IsCurrentRequestLogSampled(type)) {
return reqMsg;
}
if (reqMsg.size() >= EMPTY_REQ_MSG_SIZE && reqMsg.front() == '{' && reqMsg.back() == '}') {
logStream.write(reqMsg.data(), reqMsg.size() - 1);
if (reqMsg.size() > EMPTY_REQ_MSG_SIZE) {
logStream << ",";
}
logStream << LOG_SAMPLED_MARK << "}";
return logStream.str();
}
logStream << "{";
if (!reqMsg.empty()) {
logStream << reqMsg << ",";
}
logStream << LOG_SAMPLED_MARK << "}";
return logStream.str();
}
AccessKeyType GetAccessKeyType(AccessRecorderKey key)
{
return gKeyTypes[static_cast<size_t>(key)];
}
AccessRecorder::AccessRecorder(AccessRecorderKey key)
: handleType_(gKeyTypes[static_cast<size_t>(key)]), isRecord_(false)
{
if (key == AccessRecorderKey::DS_ETCD_UNKNOWN) {
shouldRecord_ = false;
isRecord_ = true;
} else {
shouldRecord_ = LogSampler::Instance().ShouldRecordAccess(key);
if (shouldRecord_) {
handleName_ = g_keyNames[static_cast<size_t>(key)];
beg_ = clock::now();
} else {
isRecord_ = true;
}
}
}
void AccessRecorder::WritePerformance(int code, uint64_t dataSize, uint64_t elapsed,
const RequestParam &reqParam, const std::string &respMsg,
uint64_t asyncElapseTime)
{
if (FLAGS_log_monitor && Logging::AccessRecorderManagerInstance() != nullptr) {
Logging::AccessRecorderManagerInstance()->LogPerformance(handleName_, handleType_, elapsed, code,
std::to_string(dataSize), reqParam.ToString(),
respMsg, asyncElapseTime);
}
}
AccessRecorder::~AccessRecorder()
{
if (!isRecord_) {
LOG(WARNING) << handleName_ << " Not call AccessRecorder::Record()";
}
}
AccessRecorder::AccessRecorder(AccessRecorder &&other) noexcept
: beg_(other.beg_),
handleName_(std::move(other.handleName_)),
handleType_(other.handleType_),
isRecord_(other.isRecord_),
shouldRecord_(other.shouldRecord_)
{
other.isRecord_ = true;
other.shouldRecord_ = false;
}
AccessRecorder &AccessRecorder::operator=(AccessRecorder &&other) noexcept
{
if (this == &other) {
return *this;
}
beg_ = other.beg_;
handleName_ = std::move(other.handleName_);
handleType_ = other.handleType_;
isRecord_ = other.isRecord_;
shouldRecord_ = other.shouldRecord_;
other.isRecord_ = true;
other.shouldRecord_ = false;
return *this;
}
uint64_t AccessRecorder::ElapsedUs() const
{
return std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - beg_).count();
}
void AccessRecorder::WriteObject(int code, uint64_t dataSize, uint64_t elapsedUs, const RequestParam &req,
const std::string &respMsg, uint64_t asyncElapsedUs)
{
WritePerformance(code, dataSize, elapsedUs, req, respMsg, asyncElapsedUs);
}
void AccessRecorder::WriteStream(int code, uint64_t elapsedUs, const StreamRequestParam &req,
const StreamResponseParam &rsp)
{
if (FLAGS_log_monitor && Logging::AccessRecorderManagerInstance() != nullptr) {
Logging::AccessRecorderManagerInstance()->LogPerformance(handleName_, handleType_, elapsedUs, code, "0",
req.ToString(), rsp.ToString());
}
}
ObjectAccessRecorder AccessRecorder::Object(AccessRecorderKey key)
{
return ObjectAccessRecorder(key);
}
StreamAccessRecorder AccessRecorder::Stream(AccessRecorderKey key)
{
return StreamAccessRecorder(key);
}
RequestOutRecorder AccessRecorder::RequestOut(AccessRecorderKey key)
{
return RequestOutRecorder(key);
}
void FieldEntry::Fill(std::string &out) const
{
switch (kind) {
case NUMERIC:
out = std::to_string(numericValue);
break;
case TEXT_VIEW:
out = textView.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT);
break;
case TEXT_OWNED:
out = textOwned.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT);
break;
case TRACKED_TRANSPORT:
out = AccessTransportTracker::ToString();
break;
case PROVIDER:
if (provider) {
out = provider();
}
break;
default:
break;
}
}
void ObjectKeyState::FillObjectKey(RequestParam &req) const
{
switch (kind) {
case VIEW:
req.objectKey = view.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT);
break;
case OWNED:
req.objectKey = owned.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT);
break;
case VECTOR_REF:
req.objectKey = objectKeysToString(*vectorRef);
break;
case VECTOR_SUMMARY_REF:
req.objectKey = ObjectKeysToAbbrStr(*vectorRef);
break;
case PROTO_REF: {
std::vector<std::string> keys;
keys.reserve(protoRef->size());
for (const auto &k : *protoRef) {
keys.emplace_back(k);
}
req.objectKey = objectKeysToString(keys);
break;
}
case C_ARRAY_REF:
req.objectKey = objectKeysToString(cArray.keys, cArray.lens, cArray.count);
break;
case PROVIDER:
req.objectKey = provider();
break;
default:
break;
}
}
void ObjectKeyState::FillNestedKey(RequestParam &req) const
{
switch (kind) {
case VIEW:
req.nestedKey = view.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT);
break;
case OWNED:
req.nestedKey = owned.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT);
break;
case VECTOR_REF:
req.nestedKey = objectKeysToString(*vectorRef);
break;
case C_ARRAY_REF:
req.nestedKey = objectKeysToString(cArray.keys, cArray.lens, cArray.count);
break;
case PROVIDER:
req.nestedKey = provider();
break;
default:
break;
}
}
ObjectAccessRecorder::ObjectAccessRecorder(AccessRecorderKey key) : core_(key), state_() {}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeyRef(std::string_view key)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::VIEW;
state_.objectKey.view = key;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeyOwned(const std::string &key)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::OWNED;
state_.objectKey.owned = key;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeyOwned(std::string &&key)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::OWNED;
state_.objectKey.owned = std::move(key);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeysRef(const std::vector<std::string> &keys)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::VECTOR_REF;
state_.objectKey.vectorRef = &keys;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeysRef(const google::protobuf::RepeatedPtrField<std::string> &keys)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::PROTO_REF;
state_.objectKey.protoRef = &keys;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeysRef(const char *const *keys, const size_t *keyLens,
uint64_t keyCount)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::C_ARRAY_REF;
state_.objectKey.cArray.keys = keys;
state_.objectKey.cArray.lens = keyLens;
state_.objectKey.cArray.count = keyCount;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ObjectKeysSummaryRef(const std::vector<std::string> &keys)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.objectKey.kind = ObjectKeyState::VECTOR_SUMMARY_REF;
state_.objectKey.vectorRef = &keys;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::NestedKeysRef(const std::vector<std::string> &keys)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.nestedKey.kind = ObjectKeyState::VECTOR_REF;
state_.nestedKey.vectorRef = &keys;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::NestedKeysRef(const char *const *keys, const size_t *keyLens,
uint64_t keyCount)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.nestedKey.kind = ObjectKeyState::C_ARRAY_REF;
state_.nestedKey.cArray.keys = keys;
state_.nestedKey.cArray.lens = keyLens;
state_.nestedKey.cArray.count = keyCount;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::Keep(bool keep)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.keep.SetNumeric(keep ? 1 : 0);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::WriteMode(int writeMode)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.writeMode.SetNumeric(writeMode);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::WriteModeText(std::string_view writeMode)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.writeMode.SetView(writeMode);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ConsistencyType(int consistencyType)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.consistencyType.SetNumeric(consistencyType);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::ConsistencyTypeText(std::string_view consistencyType)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.consistencyType.SetView(consistencyType);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::IsSeal(bool isSeal)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.isSeal.SetNumeric(isSeal ? 1 : 0);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::IsRetry(bool isRetry)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.isRetry.SetNumeric(isRetry ? 1 : 0);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::TtlSecond(uint32_t ttlSecond)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.ttlSecond.SetNumeric(ttlSecond);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::Existence(int existence)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.existence.SetNumeric(existence);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::SubTimeoutMs(uint64_t timeoutMs)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.subTimeout.SetNumeric(timeoutMs);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::TimeoutMs(uint64_t timeoutMs)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.timeout.SetNumeric(timeoutMs);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::CacheType(int cacheType)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.cacheType.SetNumeric(cacheType);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::RemoteClientId(std::string_view remoteClientId)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.remoteClientId.SetView(remoteClientId);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::TransportType(std::string_view transportType)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.transportType.SetView(transportType);
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::TransportTypeOwned(std::string transportType)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.transportType.SetOwned(std::move(transportType));
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::TrackedTransportType()
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.transportType.SetTrackedTransport();
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::DataSize(uint64_t dataSize)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.dataSize.value = dataSize;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::Result(const Status &rc)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = rc.GetCode();
state_.respMsg = rc.GetMsg();
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::Result(StatusCode code, std::string_view msg)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = static_cast<int>(code);
state_.respMsg = msg;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::Result(int code, std::string_view msg)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = code;
state_.respMsg = msg;
return *this;
}
ObjectAccessRecorder &ObjectAccessRecorder::AsyncElapsedUs(uint64_t asyncElapsedUs)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.asyncElapsedUs = asyncElapsedUs;
return *this;
}
void ObjectAccessRecorder::Record()
{
if (!core_.ShouldRecordInternal()) {
core_.MarkRecorded();
return;
}
uint64_t elapsed = core_.ElapsedUs();
RequestParam req;
state_.objectKey.FillObjectKey(req);
state_.nestedKey.FillNestedKey(req);
state_.writeMode.Fill(req.writeMode);
state_.consistencyType.Fill(req.consistencyType);
state_.remoteClientId.Fill(req.remoteClientId);
state_.transportType.Fill(req.transportType);
state_.keep.Fill(req.keep);
state_.isSeal.Fill(req.isSeal);
state_.isRetry.Fill(req.isRetry);
state_.ttlSecond.Fill(req.ttlSecond);
state_.existence.Fill(req.existence);
state_.subTimeout.Fill(req.subTimeout);
state_.timeout.Fill(req.timeout);
state_.cacheType.Fill(req.cacheType);
core_.WriteObject(state_.code, state_.dataSize.Resolve(), elapsed, req, state_.respMsg, state_.asyncElapsedUs);
core_.MarkRecorded();
}
StreamAccessRecorder::StreamAccessRecorder(AccessRecorderKey key) : core_(key), state_() {}
StreamAccessRecorder &StreamAccessRecorder::StreamName(std::string_view streamName)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.streamName = streamName;
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::ProducerId(std::string_view producerId)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.producerId = producerId;
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::ConsumerId(std::string_view consumerId)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.consumerId = consumerId;
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::ClientId(std::string_view clientId)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.clientId = clientId;
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::DelayFlushTime(int64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.delayFlushTime = Optional<int64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::PageSize(int64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.pageSize = Optional<int64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::MaxStreamSize(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.maxStreamSize = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::AutoCleanup(bool value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.autoCleanup = Optional<bool>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::RetainForNumConsumers(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.retainForNumConsumers = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::EncryptStream(bool value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.encryptStream = Optional<bool>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::SubscriptionName(std::string_view subscriptionName)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.subscriptionName = subscriptionName;
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::AutoAck(bool value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.autoAck = Optional<bool>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::ReserveSize(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.reserveSize = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::StreamMode(int32_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.streamMode = Optional<int32_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::Count(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.count = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::SenderProducerNo(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.senderProducerNo = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::EnableDataVerification(bool value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.enableDataVerification = Optional<bool>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::StreamNo(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.streamNo = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::SharedPageSize(uint64_t value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.sharedPageSize = Optional<uint64_t>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::EnableSharedPage(bool value)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.enableSharedPage = Optional<bool>(value);
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::Result(const Status &rc)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = rc.GetCode();
state_.respMsg = rc.GetMsg();
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::Result(StatusCode code, std::string_view msg)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = static_cast<int>(code);
state_.respMsg = msg;
return *this;
}
StreamAccessRecorder &StreamAccessRecorder::Result(int code, std::string_view msg)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = code;
state_.respMsg = msg;
return *this;
}
void StreamAccessRecorder::Record()
{
if (!core_.ShouldRecordInternal()) {
core_.MarkRecorded();
return;
}
uint64_t elapsed = core_.ElapsedUs();
StreamRequestParam req;
req.streamName = state_.streamName;
req.producerId = state_.producerId;
req.consumerId = state_.consumerId;
req.clientId = state_.clientId;
req.delayFlushTime = state_.delayFlushTime;
req.pageSize = state_.pageSize;
req.maxStreamSize = state_.maxStreamSize;
req.autoCleanup = state_.autoCleanup;
req.retainForNumConsumers = state_.retainForNumConsumers;
req.encryptStream = state_.encryptStream;
req.subscriptionName = state_.subscriptionName;
req.autoAck = state_.autoAck;
req.reserveSize = state_.reserveSize;
req.streamMode = state_.streamMode;
StreamResponseParam rsp;
rsp.msg = state_.respMsg.empty() ? "OK" : state_.respMsg;
rsp.count = state_.count;
rsp.senderProducerNo = state_.senderProducerNo;
rsp.enableDataVerification = state_.enableDataVerification;
rsp.streamNo = state_.streamNo;
rsp.sharedPageSize = state_.sharedPageSize;
rsp.enableSharedPage = state_.enableSharedPage;
core_.WriteStream(state_.code, elapsed, req, rsp);
core_.MarkRecorded();
}
RequestOutRecorder::RequestOutRecorder(AccessRecorderKey key) : core_(key), state_() {}
RequestOutRecorder &RequestOutRecorder::OutReq(std::string_view req)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.outReqKind = RequestOutState::VIEW;
state_.outReqView = req;
return *this;
}
RequestOutRecorder &RequestOutRecorder::OutReqOwned(std::string req)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.outReqKind = RequestOutState::OWNED;
state_.outReqOwned = std::move(req);
return *this;
}
RequestOutRecorder &RequestOutRecorder::DataSize(uint64_t dataSize)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.dataSize = dataSize;
return *this;
}
RequestOutRecorder &RequestOutRecorder::AsyncElapsedUs(uint64_t asyncElapsedUs)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.asyncElapsedUs = asyncElapsedUs;
return *this;
}
RequestOutRecorder &RequestOutRecorder::Result(int code, std::string_view msg)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = code;
state_.respMsg = msg;
return *this;
}
RequestOutRecorder &RequestOutRecorder::Result(StatusCode code, std::string_view msg)
{
if (!core_.ShouldRecordInternal()) {
return *this;
}
state_.code = static_cast<int>(code);
state_.respMsg = msg;
return *this;
}
void RequestOutRecorder::Record()
{
if (!core_.ShouldRecordInternal()) {
core_.MarkRecorded();
return;
}
uint64_t elapsed = core_.ElapsedUs();
RequestParam req;
switch (state_.outReqKind) {
case RequestOutState::VIEW:
req.outReq = state_.outReqView;
break;
case RequestOutState::OWNED:
req.outReq = state_.outReqOwned;
break;
case RequestOutState::PROVIDER:
req.outReq = state_.outReqProvider();
break;
default:
break;
}
core_.WriteObject(state_.code, state_.dataSize, elapsed, req, state_.respMsg, state_.asyncElapsedUs);
core_.MarkRecorded();
}
Status AccessRecorderManager::Init(bool isClient, bool isEmbeddedClient)
{
isClient_ = isClient;
if (FLAGS_log_monitor) {
RETURN_IF_NOT_OK_PRINT_ERROR_MSG(ResetWriteLogger(isEmbeddedClient), "AccessRecorder init failed");
}
return Status::OK();
}
Status AccessRecorderManager::ResetWriteLogger(bool isEmbeddedClient)
{
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(Logging::CreateLogDir(), K_NOT_READY, "Log file creation failed");
std::vector<std::pair<AccessKeyType, std::string>> typeList;
if (isClient_) {
std::string clientAccessLogName = Logging::GetClientLogName(CLIENT_ACCESS_LOG_NAME, getpid());
std::string accessLogName = Logging::GetClientAccessLogName();
if (Logging::ValidateLogName(accessLogName)) {
clientAccessLogName = std::move(accessLogName);
}
typeList = { std::make_pair(AccessKeyType::CLIENT, FLAGS_log_dir + "/" + clientAccessLogName + ".log") };
} else {
typeList = { std::make_pair(AccessKeyType::ACCESS, FLAGS_log_dir + "/" + ACCESS_LOG_NAME + ".log"),
std::make_pair(AccessKeyType::REQUEST_OUT, FLAGS_log_dir + "/" + REQUEST_OUT_LOG_NAME + ".log") };
if (isEmbeddedClient) {
typeList.emplace_back(AccessKeyType::CLIENT, FLAGS_log_dir + "/" + CLIENT_ACCESS_LOG_NAME + ".log");
}
}
for (const auto &kv : typeList) {
auto exporter = std::make_unique<HardDiskExporter>();
RETURN_IF_NOT_OK_PRINT_ERROR_MSG(exporter->Init(kv.second), "Exporter init failed.");
exporterMap_.emplace(kv.first, std::move(exporter));
}
return Status::OK();
}
Status AccessRecorderManager::SubmitWriteMessage()
{
if (exporterMap_.empty()) {
const std::string errMsg = "AccessRecorder is not init.";
if (FLAGS_log_monitor) {
LOG(ERROR) << errMsg;
}
RETURN_STATUS(K_NOT_READY, errMsg);
}
for (auto &iter : exporterMap_) {
iter.second->SubmitWriteMessage();
}
return Status::OK();
}
AccessRecorderManager::~AccessRecorderManager() = default;
Status AccessRecorderManager::LogPerformance(const std::string &handleName, AccessKeyType type, int64_t microsecond,
int code, const std::string &dataSize, const std::string &reqMsg,
const std::string &respMsg, uint64_t asyncElapseTime)
{
if (exporterMap_.empty()) {
const std::string errMsg = "AccessRecorder is not init.";
if (FLAGS_log_monitor) {
LOG(ERROR) << errMsg;
}
RETURN_STATUS(K_NOT_READY, errMsg);
}
if (type == AccessKeyType::REQUEST_OUT && isClient_) {
return Status::OK();
}
std::ostringstream logStream;
logStream << code << " | " << handleName << " | " << microsecond;
logStream << " | " << dataSize << " | " << FormatAccessReqMsg(type, reqMsg);
logStream << " | " << respMsg;
if (type == AccessKeyType::REQUEST_OUT) {
logStream << " | " << asyncElapseTime;
}
auto it = exporterMap_.find(type);
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(it != exporterMap_.end(), K_RUNTIME_ERROR,
FormatString("Access type: %zu not found!", int(type)));
static Uri uri(__FILE__);
it->second->Send(logStream.str(), uri, __LINE__);
return Status::OK();
}
std::string objectKeysToString(const std::vector<std::string> &keys)
{
std::string ret = "[";
uint32_t len = 0;
uint32_t count = 0;
for (const auto &key : keys) {
if (count >= LOG_TOTAL_KEYS_SIZE_LIMIT) {
ret.append("***").append(",").append("total:").append(std::to_string(keys.size())).append("]");
return ret;
}
if (len > LOG_OBJECT_KEY_SIZE_LIMIT - key.length()) {
ret.append("total:").append(std::to_string(keys.size())).append("]");
return ret;
}
count += 1;
len = std::min(len + static_cast<uint32_t>(key.length()), static_cast<uint32_t>(LOG_OBJECT_KEY_SIZE_LIMIT));
ret.append(key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT)).append(",");
}
if (ret.length() > 1) {
ret.pop_back();
}
ret.append("]");
return ret;
}
std::string objectKeysToString(const google::protobuf::RepeatedPtrField<std::string> &keys)
{
std::string ret = "[";
uint32_t len = 0;
uint32_t count = 0;
for (const auto &key : keys) {
if (count >= LOG_TOTAL_KEYS_SIZE_LIMIT) {
ret.append("***").append(",").append("total:").append(std::to_string(keys.size())).append("]");
return ret;
}
if (len > LOG_OBJECT_KEY_SIZE_LIMIT - key.length()) {
ret.append("total:").append(std::to_string(keys.size())).append("]");
return ret;
}
count += 1;
len = std::min(len + static_cast<uint32_t>(key.length()),
static_cast<uint32_t>(LOG_OBJECT_KEY_SIZE_LIMIT));
ret.append(key.substr(0, LOG_OBJECT_KEY_SIZE_LIMIT)).append(",");
}
if (ret.length() > 1) {
ret.pop_back();
}
ret.append("]");
return ret;
}
std::string objectKeysToString(const char *const *cKey, size_t keyLen)
{
std::string ret = "[";
uint32_t totalDisplayLen = 0;
for (size_t i = 0; i < keyLen; i++) {
if (cKey[i] == nullptr) {
continue;
}
size_t declaredLen = strlen(cKey[i]);
if (declaredLen == 0) {
continue;
}
size_t boundedLen = std::min(declaredLen, static_cast<size_t>(LOG_TOTAL_KEYS_SIZE_LIMIT));
size_t displayLen = std::min(boundedLen, static_cast<size_t>(LOG_OBJECT_KEY_SIZE_LIMIT));
if (totalDisplayLen > LOG_TOTAL_KEYS_SIZE_LIMIT - displayLen) {
ret.append("total:").append(std::to_string(keyLen)).append("]");
return ret;
}
totalDisplayLen += displayLen;
ret.append(cKey[i], displayLen).append(",");
}
if (ret.length() > 1) {
ret.pop_back();
}
ret.append("]");
return ret;
}
std::string objectKeysToString(const char *const *cKey, const size_t *cKeyLens, size_t keyLen)
{
std::string ret = "[";
uint32_t totalDisplayLen = 0;
for (size_t i = 0; i < keyLen; i++) {
if (cKey[i] == nullptr) {
continue;
}
size_t declaredLen = (cKeyLens != nullptr) ? cKeyLens[i] : strlen(cKey[i]);
if (declaredLen == 0) {
continue;
}
size_t boundedLen = std::min(declaredLen, static_cast<size_t>(LOG_TOTAL_KEYS_SIZE_LIMIT));
size_t displayLen = std::min(boundedLen, static_cast<size_t>(LOG_OBJECT_KEY_SIZE_LIMIT));
if (totalDisplayLen > LOG_TOTAL_KEYS_SIZE_LIMIT - displayLen) {
ret.append("total:").append(std::to_string(keyLen)).append("]");
return ret;
}
totalDisplayLen += displayLen;
ret.append(cKey[i], displayLen).append(",");
}
if (ret.length() > 1) {
ret.pop_back();
}
ret.append("]");
return ret;
}
std::string RequestParam::ToString() const
{
std::string ret = "{";
if (!outReq.empty()) {
ret.append(outReq).append("}");
return ret;
}
if (!objectKey.empty()) {
ret.append("Object_key:").append(objectKey).append(",");
}
if (!nestedKey.empty()) {
ret.append("Nested_keys:").append(nestedKey).append(",");
}
if (!keep.empty()) {
ret.append("keep:").append(keep).append(",");
}
if (!writeMode.empty()) {
ret.append("Write_mode:").append(writeMode).append(",");
}
if (!consistencyType.empty()) {
ret.append("consistency_type:").append(consistencyType).append(",");
}
if (!isSeal.empty()) {
ret.append("is_seal:").append(isSeal).append(",");
}
if (!isRetry.empty()) {
ret.append("is_retry:").append(isRetry).append(",");
}
if (!ttlSecond.empty()) {
ret.append("ttl_second:").append(ttlSecond).append(",");
}
if (!subTimeout.empty()) {
ret.append("sub_timeout:").append(subTimeout).append(",");
}
if (!timeout.empty()) {
ret.append("timeout:").append(timeout).append(",");
}
if (!cacheType.empty()) {
ret.append("cacheType:").append(cacheType).append(",");
}
if (!transportType.empty()) {
ret.append("transportType:").append(transportType).append(",");
}
if (ret.length() > 1) {
ret.pop_back();
}
ret.append("}");
return ret;
}
std::string StreamRequestParam::ToString() const
{
std::stringstream ss;
ss << "{stream_name:" << streamName;
if (!producerId.empty()) {
ss << ",producer_id:" << producerId;
}
if (!consumerId.empty()) {
ss << ",consumer_id:" << consumerId;
}
if (!clientId.empty()) {
ss << ",client_id:" << clientId;
}
if (delayFlushTime) {
ss << ",delay_flush_time:" << delayFlushTime.value();
}
if (pageSize) {
ss << ",page_size:" << pageSize.value();
}
if (maxStreamSize) {
ss << ",max_stream_size:" << maxStreamSize.value();
}
if (autoCleanup) {
ss << ",auto_cleanup:" << autoCleanup.value();
}
if (retainForNumConsumers) {
ss << ",retain_for_num_consumers:" << retainForNumConsumers.value();
}
if (encryptStream) {
ss << ",encrypt_stream:" << encryptStream.value();
}
if (!subscriptionName.empty()) {
ss << ",subscription_name:" << subscriptionName;
}
if (autoAck) {
ss << ",auto_ack:" << autoAck.value();
}
if (streamMode) {
ss << ",stream_mode:" << streamMode.value();
}
ss << "}";
return ss.str();
}
std::string StreamResponseParam::ToString() const
{
std::stringstream ss;
ss << "{msg:" << (msg.empty() ? "OK" : msg.c_str());
if (count) {
ss << ",count:" << count.value();
}
if (senderProducerNo) {
ss << ",sender_producer_no:" << senderProducerNo.value();
}
if (enableDataVerification) {
ss << ",enable_data_verification:" << enableDataVerification.value();
}
if (streamNo) {
ss << ",stream_no:" << streamNo.value();
}
if (enableSharedPage) {
ss << ",enable_shared_page:" << enableSharedPage.value();
}
if (sharedPageSize) {
ss << ",shared_page_size:" << sharedPageSize.value();
}
ss << "}";
return ss.str();
}
thread_local AccessTransportKind AccessTransportTracker::current_ = AccessTransportKind::SHM;
void AccessTransportTracker::Reset()
{
current_ = AccessTransportKind::SHM;
}
void AccessTransportTracker::Record(AccessTransportKind kind)
{
current_ = kind;
}
std::string AccessTransportTracker::ToString()
{
switch (current_) {
case AccessTransportKind::UB:
return "UB";
case AccessTransportKind::TCP:
return "TCP";
case AccessTransportKind::SHM:
default:
return "SHM";
}
}
}