* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* plog.cpp
*
* IDENTIFICATION
* src/common/backend/utils/error/plog.cpp
*
* -------------------------------------------------------------------------
*/
#include "utils/plog.h"
#include "utils/palloc.h"
#include "access/xact.h"
#include "tcop/tcopprot.h"
#include "miscadmin.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#define PLOG_NOT_USED (char)(-1)
#define PLOG_ENTRY_MAX_NUM 32
* judge whether the sum of two UINT32 overflow.
* (_sum) and (_val) must be both of UINT32 date type.
*/
#define UINT32_SUM_OVERFLOW(_sum, _val) ((_sum) + (_val) < (_sum))
#define PLOG_ENTRY_MAX_SIZE \
(LOGPIPE_HEADER_SIZE + sizeof(PLogEntryMeta) + sizeof(PLogEntryHead) + sizeof(PLogEntryItem) * PLOG_ENTRY_MAX_NUM)
typedef int (*plog_get_slot)(IndicatorItem*);
static inline void plog_fill_fixed_proto(LogPipeProtoHeader*);
static inline void plog_record_init_meta(PLogEntryMeta*, struct timeval*);
static inline void plog_record_init_head(PLogEntryHead*, int, char, char, char);
static inline void plog_record_init_items(PLogEntryItem*, int);
static inline bool plog_switch_new_msg(struct timeval*, struct timeval);
static inline void write_logmsg_with_one_item(IndicatorItem*, struct timeval*);
static int plog_md_get_slot(IndicatorItem*);
static int plog_obs_get_slot(IndicatorItem*);
static int plog_hdp_get_slot(IndicatorItem*);
static int plog_remote_get_slot(IndicatorItem*);
static void write_logmsg_with_multi_items(PLogMsg*);
static plog_get_slot g_plog_slot_array[DS_VALID_NUM][DSRQ_VALID_NUM] = {
{plog_md_get_slot, plog_md_get_slot, plog_md_get_slot, plog_md_get_slot},
{plog_obs_get_slot, plog_obs_get_slot, plog_obs_get_slot, plog_obs_get_slot},
{plog_hdp_get_slot, plog_hdp_get_slot, plog_hdp_get_slot, plog_hdp_get_slot},
{plog_remote_get_slot, plog_remote_get_slot, plog_remote_get_slot, plog_remote_get_slot}};
* this function MUST be called immediately after t_thrd.mem_cxt.profile_log_mem_cxt
* memory context is created. see its caller.
*
* Maybe these global memory must be malloced in the begin of worker
* thread running, including the followings:
* 1. t_thrd.log_cxt.plog_md_read_entry
* 2. t_thrd.log_cxt.plog_md_write_entry
*/
void init_plog_global_mem(void)
{
* we use "t_thrd.log_cxt.plog_md_read_entry is null pointer" as the IF judgement.
* so all memories like t_thrd.log_cxt.plog_md_read_entry MUST be malloced in IF
* branch.
*/
if (NULL == t_thrd.log_cxt.plog_md_read_entry) {
MemoryContext oldMemCnxt = MemoryContextSwitchTo(t_thrd.mem_cxt.profile_log_mem_cxt);
t_thrd.log_cxt.plog_md_read_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_md_read_entry[0] = t_thrd.log_cxt.plog_md_read_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_md_write_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_md_write_entry[0] = t_thrd.log_cxt.plog_md_write_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_obs_list_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_obs_list_entry[0] = t_thrd.log_cxt.plog_obs_list_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_obs_read_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_obs_read_entry[0] = t_thrd.log_cxt.plog_obs_read_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_obs_write_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_obs_write_entry[0] = t_thrd.log_cxt.plog_obs_write_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_hdp_read_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_hdp_read_entry[0] = t_thrd.log_cxt.plog_hdp_read_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_hdp_write_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_hdp_write_entry[0] = t_thrd.log_cxt.plog_hdp_write_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_hdp_open_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_hdp_open_entry[0] = t_thrd.log_cxt.plog_hdp_open_entry[1] = PLOG_NOT_USED;
t_thrd.log_cxt.plog_remote_read_entry = (char*)palloc0(PLOG_ENTRY_MAX_SIZE);
t_thrd.log_cxt.plog_remote_read_entry[0] = t_thrd.log_cxt.plog_remote_read_entry[1] = PLOG_NOT_USED;
(void)MemoryContextSwitchTo(oldMemCnxt);
t_thrd.log_cxt.g_plog_msgmem_array[DS_MD][DSRQ_LIST] = NULL;
t_thrd.log_cxt.g_plog_msgmem_array[DS_MD][DSRQ_READ] = t_thrd.log_cxt.plog_md_read_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_MD][DSRQ_WRITE] = t_thrd.log_cxt.plog_md_write_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_MD][DSRQ_OPEN] = NULL;
t_thrd.log_cxt.g_plog_msgmem_array[DS_OBS][DSRQ_LIST] = t_thrd.log_cxt.plog_obs_list_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_OBS][DSRQ_READ] = t_thrd.log_cxt.plog_obs_read_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_OBS][DSRQ_WRITE] = t_thrd.log_cxt.plog_obs_write_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_OBS][DSRQ_OPEN] = NULL;
t_thrd.log_cxt.g_plog_msgmem_array[DS_HADOOP][DSRQ_LIST] = NULL;
t_thrd.log_cxt.g_plog_msgmem_array[DS_HADOOP][DSRQ_READ] = t_thrd.log_cxt.plog_hdp_read_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_HADOOP][DSRQ_WRITE] = t_thrd.log_cxt.plog_hdp_write_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_HADOOP][DSRQ_OPEN] = t_thrd.log_cxt.plog_hdp_open_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_REMOTE_DATANODE][DSRQ_LIST] = NULL;
t_thrd.log_cxt.g_plog_msgmem_array[DS_REMOTE_DATANODE][DSRQ_READ] = t_thrd.log_cxt.plog_remote_read_entry;
t_thrd.log_cxt.g_plog_msgmem_array[DS_REMOTE_DATANODE][DSRQ_WRITE] = NULL;
t_thrd.log_cxt.g_plog_msgmem_array[DS_REMOTE_DATANODE][DSRQ_OPEN] = NULL;
}
}
* force to flush all the profile logs into syslogger and write them.
* it's triggered before this backend thread exits.
* notice this is not forced when a transaction is committed or aborted,
* just deferred when the next new transaction starts. see also plog_switch_new_msg()
*/
void flush_plog(void)
{
* this interface may be called under bootstrap/standalone mode,
* where all logs are output directly to standard output, including
* profile log data.
* we don't expect this happen, so restrict it when IsUnderPostmaster = true.
*/
if (GTM_FREE_MODE) {
return;
}
if (!IsUnderPostmaster) {
return;
}
for (int i = 0; i < DS_VALID_NUM; i++) {
for (int j = 0; j < DSRQ_VALID_NUM; j++) {
if (t_thrd.log_cxt.g_plog_msgmem_array[i][j]) {
write_logmsg_with_multi_items((PLogMsg*)t_thrd.log_cxt.g_plog_msgmem_array[i][j]);
}
}
}
}
* @Description: aggregate profile logs about MD IO, and write the existing
* data if necessary ( new transaction starts, global query id changes,
* aggregation time is enough, etc).
* @Param[IN] new_item: a new input indicator about read/write item
* @Param[IN] nowtm: the current time
* @Return: void
* @See also:
*/
void aggregate_profile_logs(IndicatorItem* new_item, struct timeval* nowtm)
{
* disk IO error should not be common states, so make it a
* single one record.
*/
if (RET_TYPE_FAIL == new_item->ret_type) {
write_logmsg_with_one_item(new_item, nowtm);
return;
}
PLogMsg* prfdata = (PLogMsg*)t_thrd.log_cxt.g_plog_msgmem_array[new_item->data_src][new_item->req_type];
LogPipeProtoHeader* proto = (LogPipeProtoHeader*)prfdata->msghead;
PLogBasicEntry* entry = &prfdata->msgbody.entry;
PLogEntryMeta* meta = &(entry->basic.meta);
PLogEntryHead* head = &(entry->basic.head);
PLogEntryItem* item = &(entry->basic.item[0]);
PLogEntryItem* tmp_item = NULL;
int slotno = 0;
slotno = (*(g_plog_slot_array[new_item->data_src][new_item->req_type]))(new_item);
Assert(slotno < (int)PLOG_ENTRY_MAX_NUM);
tmp_item = item + slotno;
if (PLOG_NOT_USED == proto->nuls[0] || PLOG_NOT_USED == proto->nuls[1]) {
plog_fill_fixed_proto(proto);
plog_record_init_meta(meta, nowtm);
plog_record_init_head(head, 0, new_item->data_src, new_item->req_type, RET_TYPE_OK);
plog_record_init_items(item, PLOG_ENTRY_MAX_NUM);
} else if (plog_switch_new_msg(nowtm, meta->req_time) || (meta->gxid != GetTopTransactionIdIfAny()) ||
(meta->gqid != (uint64)(u_sess->debug_query_id)) ||
UINT32_SUM_OVERFLOW(tmp_item->sum_usec, new_item->req_usec) ||
UINT32_SUM_OVERFLOW(tmp_item->sum_size, new_item->dat_size)
) {
write_logmsg_with_multi_items(prfdata);
proto->len = 0;
plog_record_init_meta(meta, nowtm);
plog_record_init_head(head, 0, new_item->data_src, new_item->req_type, RET_TYPE_OK);
plog_record_init_items(item, PLOG_ENTRY_MAX_NUM);
}
Assert(head->item_num <= PLOG_ENTRY_MAX_NUM);
if (0 == tmp_item->sum_count) {
++head->item_num;
tmp_item->sum_count = 1;
} else {
++tmp_item->sum_count;
}
tmp_item->sum_usec += new_item->req_usec;
tmp_item->sum_size += new_item->dat_size;
}
* @Description: init logging protocol information.
* LogPipeProtoHeader::len must be updated later before writing.
* @Param[IN] proto: protocol data
* @Return: void
* @See also:
*/
static inline void plog_fill_fixed_proto(LogPipeProtoHeader* proto)
{
proto->nuls[0] = proto->nuls[1] = '\0';
proto->len = 0;
proto->pid = t_thrd.proc_cxt.MyProcPid;
proto->logtype = (char)LOG_TYPE_PLOG;
proto->is_last = 't';
proto->magic = PROTO_HEADER_MAGICNUM;
}
* @Description: init meta data of this log record.
* @Param[IN] meta: meta memory of log record
* @Param[IN] tm: time info about requiring service
* @Return: void
* @See also:
*/
static inline void plog_record_init_meta(PLogEntryMeta* meta, struct timeval* tm)
{
meta->req_time = *tm;
meta->tid = t_thrd.proc_cxt.MyProcPid;
meta->gxid = GetTopTransactionIdIfAny();
meta->plog_magic = PLOG_ENTRY_MAGICNUM;
meta->gqid = u_sess->debug_query_id;
}
* @Description: init head data of this log record.
* @Param[IN] head: head memory of log record
* @Param[IN] nitems: how many items within this log record
* @Param[IN] ds: data source id
* @Param[IN] req_type: required type, read/write etc.
* @Param[IN] ret_type: result type, success or failed.
* @Return: void
* @See also:
*/
static inline void plog_record_init_head(PLogEntryHead* head, int nitems, char ds, char req_type, char ret_type)
{
head->data_src = ds;
head->req_type = req_type;
head->item_num = nitems;
head->ret_type = ret_type;
}
* @Description: init/reset items array
* @Param[IN] item: items array
* @Param[IN] n: items number
* @Return: void
* @See also:
*/
static inline void plog_record_init_items(PLogEntryItem* item, int n)
{
int rc = memset_s(item, sizeof(PLogEntryItem) * n, 0, sizeof(PLogEntryItem) * n);
securec_check_c(rc, "\0", "\0");
}
* @Description: judge whether switch to a new log record
* @Param[IN] current: current timestamp
* @Param[IN] last: last timestamp
* @Return: void
* @See also:
*/
static inline bool plog_switch_new_msg(struct timeval* current, struct timeval last)
{
struct timeval nowtime = *current;
INSTR_TIME_SUBTRACT(nowtime, last);
return TM_IS_BIGGER(nowtime, t_thrd.log_cxt.plog_msg_switch_tm);
}
* @Description: write a single profile log directly into log file.
* @Param[IN] item: an input indicator item
* @Param[IN] nowtm: current timestamp
* @Return: void
* @See also:
*/
static inline void write_logmsg_with_one_item(IndicatorItem* item, struct timeval* nowtm)
{
int size = offsetof(PLogMsg, msgbody) + offsetof(PLogBasicEntry, basic) + offsetof(PLogEntry, item) +
sizeof(PLogEntryItem);
PLogMsg* msg = (PLogMsg*)palloc0(size);
LogPipeProtoHeader* proto = (LogPipeProtoHeader*)&msg->msghead;
plog_fill_fixed_proto(proto);
proto->len = sizeof(PLogEntry);
plog_record_init_meta(&(msg->msgbody.entry.basic.meta), nowtm);
plog_record_init_head(&(msg->msgbody.entry.basic.head), 1, item->data_src, item->req_type, item->ret_type);
PLogEntryItem* tmp_item = &(msg->msgbody.entry.basic.item[0]);
tmp_item->sum_count = 1;
tmp_item->sum_size = item->dat_size;
tmp_item->sum_usec = item->req_usec;
{
Assert(int(LOGPIPE_HEADER_SIZE + proto->len) <= size);
int fd = fileno(stderr);
(void)write(fd, (char*)msg, LOGPIPE_HEADER_SIZE + proto->len);
}
pfree(msg);
msg = NULL;
}
* there are all 32 slots available, and we these ranges:
* R1: step = 16us,
* 0 ~ 16, < 32, < 48, ..., < 320
* R2:
* < 512, < 1024
* R3: step = 2 ms
* < 2ms, < 4ms, ..., < 16ms, < 18ms
* R4:
* the others time
*/
#define MD_R1_STEP 16
#define MD_R1_BOUND 320
#define MD_SLOT_BASE1 (MD_R1_BOUND / MD_R1_STEP)
#define MD_R2_BOUND1 512
#define MD_R2_BOUND2 1024
#define MD_SLOT_BASE2 (MD_SLOT_BASE1 + 2)
#define MD_R3_STEP 2000
#define MD_R3_BOUND 18000
#define MD_SLOT_BASE3 (MD_SLOT_BASE2 + MD_R3_BOUND / MD_R3_STEP)
* @Description: compute which slot to store this new item.
* Core codes about how to collect profile data.
* @Param[IN] item: an input indicator item to search
* @Return: which slot to store this new item
* @See also: macro defination above
*/
static int plog_md_get_slot(IndicatorItem* item)
{
Assert(MD_SLOT_BASE3 < PLOG_ENTRY_MAX_NUM);
if (item->req_usec < MD_R1_BOUND) {
return (item->req_usec / MD_R1_STEP);
}
if (item->req_usec < MD_R2_BOUND2) {
return (item->req_usec < MD_R2_BOUND1) ? MD_SLOT_BASE1 : (MD_SLOT_BASE1 + 1);
}
if (item->req_usec < MD_R3_BOUND) {
return MD_SLOT_BASE2 + (item->req_usec / MD_R3_STEP);
}
return MD_SLOT_BASE3;
}
#define OBS_R1_BOUND 900000
#define OBS_R1_STEP 30000
#define OBS_R1_BASE1 (OBS_R1_BOUND / OBS_R1_STEP)
#define OBS_R2_BOUND 2000000
static int plog_obs_get_slot(IndicatorItem* item)
{
Assert(OBS_R1_BASE1 < PLOG_ENTRY_MAX_NUM);
if (item->req_usec < OBS_R1_BOUND) {
return (item->req_usec / OBS_R1_STEP);
}
return OBS_R1_BASE1 + ((item->req_usec < OBS_R2_BOUND) ? 0 : 1);
}
#define LOG2_MAP_SIZE 1024
static const int log2_map[LOG2_MAP_SIZE] = {0,
0,
1,
2,
2,
3,
3,
3,
3,
4,
4,
4,
4,
4,
4,
4,
4,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
5,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
6,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
7,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
8,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
9,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10,
10};
* there are all 32 slots available, and we these ranges:
* R1: step = 50us,
* 0 ~ 50, < 100, < 150, ..., < 1000
* R2: step = 2^n ms
* < 2ms, < 4ms, <8ms..., < 512ms, < 1024ms
* R3:
* the other times
*/
#define HDP_R1_STEP 50
#define HDP_R1_BOUND 1000
#define HDP_SLOT_BASE1 (HDP_R1_BOUND / HDP_R1_STEP)
#define HDP_R2_BOUND 1024000
#define HDP_SLOT_BASE2 (HDP_SLOT_BASE1 + 11)
static int plog_hdp_get_slot(IndicatorItem* item)
{
Assert(HDP_SLOT_BASE2 < PLOG_ENTRY_MAX_NUM);
if (item->req_usec < HDP_R1_BOUND) {
return (item->req_usec / HDP_R1_STEP);
}
if (item->req_usec < HDP_R2_BOUND) {
uint32 idx = item->req_usec / 1000;
Assert(idx < LOG2_MAP_SIZE);
return HDP_SLOT_BASE1 + log2_map[idx];
}
return HDP_SLOT_BASE2;
}
* there are all 21 slots available, and we these ranges:
* R1: step = 100ms,
* 0 ~ 100, < 200, < 300, ..., < 1000
* R2: step = 1000ms
* <1000, <2000,..., < 10000
* R3:
* the others time
*/
#define REMOTE_R1_STEP 100000
#define REMOTE_R1_BOUND 1000000
#define REMOTE_SLOT_BASE1 (REMOTE_R1_BOUND / REMOTE_R1_STEP)
#define REMOTE_R2_STEP 1000000
#define REMOTE_R2_BOUND 10000000
#define REMOTE_SLOT_BASE2 (REMOTE_SLOT_BASE1 + (REMOTE_R2_BOUND / REMOTE_R2_STEP))
* @Description: compute which slot to store this new item.
* Core codes about how to collect profile data.
* @Param[IN] item: an input indicator item to search
* @Return: which slot to store this new item
* @See also: macro defination above
*/
static int plog_remote_get_slot(IndicatorItem* item)
{
Assert(REMOTE_SLOT_BASE2 < PLOG_ENTRY_MAX_NUM);
if (item->req_usec < REMOTE_R1_BOUND) {
return (item->req_usec / REMOTE_R1_STEP);
}
if (item->req_usec < REMOTE_R2_BOUND) {
return REMOTE_SLOT_BASE1 + (item->req_usec / REMOTE_R2_STEP);
}
return REMOTE_SLOT_BASE2;
}
* @Description: write a plog message into syslogger thread,
* which will flush this message into files/disk.
* @Param[IN] plog_msg: a log message data
* @Return: void
* @See also: write_logmsg_with_one_item()
*/
static void write_logmsg_with_multi_items(PLogMsg* plog_msg)
{
LogPipeProtoHeader* proto = (LogPipeProtoHeader*)plog_msg->msghead;
PLogBasicEntry* entry = &plog_msg->msgbody.entry;
PLogEntryItem* items = &(entry->basic.item[0]);
int const nitems = entry->basic.head.item_num;
if (PLOG_NOT_USED == proto->nuls[0] || PLOG_NOT_USED == proto->nuls[1] || 0 == nitems) {
return;
}
int left_items_num = 0;
PLogEntryItem* left = items;
PLogEntryItem* right = items + (PLOG_ENTRY_MAX_NUM - 1);
PLogEntryItem* const soldier = items + nitems;
do {
while (left->sum_count != 0 && left < soldier) {
++left_items_num;
++left;
}
if (left == soldier) {
break;
}
while (right->sum_count == 0 && right > soldier) {
--right;
}
Assert(left < right);
left->sum_count = right->sum_count;
left->sum_size = right->sum_size;
left->sum_usec = right->sum_usec;
++left_items_num;
++left;
--right;
} while (left < soldier);
Assert(left_items_num == nitems);
proto->len = offsetof(PLogEntry, item) + sizeof(PLogEntryItem) * nitems;
{
int fd = fileno(stderr);
(void)write(fd, (char*)plog_msg, LOGPIPE_HEADER_SIZE + proto->len);
}
entry->basic.head.item_num = 0;
}