* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2021, openGauss Contributors
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
* snapshot.cpp
*
* Automatically collect MPP snapshots in the background,
* you can also manually collect snapshots
*
* IDENTIFICATION
* src/gausskernel/cbb/instruments/wdr/snapshot.cpp
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/xact.h"
#include "postmaster/postmaster.h"
#include "pgxc/pgxc.h"
#include "pgxc/pgxcnode.h"
#include "pgxc/poolutils.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "storage/latch.h"
#include "storage/ipc.h"
#include "workload/workload.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "gssignal/gs_signal.h"
#include "utils/acl.h"
#include "utils/guc.h"
#include "utils/ps_status.h"
#include "utils/elog.h"
#include "utils/numeric.h"
#include "utils/memprot.h"
#include "utils/builtins.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "gs_thread.h"
#include "access/heapam.h"
#include "optimizer/autoanalyzer.h"
#include "utils/rel.h"
#include "utils/postinit.h"
#include "pgstat.h"
#include "pgxc/execRemote.h"
#include "workload/gscgroup.h"
#include "commands/dbcommands.h"
#include "instruments/generate_report.h"
#include "instruments/snapshot.h"
#include "instruments/dblink_query.h"
#include "libpq/pqsignal.h"
#include "pgxc/groupmgr.h"
const int PGSTAT_RESTART_INTERVAL = 60;
#define COUNT_ARRAY_SIZE(array) (sizeof((array)) / sizeof(*(array)))
using namespace std;
* function
*/
NON_EXEC_STATIC void SnapshotMain();
namespace SnapshotNameSpace {
void CleanSnapshot(uint64 curr_min_snapid);
void take_snapshot(void);
void SubSnapshotMain(void);
void InsertOneTableData(const char** views, int numViews, uint64 snapid);
void InsertTablesData(uint64 snapid);
void DeleteTablesData(const char** views, int numViews, uint64 snapid);
void GetQueryData(const char* query, bool with_column_name, List** cstring_values);
bool ExecuteQuery(const char* query, int SPI_OK_STAT);
void init_curr_table_size(void);
void init_curr_snapid(void);
void CreateTable(const char** views, int numViews, bool ismultidbtable);
void InitTables(void);
void CreateSnapStatTables(void);
void CreateIndexes(const char* views);
void CreateSequence(void);
void UpdateSnapEndTime(uint64 curr_snapid);
void GetQueryStr(StringInfoData& query, const char* viewname, uint64 curr_snapid, const char* dbname);
void GetDatabaseList(List** databaselist);
void InsertOneDbTables(const char** views, int numViews, uint64 curr_snapid);
void InsertDatabaseData(const char* dbname, uint64 curr_snapid, const char** views, int numViews);
void SplitString(const char* str, const char* delim, List** res);
void SetSnapshotSize(uint32* max_snap_size);
void SleepCheckInterrupt(uint32 minutes);
void ResetPortDefaultKeepalivesValue(void);
void SetThrdCxt(void);
void AnalyzeTable(List* analyzeList);
void GetAnalyzeList(List** analyzeList);
char* GetTableColAttr(const char* viewname, bool onlyViewCol, bool addType);
}
* select these views in a different database gives the same result,
* We just need to snapshot these views under openGauss database
*/
static const char* sharedViews[] = {"global_os_runtime", "global_os_threads", "global_instance_time",
"summary_workload_sql_count", "summary_workload_sql_elapse_time", "global_workload_transaction",
"summary_workload_transaction", "global_thread_wait_status", "global_memory_node_detail",
"global_shared_memory_detail",
#ifdef ENABLE_MULTIPLE_NODES
"global_comm_delay", "global_comm_recv_stream", "global_comm_send_stream", "global_comm_status",
"global_pooler_status",
#endif
"global_stat_db_cu", "global_stat_database", "summary_stat_database",
"global_stat_database_conflicts", "summary_stat_database_conflicts",
"global_stat_bad_block", "summary_stat_bad_block", "global_file_redo_iostat", "summary_file_redo_iostat",
"global_rel_iostat", "summary_rel_iostat", "global_file_iostat", "summary_file_iostat",
"global_replication_slots", "global_bgwriter_stat", "global_replication_stat",
"global_transactions_running_xacts", "summary_transactions_running_xacts",
"global_transactions_prepared_xacts", "summary_transactions_prepared_xacts", "summary_statement",
"global_statement_count", "summary_statement_count", "global_config_settings", "global_wait_events",
"summary_user_login", "global_ckpt_status", "global_double_write_status",
"global_pagewriter_status", "global_redo_status",
"global_rto_status", "global_recovery_status", "global_threadpool_status",
"statement_responsetime_percentile"};
* These views represent the state of the database in which they are located
* select these views in a different database gives the different result,
* We just need to snapshot these views under different database
*/
static const char* dbRelatedViews[] = {"global_statio_all_indexes", "summary_statio_all_indexes",
"global_statio_all_sequences", "summary_statio_all_sequences", "global_statio_all_tables",
"summary_statio_all_tables", "global_stat_all_indexes", "summary_stat_all_indexes",
"summary_stat_user_functions", "global_stat_user_functions", "global_stat_all_tables", "summary_stat_all_tables"};
static const char* lastDbRelatedViews[] = {"class_vital_info"};
static const char* lastStatViews[] = {"global_record_reset_time"};
void instrSnapshotClean(void)
{
dblinkCloseConn();
}
void instrSnapshotCancel(void)
{
dblinkRequestCancel();
}
static void instr_snapshot_exit(SIGNAL_ARGS)
{
t_thrd.perf_snap_cxt.need_exit = true;
die(postgres_signal_arg);
}
static void instr_snapshot_sighup_handler(SIGNAL_ARGS)
{
t_thrd.perf_snap_cxt.got_SIGHUP = true;
}
static void request_snapshot(SIGNAL_ARGS)
{
t_thrd.perf_snap_cxt.request_snapshot = true;
}
static char* GetCurrentTimeStampString()
{
return Datum_to_string(TimestampGetDatum(GetCurrentTimestamp()), TIMESTAMPTZOID, false);
}
static bool CheckNodeGroup()
{
#ifndef ENABLE_MULTIPLE_NODES
return true;
#endif
errno_t rc;
SPI_STACK_LOG("connect", NULL, NULL);
if ((rc = SPI_connect()) != SPI_OK_CONNECT) {
ereport(ERROR,
(errmodule(MOD_WDR_SNAPSHOT),
errcode(ERRCODE_SPI_CONNECTION_FAILURE),
errmsg("SPI_connect failed: %s", SPI_result_code_string(rc)),
errdetail("SPI_connect failed in function CheckNodeGroup"),
errcause("System error."),
erraction("Analyze the error message before the error")));
}
Datum colval;
bool isnull = false;
const char* query =
"select pg_catalog.count(*) from pgxc_group WHERE in_redistribution='n' and is_installation = TRUE;";
colval = GetDatumValue(query, 0, 0, &isnull);
SPI_STACK_LOG("finish", NULL, NULL);
SPI_finish();
return !isnull && (colval != 0);
}
* in pgxc_group table, exit snapshot thread to avoid dead lock issue
* 2. during upgrade, snapshot table can be modified,
* exit snapshot thread to avoid insert data failed
*/
static void check_snapshot_thd_exit()
{
bool need_exit = false;
start_xact_command();
PushActiveSnapshot(GetTransactionSnapshot());
char* redis_group = PgxcGroupGetInRedistributionGroup();
if (redis_group != NULL || !CheckNodeGroup()) {
need_exit = true;
u_sess->attr.attr_common.ExitOnAnyError = true;
}
PopActiveSnapshot();
finish_xact_command();
if (need_exit || u_sess->attr.attr_common.upgrade_mode != 0) {
* we sleep several minute before exit.
*/
const int SLEEP_GAP = 5;
ereport(LOG, (errmsg("snapshot thread will exit during redistribution, upgrade or init group")));
SnapshotNameSpace::SleepCheckInterrupt(SLEEP_GAP);
ereport(LOG, (errmsg("snapshot thread is exited during redistribution, upgrade or init group")));
gs_thread_exit(0);
}
}
* If enable_memory_limit is off,
* select session memory related views will report an error.
* The snapshot thread successfully collects global session memory information.
* Ensure that t_thrd.utils_cxt.gs_mp_inited of each node is true.
* Before each snapshot thread collects information,
* it will first determine whether the view related to session memory can be selected
*/
static bool CheckMemProtectInit()
{
bool res = false;
MemoryContext currentCtx = CurrentMemoryContext;
PG_TRY();
{
errno_t rc;
start_xact_command();
PushActiveSnapshot(GetTransactionSnapshot());
SPI_STACK_LOG("connect", NULL, NULL);
if ((rc = SPI_connect()) != SPI_OK_CONNECT)
ereport(ERROR,
(errmodule(MOD_WDR_SNAPSHOT),
errcode(ERRCODE_SPI_CONNECTION_FAILURE),
errmsg("SPI_connect failed: %s", SPI_result_code_string(rc)),
errdetail("SPI_connect failed in function CheckMemProtectInit"),
errcause("System error."),
erraction("Analyze the error message before the error")));
const char *query = "select * from DBE_PERF.global_memory_node_detail";
(void)SnapshotNameSpace::ExecuteQuery(query, SPI_OK_SELECT);
SPI_STACK_LOG("finish", query, NULL);
SPI_finish();
PopActiveSnapshot();
finish_xact_command();
res = true;
}
PG_CATCH();
{
SPI_STACK_LOG("finish", NULL, NULL);
SPI_finish();
(void)MemoryContextSwitchTo(currentCtx);
ErrorData *edata = CopyErrorData();
ereport(LOG, (errmsg("Failed to snapshot global_memory_node_detail, cause: %s", edata->message)));
FlushErrorState();
FreeErrorData(edata);
PopActiveSnapshot();
AbortCurrentTransaction();
t_thrd.postgres_cxt.xact_started = false;
}
PG_END_TRY();
return res;
}
void kill_snapshot_remote()
{
ExecNodes* exec_nodes = NULL;
StringInfoData buf;
ParallelFunctionState* state = NULL;
exec_nodes = (ExecNodes*)makeNode(ExecNodes);
exec_nodes->baselocatortype = LOCATOR_TYPE_HASH;
exec_nodes->accesstype = RELATION_ACCESS_READ;
exec_nodes->primarynodelist = NIL;
exec_nodes->en_expr = NULL;
exec_nodes->en_relid = InvalidOid;
exec_nodes->nodeList = NIL;
initStringInfo(&buf);
appendStringInfo(&buf, "SELECT pg_catalog.kill_snapshot();");
state = RemoteFunctionResultHandler(buf.data, exec_nodes, NULL, true, EXEC_ON_COORDS, true);
FreeParallelFunctionState(state);
pfree_ext(buf.data);
}
static void CheckPermission()
{
if (!superuser() && !has_rolreplication(GetUserId()) &&
!is_member_of_role(GetUserId(), DEFAULT_ROLE_REPLICATION)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for terminate snapshot thread"),
errdetail("only system admin or replication role or a member of the gs_role_replication role"
"can terminate snapshot thread"),
errcause("The user does not have system admin privilege"), erraction("Grant system admin to user")));
}
}
* kill snapshot thread
* There will be deadlocks between snapshot thread and redistribution,
* so during redistribution, will stop snapshot thread.
*
* return true if killed snapshot thread successfully
*/
Datum kill_snapshot(PG_FUNCTION_ARGS)
{
CheckPermission();
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR) {
#endif
if (g_instance.pid_cxt.SnapshotPID != 0) {
if (PgxcGroupGetInRedistributionGroup() != NULL) {
ereport(LOG, (errmsg("snapshot thread - redistribution in progress")));
}
const int MAX_RETRY_COUNT = 1000000;
const int SLEEP_GAP = 100;
int err = 0;
uint32 old_snapshot_thread_counter = pg_atomic_read_u32(&g_instance.stat_cxt.snapshot_thread_counter);
if ((err = gs_signal_send(g_instance.pid_cxt.SnapshotPID, SIGTERM)) != 0) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_OPERATE_FAILED),
errmsg("terminate snapshot thread failed"),
errdetail("N/A"), errcause("Execution failed due to: %s", gs_strerror(err)),
erraction("check if snapshot thread exists")));
} else {
int wait_times = 0;
* - exit case: g_instance.pid_cxt.SnapshotPID is zero
* - new start case: g_instance.stat_cxt.snapshot_thread_counter will be changed
*
* new started snapshot thread PID can be same with before,
* so use a status counter to indicate snapshot thread restart
*/
while (g_instance.pid_cxt.SnapshotPID != 0 &&
g_instance.stat_cxt.snapshot_thread_counter == old_snapshot_thread_counter) {
if (wait_times++ > MAX_RETRY_COUNT) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_OPERATE_FAILED),
errmsg("terminate snapshot thread failed"),
errdetail("wait time(%d), exceeds MAX_RETRY_COUNT(%d)", wait_times, MAX_RETRY_COUNT),
errcause("restart wdr snapshot thread timeout"
"or The thread did not respond to the kill signal"),
erraction("Check the wdr snapshot thread is restarted")));
}
pg_usleep(SLEEP_GAP);
}
ereport(LOG, (errmsg("snapshot thread is killed")));
}
}
if (!IsConnFromCoord()) {
ereport(LOG, (errmsg("send 'kill snapshot thread' request to other CNs")));
#ifdef ENABLE_MULTIPLE_NODES
kill_snapshot_remote();
#endif
}
#ifdef ENABLE_MULTIPLE_NODES
}
#endif
PG_RETURN_VOID();
}
static void ReloadInfo()
{
if (t_thrd.perf_snap_cxt.got_SIGHUP) {
t_thrd.perf_snap_cxt.got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
if (IsGotPoolReload()) {
processPoolerReload();
ResetGotPoolReload(false);
}
}
static void set_lock_timeout()
{
int rc = 0;
const char* timeout_sql = "set lockwait_timeout = 3000";
if ((rc = SPI_execute(timeout_sql, false, 0)) != SPI_OK_UTILITY) {
ereport(ERROR,
(errmodule(MOD_WDR_SNAPSHOT),
errcode(ERRCODE_INTERNAL_ERROR),
errmsg("set lockwait_timeout failed"),
errdetail("error code: %s", SPI_result_code_string(rc)),
errcause("System error."),
erraction("Contact engineer to support.")));
}
}
* Manually performing a snapshot
*/
Datum create_wdr_snapshot(PG_FUNCTION_ARGS)
{
const uint32 maxTryCount = 100;
const uint32 waitTime = 100000;
if (!superuser()) {
ereport(ERROR,
(errmodule(MOD_WDR_SNAPSHOT),
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for create WDR Snapshot"),
errdetail("Superuser privilege is need to operate snapshot"),
errcause("The user does not have system admin privilege"),
erraction("Grant system admin to user")));
}
if (!u_sess->attr.attr_common.enable_wdr_snapshot) {
ereport(WARNING, (errcode(ERRCODE_WARNING), errmsg("GUC parameter 'enable_wdr_snapshot' is off")));
PG_RETURN_TEXT_P(cstring_to_text("WDR snapshot request can't be executed"));
;
}
if (!PgxcIsCentralCoordinator(g_instance.attr.attr_common.PGXCNodeName) && (IS_SINGLE_NODE != true)) {
ereport(NOTICE, (errmsg("take snapshot must be on CCN node")));
PG_RETURN_TEXT_P(cstring_to_text("WDR snapshot request can't be executed"));
}
int err = 0;
for (uint32 ntries = 0;; ntries++) {
if (g_instance.pid_cxt.SnapshotPID == 0) {
if (ntries >= maxTryCount) {
ereport(ERROR,
(errmodule(MOD_WDR_SNAPSHOT),
errcode(ERRCODE_OPERATE_FAILED),
errmsg("WDR snapshot request can not be accepted, please retry later"),
errdetail("N/A"),
errcause("wdr snapshot thread does not exist"),
erraction("Check if wdr snapshot thread exists")));
}
} else if ((err = gs_signal_send(g_instance.pid_cxt.SnapshotPID, SIGINT)) != 0) {
if (ntries >= maxTryCount) {
ereport(ERROR,
(errmodule(MOD_WDR_SNAPSHOT),
errcode(ERRCODE_OPERATE_FAILED),
errmsg("Cannot respond to WDR snapshot request"),
errdetail("N/A"),
errcause("Execution failed due to: %s", gs_strerror(err)),
erraction("Check if wdr snapshot thread exists")));
}
} else {
break;
}
CHECK_FOR_INTERRUPTS();
pg_usleep(waitTime);
}
PG_RETURN_TEXT_P(cstring_to_text("WDR snapshot request has been submitted"));
}
* Execute query
* parameter:
* query -- query text
* SPI_OK_STAT -- Query execution status
*/
bool SnapshotNameSpace::ExecuteQuery(const char* query, int SPI_OK_STAT)
{
Assert(query != NULL);
if (SPI_execute(query, false, 0) != SPI_OK_STAT) {
ereport(WARNING, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("invalid query : %s", query)));
return false;
}
return true;
}
* GetDatumValue
* return the result of select sql
* but the type of return value is Datum, You must
* convert the value to the corresponding type
* row : the row of value (begin from 0)
* col : the col of value (begin from 0)
*/
Datum GetDatumValue(const char* query, uint32 row, uint32 col, bool* isnull)
{
Datum colval;
bool tmp_isnull = false;
Assert(query != NULL);
if (!SnapshotNameSpace::ExecuteQuery(query, SPI_OK_SELECT)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("query(%s) can not get datum values", query),
errdetail("An error occurred in function GetDatumValue"),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
if (row >= SPI_processed || col >= (uint32)SPI_tuptable->tupdesc->natts) {
ereport(WARNING,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("row : %d, col : %d out of the size of query data in get datum value", row, col)));
return (Datum)NULL;
}
colval = SPI_getbinval(SPI_tuptable->vals[row], SPI_tuptable->tupdesc, (col + 1), isnull ? isnull : &tmp_isnull);
return colval;
}
* Initialize the size of the current table Every time
* you boot you need to check the size of the table
* size of the snapshot to initialize the curr_table_size.
*/
void SnapshotNameSpace::init_curr_table_size(void)
{
Datum colval;
bool isnull = false;
const char* sql = "select pg_catalog.count(*) from snapshot.snapshot";
colval = GetDatumValue(sql, 0, 0, &isnull);
if (isnull) {
colval = 0;
}
t_thrd.perf_snap_cxt.curr_table_size = DatumGetInt32(colval);
}
void SnapshotNameSpace::CreateSequence(void)
{
Datum colval;
bool isnull = false;
const char* query_seq_sql =
"select pg_catalog.count(*) from pg_class c left join pg_namespace n"
" on n.oid = c.relnamespace"
" where n.nspname = 'snapshot' and c.relname = 'snap_seq'";
colval = GetDatumValue(query_seq_sql, 0, 0, &isnull);
if (isnull) {
colval = 0;
}
if (!colval) {
const char* create_seq_sql = "create sequence snapshot.snap_seq CYCLE";
if (!SnapshotNameSpace::ExecuteQuery(create_seq_sql, SPI_OK_UTILITY)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("create sequence failed"),
errdetail("query(%s) execute failed", create_seq_sql),
errcause("System error."),
erraction("Check if sequence can be created")));
}
}
}
void SnapshotNameSpace::init_curr_snapid(void)
{
Datum colval;
bool isnull = false;
const char* sql = "select pg_catalog.nextval('snapshot.snap_seq')";
colval = GetDatumValue(sql, 0, 0, &isnull);
if (isnull) {
colval = 0;
}
t_thrd.perf_snap_cxt.curr_snapid = numeric_int16_internal(DatumGetNumeric(colval));
}
* UpdateSnapEndTime -- update snapshot end time stamp
*/
void SnapshotNameSpace::UpdateSnapEndTime(uint64 curr_snapid)
{
if (curr_snapid == 0) {
return;
}
StringInfoData query;
initStringInfo(&query);
appendStringInfo(&query,
"update snapshot.snapshot set end_ts = '%s' where snapshot_id = %lu",
GetCurrentTimeStampString(),
curr_snapid);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_UPDATE)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("update snapshot end time stamp filled"),
errdetail("query(%s) execute failed", query.data),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
pfree_ext(query.data);
}
void SnapshotNameSpace::SetSnapshotSize(uint32* max_snap_size)
{
const int HOUR_OF_DAY = 24;
const int MINUTE_Of_HOUR = 60;
* 1 <= wdr_snapshot_retention_days <= 8(d)
* 10 <= wdr_snapshot_interval <= 60(min)
*/
*max_snap_size = u_sess->attr.attr_common.wdr_snapshot_retention_days * HOUR_OF_DAY * MINUTE_Of_HOUR /
u_sess->attr.attr_common.wdr_snapshot_interval;
}
void SnapshotNameSpace::GetQueryData(const char* query, bool with_column_name, List** cstring_values)
{
bool isnull = false;
List* colname_cstring = NIL;
if (cstring_values != NULL) {
list_free_deep(*cstring_values);
}
ereport(DEBUG1, (errmodule(MOD_WDR_SNAPSHOT), errmsg("[Instruments/Report] query: %s", query)));
if (SPI_execute(query, false, 0) != SPI_OK_SELECT) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("query can not get datum values"),
errdetail("query(%s) execute failed", query),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
if (with_column_name) {
for (int32 i = 0; i < SPI_tuptable->tupdesc->natts; i++) {
char* strName = pstrdup(SPI_tuptable->tupdesc->attrs[i].attname.data);
colname_cstring = lappend(colname_cstring, strName);
}
*cstring_values = lappend(*cstring_values, colname_cstring);
}
for (uint32 i = 0; i < SPI_processed; i++) {
List* row_string = NIL;
uint32 colNum = (uint32)SPI_tuptable->tupdesc->natts;
for (uint32 j = 1; j <= colNum; j++) {
Oid type = SPI_gettypeid(SPI_tuptable->tupdesc, j);
Datum colval = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, j, &isnull);
char* tmpStr = Datum_to_string(colval, type, isnull);
row_string = lappend(row_string, tmpStr);
}
*cstring_values = lappend(*cstring_values, row_string);
}
}
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
* SnapshotView
* Since the snapshot table can be queried only when the parameter is on,
* the view is created during snapshot initialization.
* when the parameter is off, an empty view is provided.
*/
static void SnapshotView()
{
const char* createSnapshotviewdrop = "drop view if exists SYS.ADM_HIST_SNAPSHOT";
if (SPI_execute(createSnapshotviewdrop, false, 0) != SPI_OK_UTILITY) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("drop view failed"),
errdetail("query(%s) execute failed", createSnapshotviewdrop),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
const char* createSnapshotview = "CREATE OR REPLACE VIEW SYS.ADM_HIST_SNAPSHOT AS "
"SELECT s.snapshot_id AS SNAP_ID, s.start_ts AS BEGIN_INTERVAL_TIME FROM SNAPSHOT.SNAPSHOT s";
if (SPI_execute(createSnapshotview, false, 0) != SPI_OK_UTILITY) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("CREATE VIEW failed"),
errdetail("query(%s) execute failed", createSnapshotview),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
}
#endif
* take_snapshot
* All the snapshot processes are executed here,
* we will collect snapshot info from views in dbe_perf schema,
* then insert into snapshot tables in snapshot schema, In order to control
* the amount of disk space occupied by the snapshot,
* we use curr_tables_size to record the number of snapshots,
* and use MAX_TABLE_SIZE to control the size of the snapshot disk space.
*/
void SnapshotNameSpace::take_snapshot()
{
StringInfoData query;
initStringInfo(&query);
PG_TRY();
{
int rc = 0;
uint32 max_snap_size = 0;
SnapshotNameSpace::SetSnapshotSize(&max_snap_size);
SPI_STACK_LOG("connect", NULL, NULL);
if ((rc = SPI_connect()) != SPI_OK_CONNECT) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_INTERNAL_ERROR),
errmsg("SPI_connect failed: %s", SPI_result_code_string(rc)),
errdetail("N/A"), errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
set_lock_timeout();
SnapshotNameSpace::init_curr_table_size();
while (t_thrd.perf_snap_cxt.curr_table_size >= max_snap_size) {
bool isnull = false;
const char* sql = "select snapshot_id from snapshot.snapshot order by start_ts limit 1";
Datum colval = GetDatumValue(sql, 0, 0, &isnull);
if (isnull) {
colval = 0;
}
SnapshotNameSpace::CleanSnapshot(DatumGetObjectId(colval));
t_thrd.perf_snap_cxt.curr_table_size--;
}
SnapshotNameSpace::init_curr_snapid();
appendStringInfo(&query,
"INSERT INTO snapshot.snapshot(snapshot_id, start_ts) "
"values (%lu, '%s')", t_thrd.perf_snap_cxt.curr_snapid,
GetCurrentTimeStampString());
if (SPI_execute(query.data, false, 0) == SPI_OK_INSERT) {
SnapshotNameSpace::InsertTablesData(t_thrd.perf_snap_cxt.curr_snapid);
t_thrd.perf_snap_cxt.curr_table_size++;
} else {
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("query(%s) execute failed", query.data),
errdetail("N/A"), errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
SnapshotNameSpace::UpdateSnapEndTime(t_thrd.perf_snap_cxt.curr_snapid);
SPI_STACK_LOG("finish", query.data, NULL);
pfree_ext(query.data);
SPI_finish();
}
PG_CATCH();
{
SPI_STACK_LOG("finish", query.data, NULL);
pfree_ext(query.data);
SPI_finish();
PG_RE_THROW();
}
PG_END_TRY();
}
static bool IsCgroupInit(const char* viewname)
{
StringInfoData query;
initStringInfo(&query);
if (strncmp(viewname, "wlm_controlgroup_config", strlen("wlm_controlgroup_config")) == 0 ||
strncmp(viewname, "wlm_controlgroup_ng_config", strlen("wlm_controlgroup_ng_config")) == 0 ||
strncmp(viewname, "wlm_cgroup_config", strlen("wlm_cgroup_config")) == 0) {
if (g_instance.wlm_cxt->gscgroup_init_done == 0) {
return false;
}
}
pfree_ext(query.data);
return true;
}
void SnapshotNameSpace::DeleteTablesData(const char** views, int numViews, uint64 snapid)
{
StringInfoData query;
initStringInfo(&query);
for (int i = 0; i < numViews; i++) {
if (!IsCgroupInit(views[i])) {
continue;
}
resetStringInfo(&query);
appendStringInfo(&query, "delete from snapshot.snap_%s where snapshot_id = %lu", views[i], snapid);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_DELETE)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("clean table of snap_%s is failed", views[i]),
errdetail("N/A"),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
}
pfree_ext(query.data);
}
void SnapshotNameSpace::GetAnalyzeList(List** analyzeTableList)
{
Datum colval;
bool isnull = false;
StringInfoData query;
initStringInfo(&query);
const char* tableName = "tables_snap_timestamp";
appendStringInfo(&query,
"select relid from dbe_perf.class_vital_info "
"where schemaname = 'snapshot' and relname = '%s'",
tableName);
colval = GetDatumValue(query.data, 0, 0, &isnull);
if (!colval) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("analyze table failed"),
errdetail("table(%s) not exist", tableName),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
MemoryContext old_context = MemoryContextSwitchTo(t_thrd.perf_snap_cxt.PerfSnapMemCxt);
Oid* relationObjectId = (Oid*)palloc(sizeof(Oid));
*relationObjectId = DatumGetObjectId(colval);
*analyzeTableList = lappend(*analyzeTableList, relationObjectId);
(void)MemoryContextSwitchTo(old_context);
pfree_ext(query.data);
}
void SnapshotNameSpace::AnalyzeTable(List* analyzeList)
{
Relation relation = NULL;
foreach_cell(cellAnalyze, analyzeList)
{
Oid* analyzeName = (Oid*)lfirst(cellAnalyze);
relation = heap_open(*analyzeName, NoLock);
bool is_analyzed = AutoAnaProcess::runAutoAnalyze(relation);
if (!is_analyzed) {
ereport(WARNING, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("snapshot analyze is failed")));
}
heap_close(relation, NoLock);
}
}
* InsertOneTableData -- insert snapshot into snapshot.snap_xxx_xxx table
*
* insert into a table of snapshot schema from a view data from dbe_perf schema
* and must get start time and end time when snapshot table insert
*/
void SnapshotNameSpace::InsertOneTableData(const char** views, int numViews, uint64 snapid)
{
char* dbName = NULL;
StringInfoData query;
initStringInfo(&query);
dbName = get_and_check_db_name(u_sess->proc_cxt.MyDatabaseId, true);
for (int i = 0; i < numViews; i++) {
if (!t_thrd.perf_snap_cxt.is_mem_protect &&
(strcmp(views[i], "global_memory_node_detail") == 0))
continue;
resetStringInfo(&query);
CHECK_FOR_INTERRUPTS();
appendStringInfo(&query,
"INSERT INTO snapshot.tables_snap_timestamp(snapshot_id, db_name, tablename, start_ts) "
"values(%lu, '%s', 'snap_%s', '%s')",
snapid,
dbName,
views[i],
GetCurrentTimeStampString());
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_INSERT)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("insert into tables_snap_timestamp start time stamp is failed"),
errdetail("query(%s) execute failed", query.data),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
CHECK_FOR_INTERRUPTS();
resetStringInfo(&query);
char* snapColAttr = SnapshotNameSpace::GetTableColAttr(views[i], false, false);
char* colAttr = SnapshotNameSpace::GetTableColAttr(views[i], true, false);
appendStringInfo(&query,
"INSERT INTO snapshot.snap_%s(snapshot_id, %s) select %lu, %s from dbe_perf.%s",
views[i],
snapColAttr,
snapid,
colAttr,
views[i]);
pfree(colAttr);
pfree(snapColAttr);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_INSERT)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("insert data failed"),
errdetail("query(%s) execute failed", query.data),
errcause("System error."),
erraction("Check whether the snapshot retry is successful "
"and check whether the query can be executed")));
}
CHECK_FOR_INTERRUPTS();
resetStringInfo(&query);
appendStringInfo(&query,
"update snapshot.tables_snap_timestamp set end_ts = '%s' "
"where snapshot_id = %lu and db_name = '%s' and tablename = 'snap_%s'",
GetCurrentTimeStampString(), snapid, dbName, views[i]);
if (views == lastStatViews) {
appendStringInfo(&query,
" and start_ts = (select pg_catalog.max(start_ts) from snapshot.tables_snap_timestamp "
"where snapshot_id = %lu and db_name = '%s' and tablename = 'snap_%s')",
snapid, dbName, views[i]);
}
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_UPDATE)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("update tables_snap_timestamp end time stamp is failed"),
errdetail("query(%s) execute failed", query.data),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
}
pfree_ext(dbName);
pfree_ext(query.data);
}
void SnapshotNameSpace::InsertTablesData(uint64 snapid)
{
int numViews = COUNT_ARRAY_SIZE(lastStatViews);
SnapshotNameSpace::InsertOneTableData(lastStatViews, numViews, snapid);
numViews = COUNT_ARRAY_SIZE(dbRelatedViews);
SnapshotNameSpace::InsertOneDbTables(dbRelatedViews, numViews, snapid);
numViews = COUNT_ARRAY_SIZE(sharedViews);
SnapshotNameSpace::InsertOneTableData(sharedViews, numViews, snapid);
numViews = COUNT_ARRAY_SIZE(lastStatViews);
SnapshotNameSpace::InsertOneTableData(lastStatViews, numViews, snapid);
numViews = COUNT_ARRAY_SIZE(lastDbRelatedViews);
SnapshotNameSpace::InsertOneDbTables(lastDbRelatedViews, numViews, snapid);
}
static void DeleteStatTableDate(uint64 curr_min_snapid)
{
StringInfoData query;
initStringInfo(&query);
appendStringInfo(&query, "delete from snapshot.snapshot where snapshot_id = %lu", curr_min_snapid);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_DELETE)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("clean snapshot id %lu is failed in snapshot table", curr_min_snapid),
errdetail("query(%s) execute failed", query.data),
errcause("System error."),
erraction("Check whether the snapshot retry is successful"
" and check whether the query can be executed")));
}
resetStringInfo(&query);
appendStringInfo(&query, "delete from snapshot.tables_snap_timestamp where snapshot_id = %lu", curr_min_snapid);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_DELETE)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("clean snapshot failed"),
errdetail("query(%s) execute failed", query.data),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
pfree_ext(query.data);
}
* Called when the tablespace exceeds the set value
* Delete the smallest case of snapid in the table
*/
void SnapshotNameSpace::CleanSnapshot(uint64 curr_min_snapid)
{
DeleteStatTableDate(curr_min_snapid);
int numViews = COUNT_ARRAY_SIZE(sharedViews);
SnapshotNameSpace::DeleteTablesData(sharedViews, numViews, curr_min_snapid);
numViews = COUNT_ARRAY_SIZE(dbRelatedViews);
SnapshotNameSpace::DeleteTablesData(dbRelatedViews, numViews, curr_min_snapid);
numViews = COUNT_ARRAY_SIZE(lastDbRelatedViews);
SnapshotNameSpace::DeleteTablesData(lastDbRelatedViews, numViews, curr_min_snapid);
numViews = COUNT_ARRAY_SIZE(lastStatViews);
SnapshotNameSpace::DeleteTablesData(lastStatViews, numViews, curr_min_snapid);
ereport(LOG, (errmsg("delete snapshot where snapshot_id = " UINT64_FORMAT, curr_min_snapid)));
}
* snapshot_start
*
* Called from postmaster at startup or after an existing collector
* died. Attempt to fire up a fresh statistics collector.
*
* Returns PID of child process, or 0 if fail.
*
* Note: if fail, we will be called again from the postmaster main loop.
*/
ThreadId snapshot_start(void)
{
time_t curtime;
* Do nothing if too soon since last collector start. This is a safety
* valve to protect against continuous respawn attempts if the collector
* is dying immediately at launch. Note that since we will be re-called
* from the postmaster main loop, we will get another chance later.
*/
curtime = time(NULL);
if ((unsigned int)(curtime - t_thrd.perf_snap_cxt.last_snapshot_start_time) < (unsigned int)PGSTAT_RESTART_INTERVAL) {
return 0;
}
t_thrd.perf_snap_cxt.last_snapshot_start_time = curtime;
return initialize_util_thread(SNAPSHOT_WORKER);
}
void JobSnapshotIAm(void)
{
t_thrd.role = SNAPSHOT_WORKER;
}
* called in initpostgres() function
*/
bool IsJobSnapshotProcess(void)
{
return t_thrd.role == SNAPSHOT_WORKER;
}
static void CreateStatTable(const char* query, const char* tablename)
{
Datum colval;
bool isnull = false;
StringInfoData sql;
initStringInfo(&sql);
appendStringInfo(
&sql, "select pg_catalog.count(*) from pg_tables where tablename = '%s' and schemaname = 'snapshot'",
tablename);
colval = GetDatumValue(sql.data, 0, 0, &isnull);
if (!DatumGetInt32(colval)) {
if (!SnapshotNameSpace::ExecuteQuery(query, SPI_OK_UTILITY)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("can not create snapshot stat table"),
errdetail("query(%s) execute failed", sql.data),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
}
pfree_ext(sql.data);
}
void SnapshotNameSpace::CreateSnapStatTables(void)
{
StringInfoData createTs;
StringInfoData createSnapshot;
const char* tablename1 = "tables_snap_timestamp";
const char* tablename2 = "snapshot";
initStringInfo(&createTs);
appendStringInfo(
&createTs, "create table snapshot.tables_snap_timestamp(snapshot_id bigint not null, db_name text, "
"tablename text, start_ts timestamp with time zone, end_ts timestamp with time zone)");
initStringInfo(&createSnapshot);
appendStringInfo(
&createSnapshot, "create table snapshot.snapshot(snapshot_id bigint not null, "
"start_ts timestamp with time zone, end_ts timestamp with time zone, primary key (snapshot_id))");
if (ENABLE_DSS) {
appendStringInfo(&createTs, " with (segment = on)");
appendStringInfo(&createSnapshot, " with (segment = on)");
}
CreateStatTable(createTs.data, tablename1);
CreateStatTable(createSnapshot.data, tablename2);
}
static void DropIndexes(const char* indexName)
{
StringInfoData query;
initStringInfo(&query);
appendStringInfo(&query, "drop index IF EXISTS snapshot.%s", indexName);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_UTILITY)) {
pfree_ext(query.data);
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("create index failed"), errdetail("drop index snapshot.%s execute error", indexName),
errcause("System error."), erraction("Check whether the query can be executed")));
}
pfree_ext(query.data);
}
void SnapshotNameSpace::InitTables()
{
SnapshotNameSpace::CreateSnapStatTables();
SnapshotNameSpace::CreateSequence();
int numViews = COUNT_ARRAY_SIZE(sharedViews);
SnapshotNameSpace::CreateTable(sharedViews, numViews, true);
numViews = COUNT_ARRAY_SIZE(dbRelatedViews);
SnapshotNameSpace::CreateTable(dbRelatedViews, numViews, false);
numViews = COUNT_ARRAY_SIZE(lastDbRelatedViews);
SnapshotNameSpace::CreateTable(lastDbRelatedViews, numViews, false);
numViews = COUNT_ARRAY_SIZE(lastStatViews);
SnapshotNameSpace::CreateTable(lastStatViews, numViews, true);
DropIndexes("snap_summary_statio_indexes_name");
DropIndexes("snap_summary_statio_tables_name");
DropIndexes("snap_summary_stat_indexes_name");
DropIndexes("snap_class_info_name");
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
SnapshotView();
#endif
}
void SnapshotNameSpace::CreateTable(const char** views, int numViews, bool isSharedViews)
{
bool isnull = false;
StringInfoData query;
initStringInfo(&query);
for (int i = 0; i < numViews; i++) {
resetStringInfo(&query);
appendStringInfo(&query,
"select pg_catalog.count(*) from pg_tables where tablename = 'snap_%s' "
"and schemaname = 'snapshot'",
views[i]);
Datum colval = GetDatumValue(query.data, 0, 0, &isnull);
if (!DatumGetInt32(colval)) {
resetStringInfo(&query);
char* snapColAttrType = SnapshotNameSpace::GetTableColAttr(views[i], false, true);
if (isSharedViews) {
appendStringInfo(
&query, "create table snapshot.snap_%s(snapshot_id bigint, %s)", views[i], snapColAttrType);
} else {
appendStringInfo(&query,
"create table snapshot.snap_%s(snapshot_id bigint, db_name text, %s)",
views[i],
snapColAttrType);
}
if (ENABLE_DSS) {
appendStringInfo(&query, " with (segment = on)");
}
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_UTILITY)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("create WDR snapshot data table failed"),
errdetail("query(%s) execute error", query.data),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
pfree(snapColAttrType);
}
SnapshotNameSpace::CreateIndexes(views[i]);
}
pfree_ext(query.data);
}
char* SnapshotNameSpace::GetTableColAttr(const char* viewName, bool onlyViewCol, bool addType)
{
List* stringtables = NIL;
StringInfoData sql;
initStringInfo(&sql);
StringInfoData colType;
initStringInfo(&colType);
appendStringInfo(&sql,
"SELECT a.attname AS field,t.typname AS type FROM "
"pg_class c,pg_attribute a,pg_type t, pg_namespace n"
" WHERE n.nspname = 'dbe_perf' and c.relnamespace = n.oid and c.relname = '%s' "
"and a.attnum > 0 and a.attrelid = c.oid and a.atttypid = t.oid ORDER BY a.attnum;",
viewName);
SnapshotNameSpace::GetQueryData(sql.data, false, &stringtables);
foreach_cell(cell, stringtables)
{
ListCell* row = ((List*)lfirst(cell))->head;
if (onlyViewCol) {
appendStringInfo(&colType, "\"%s\" ", (char*)lfirst(row));
} else {
appendStringInfo(&colType, "\"snap_%s\" ", (char*)lfirst(row));
}
if (addType) {
appendStringInfo(&colType, "%s", (char*)lfirst(row->next));
}
if (cell->next != NULL)
appendStringInfo(&colType, ", ");
}
pfree_ext(sql.data);
DeepListFree(stringtables, true);
return colType.data;
}
void SnapshotNameSpace::GetQueryStr(StringInfoData& query, const char* viewname, uint64 curr_snapid, const char* dbname)
{
char* snapColAttr = SnapshotNameSpace::GetTableColAttr(viewname, false, false);
char* snapColAttrType = SnapshotNameSpace::GetTableColAttr(viewname, false, true);
appendStringInfo(&query,
"insert into snapshot.snap_%s(snapshot_id, db_name, %s) select snapshot_id, dbname1, %s from"
" pg_catalog.wdr_xdb_query('dbname=%s'::text, 'select %lu, ''%s'', t.* from dbe_perf.%s t'::text)"
" as i(snapshot_id int8, dbname1 text, %s)",
viewname,
snapColAttr,
snapColAttr,
dbname,
curr_snapid,
dbname,
viewname,
snapColAttrType);
pfree_ext(snapColAttr);
pfree(snapColAttrType);
}
void SnapshotNameSpace::GetDatabaseList(List** databaselist)
{
StringInfoData query;
initStringInfo(&query);
appendStringInfo(&query, "select datname from pg_database where datistemplate = 'f'");
SnapshotNameSpace::GetQueryData(query.data, false, databaselist);
pfree_ext(query.data);
}
* InsertOneDbTables
* for exemple pg_stat_xxx and pg_statio_xxx tables relation with pg_class
*/
void SnapshotNameSpace::InsertOneDbTables(const char** views, int numViews, uint64 curr_snapid)
{
List* databaseList = NIL;
SnapshotNameSpace::GetDatabaseList(&databaseList);
foreach_cell(cell, databaseList)
{
List* row = (List*)lfirst(cell);
SnapshotNameSpace::InsertDatabaseData((char*)linitial(row), curr_snapid, views, numViews);
}
DeepListFree(databaseList, true);
}
* SpiltString
* @str :string will be split
* @delim :Splitter
* @res :splited strings
*/
void SnapshotNameSpace::SplitString(const char* str, const char* delim, List** res)
{
errno_t rc;
char* next_token = NULL;
char* token = NULL;
if (str == NULL) {
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("spilt str can not null")));
}
char* strs = (char*)palloc(strlen(str) + 1);
rc = strncpy_s(strs, strlen(str) + 1, str, strlen(str));
securec_check_c(rc, "\0", "\0");
token = strtok_s(strs, delim, &next_token);
while (token != NULL) {
size_t strSize = strlen(token) + 1;
char* subStr = (char*)palloc(strSize * sizeof(char));
rc = strncpy_s(subStr, strSize, token, strSize - 1);
securec_check_c(rc, "\0", "\0");
*res = lappend(*res, subStr);
token = strtok_s(NULL, delim, &next_token);
}
pfree(strs);
}
* GetDatabaseData
* get other database data for snapshot
*/
void SnapshotNameSpace::InsertDatabaseData(const char* dbname, uint64 curr_snapid, const char** views, int numViews)
{
StringInfoData query;
initStringInfo(&query);
StringInfoData sql;
initStringInfo(&sql);
for (int i = 0; i < numViews; i++) {
SnapshotNameSpace::GetQueryStr(query, views[i], curr_snapid, dbname);
resetStringInfo(&sql);
CHECK_FOR_INTERRUPTS();
appendStringInfo(&sql, "INSERT INTO snapshot.tables_snap_timestamp"
"(snapshot_id, db_name, tablename, start_ts) "
"values(%lu, '%s', 'snap_%s', '%s')",
curr_snapid, dbname, views[i], GetCurrentTimeStampString());
if (!SnapshotNameSpace::ExecuteQuery(sql.data, SPI_OK_INSERT)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("insert into tables_snap_timestamp start time stamp failed"),
errdetail("query(%s) execute failed", sql.data), errcause("System error."),
erraction("Check whether the query can be executed")));
}
CHECK_FOR_INTERRUPTS();
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_INSERT)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("insert into snap_%s is failed", views[i]),
errdetail("query(%s) execute failed", query.data), errcause("System error."),
erraction("Check whether the query can be executed")));
}
CHECK_FOR_INTERRUPTS();
resetStringInfo(&sql);
appendStringInfo(&sql,
"update snapshot.tables_snap_timestamp set end_ts = '%s' "
"where snapshot_id = %lu and db_name = '%s' and tablename = 'snap_%s'",
GetCurrentTimeStampString(), curr_snapid, dbname, views[i]);
if (!SnapshotNameSpace::ExecuteQuery(sql.data, SPI_OK_UPDATE)) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("update tables_snap_timestamp end time stamp failed"),
errdetail("query(%s) execute failed", sql.data),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
resetStringInfo(&query);
}
pfree_ext(query.data);
pfree_ext(sql.data);
}
In order to accelerate query for awr report, the index of some tables need to create
The index is created immediately after whose table has existed at the start phase
*/
void SnapshotNameSpace::CreateIndexes(const char* views)
{
bool isnull = false;
StringInfoData query;
initStringInfo(&query);
appendStringInfo(&query,
"select pg_catalog.count(*) from pg_indexes where schemaname = 'snapshot' and "
"tablename = 'snap_%s' and indexname = 'snap_%s_idx'",
views, views);
Datum indexNum = GetDatumValue(query.data, 0, 0, &isnull);
if (!DatumGetInt32(indexNum)) {
resetStringInfo(&query);
appendStringInfo(&query, "create index snapshot.snap_%s_idx on snapshot.snap_%s(snapshot_id)",
views, views);
if (!SnapshotNameSpace::ExecuteQuery(query.data, SPI_OK_UTILITY)) {
pfree_ext(query.data);
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_DATA_EXCEPTION),
errmsg("create WDR snapshot index failed"),
errdetail("create index snapshot.snap_%s_idx execute error", views),
errcause("System error."),
erraction("Check whether the query can be executed")));
}
}
pfree_ext(query.data);
}
void SnapshotNameSpace::SleepCheckInterrupt(uint32 minutes)
{
const int ONE_SECOND = 1000000;
const uint32 ONE_MINUTE = 60;
uint32 waitCycle = ONE_MINUTE * minutes;
for (uint32 i = 0; i < waitCycle; i++) {
if (t_thrd.perf_snap_cxt.need_exit) {
break;
}
pg_usleep(ONE_SECOND);
}
}
static void SleepToNextTS(TimestampTz nextTimeStamp)
{
const int ONE_SECOND = 1000000;
while (GetCurrentTimestamp() < nextTimeStamp) {
if (t_thrd.perf_snap_cxt.need_exit ||
t_thrd.perf_snap_cxt.request_snapshot ||
t_thrd.perf_snap_cxt.got_SIGHUP) {
break;
}
pg_usleep(ONE_SECOND);
}
}
static TimestampTz GetNextSnapshotTS(TimestampTz lastSnapTS)
{
TimestampTz nextSnapTS = lastSnapTS;
while (GetCurrentTimestamp() >= nextSnapTS) {
nextSnapTS += u_sess->attr.attr_common.wdr_snapshot_interval * USECS_PER_MINUTE;
}
return nextSnapTS;
}
static void ProcessSignal(void)
{
* Ignore all signals usually bound to some action in the postmaster,
* except SIGHUP, SIGTERM and SIGQUIT.
*/
(void)gspqsignal(SIGHUP, instr_snapshot_sighup_handler);
(void)gspqsignal(SIGINT, request_snapshot);
(void)gspqsignal(SIGTERM, instr_snapshot_exit);
(void)gspqsignal(SIGQUIT, quickdie);
(void)gspqsignal(SIGUSR1, procsignal_sigusr1_handler);
(void)gspqsignal(SIGALRM, SIG_IGN);
(void)gspqsignal(SIGPIPE, SIG_IGN);
(void)gspqsignal(SIGUSR2, SIG_IGN);
(void)gspqsignal(SIGCHLD, SIG_DFL);
(void)gspqsignal(SIGTTIN, SIG_DFL);
(void)gspqsignal(SIGTTOU, SIG_DFL);
(void)gspqsignal(SIGCONT, SIG_DFL);
(void)gspqsignal(SIGWINCH, SIG_DFL);
(void)gspqsignal(SIGURG, print_stack);
gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL);
(void)gs_signal_unblock_sigusr2();
if (u_sess->proc_cxt.MyProcPort->remote_host) {
pfree(u_sess->proc_cxt.MyProcPort->remote_host);
}
u_sess->proc_cxt.MyProcPort->remote_host = pstrdup("localhost");
t_thrd.wlm_cxt.thread_node_group = &g_instance.wlm_cxt->MyDefaultNodeGroup;
t_thrd.wlm_cxt.thread_climgr = &t_thrd.wlm_cxt.thread_node_group->climgr;
t_thrd.wlm_cxt.thread_srvmgr = &t_thrd.wlm_cxt.thread_node_group->srvmgr;
}
* The port info is initial status in snapshot thread when select
* the config_setting view. The getsockopt is performed when the following
* parameters are 0. The sock is -1 which make getsockopt failed and
* unexpected log is printed. In order to avoid this, default values are set to -1.
*/
void SnapshotNameSpace::ResetPortDefaultKeepalivesValue(void)
{
Port* port = u_sess->proc_cxt.MyProcPort;
if (port->default_keepalives_idle == 0) {
port->default_keepalives_idle = -1;
}
if (port->default_keepalives_interval == 0) {
port->default_keepalives_interval = -1;
}
if (port->default_keepalives_count == 0) {
port->default_keepalives_count = -1;
}
}
void SnapshotNameSpace::SetThrdCxt(void)
{
t_thrd.mem_cxt.msg_mem_cxt = AllocSetContextCreate(TopMemoryContext,
"MessageContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
t_thrd.mem_cxt.mask_password_mem_cxt = AllocSetContextCreate(TopMemoryContext,
"MaskPasswordCtx",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
t_thrd.perf_snap_cxt.PerfSnapMemCxt = AllocSetContextCreate(TopMemoryContext,
"SnapshotContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
* Create a resource owner to keep track of our resources (currently only
* buffer pins).
*/
t_thrd.utils_cxt.CurrentResourceOwner = ResourceOwnerCreate(NULL, "Snapshot",
THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DFX));
}
static void SetMyproc()
{
t_thrd.proc_cxt.MyProcPid = gs_thread_self();
t_thrd.proc_cxt.MyStartTime = time(NULL);
t_thrd.proc_cxt.MyProgName = "WDRSnapshot";
u_sess->attr.attr_common.application_name = pstrdup("WDRSnapshot");
}
* This is the main function of the background thread
* A node set to ccn by cm will call the SubSnapshotMain
* function. Loop collection of snapshot information
*/
NON_EXEC_STATIC void SnapshotMain()
{
ereport(LOG, (errmsg("snapshot thread is started")));
pg_atomic_add_fetch_u32(&g_instance.stat_cxt.snapshot_thread_counter, 1);
const int INTERVAL = 20000;
char username[NAMEDATALEN] = {'\0'};
IsUnderPostmaster = true;
JobSnapshotIAm();
SetMyproc();
if (IS_PGXC_COORDINATOR && IsPostmasterEnvironment) {
on_shmem_exit(PGXCNodeCleanAndRelease, 0);
}
ProcessSignal();
BaseInit();
#ifndef EXEC_BACKEND
InitProcess();
#endif
t_thrd.proc_cxt.PostInit->SetDatabaseAndUser((char*)pstrdup(DEFAULT_DATABASE), InvalidOid, username);
t_thrd.proc_cxt.PostInit->InitSnapshotWorker();
SetProcessingMode(NormalProcessing);
init_ps_display("WDR snapshot process", "", "", "");
SnapshotNameSpace::SetThrdCxt();
u_sess->proc_cxt.MyProcPort->SessionStartTime = GetCurrentTimestamp();
SnapshotNameSpace::ResetPortDefaultKeepalivesValue();
Reset_Pseudo_CurrentUserId();
exec_init_poolhandles();
InitVecFuncMap();
#ifndef ENABLE_MULTIPLE_NODES
AutoDopControl dopControl;
dopControl.CloseSmp();
#endif
pgstat_bestart();
pgstat_report_appname("WDRSnapshot");
pgstat_report_activity(STATE_IDLE, NULL);
while (!t_thrd.perf_snap_cxt.need_exit) {
ReloadInfo();
* First sleep and then judge CCN, mainly considering that
* the wlm thread has not completed initialization
* when the snapshot thread starts at startup.
*/
pg_usleep(INTERVAL);
if ((PgxcIsCentralCoordinator(g_instance.attr.attr_common.PGXCNodeName) || IS_SINGLE_NODE) &&
u_sess->attr.attr_common.enable_wdr_snapshot && !SS_IN_REFORM) {
check_snapshot_thd_exit();
SnapshotNameSpace::SubSnapshotMain();
}
pgstat_report_activity(STATE_IDLE, NULL);
}
gs_thread_exit(0);
}
* To avoid generate below message,
* "Do analyze for them in order to generate optimized plan",
* request analyze from snapshot thread.
*/
static void analyze_snap_table()
{
List* analyzeTableList = NULL;
int rc = 0;
start_xact_command();
PushActiveSnapshot(GetTransactionSnapshot());
SPI_STACK_LOG("connect", NULL, NULL);
if ((rc = SPI_connect()) != SPI_OK_CONNECT) {
ereport(ERROR, (errmodule(MOD_WDR_SNAPSHOT), errcode(ERRCODE_INTERNAL_ERROR),
errmsg("analyze table, connection failed: %s", SPI_result_code_string(rc)),
errdetail("SPI_connect failed in function analyze_snap_table"),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
SnapshotNameSpace::GetAnalyzeList(&(analyzeTableList));
set_lock_timeout();
SnapshotNameSpace::AnalyzeTable(analyzeTableList);
SPI_STACK_LOG("finish", NULL, NULL);
SPI_finish();
PopActiveSnapshot();
finish_xact_command();
}
void InitSnapshot()
{
t_thrd.perf_snap_cxt.is_mem_protect = CheckMemProtectInit();
start_xact_command();
PushActiveSnapshot(GetTransactionSnapshot());
int rc = 0;
SPI_STACK_LOG("connect", NULL, NULL);
if ((rc = SPI_connect()) != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("snapshot thread SPI_connect failed: %s", SPI_result_code_string(rc)),
errdetail("SPI_connect failed in function analyze_snap_table"),
errcause("System error."),
erraction("Check whether the snapshot retry is successful")));
}
set_lock_timeout();
SnapshotNameSpace::InitTables();
SPI_STACK_LOG("finish", NULL, NULL);
SPI_finish();
PopActiveSnapshot();
finish_xact_command();
ereport(LOG, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), errmsg("create snapshot tables succeed")));
analyze_snap_table();
}
* Background thread for loop collection snapshot
*/
void SnapshotNameSpace::SubSnapshotMain(void)
{
pgstat_report_activity(STATE_RUNNING, NULL);
TimestampTz last_auto_time = GetCurrentTimestamp();
TimestampTz next_auto_time = last_auto_time;
const int SLEEP_GAP_AFTER_ERROR = 1;
InitSnapshot();
u_sess->attr.attr_common.ExitOnAnyError = false;
while (!t_thrd.perf_snap_cxt.need_exit &&
(PgxcIsCentralCoordinator(g_instance.attr.attr_common.PGXCNodeName) || IS_SINGLE_NODE) &&
u_sess->attr.attr_common.enable_wdr_snapshot) {
2. to avoid insert failed, disable snapshot during upgrade */
check_snapshot_thd_exit();
ReloadInfo();
PG_TRY();
{
* be careful not mess the auto interval snapshot-creating schedule */
if (t_thrd.perf_snap_cxt.request_snapshot || GetCurrentTimestamp() >= next_auto_time) {
last_auto_time = t_thrd.perf_snap_cxt.request_snapshot ? last_auto_time : GetCurrentTimestamp();
pgstat_report_activity(STATE_RUNNING, NULL);
ereport(LOG, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), errmsg("WDR snapshot start")));
start_xact_command();
PushActiveSnapshot(GetTransactionSnapshot());
SnapshotNameSpace::take_snapshot();
PopActiveSnapshot();
finish_xact_command();
if (OidIsValid(u_sess->proc_cxt.MyDatabaseId)) {
pgstat_report_stat(true);
}
ereport(LOG, (errcode(ERRCODE_SUCCESSFUL_COMPLETION), errmsg("WDR snapshot end")));
t_thrd.perf_snap_cxt.request_snapshot = false;
pgstat_report_activity(STATE_IDLE, NULL);
}
next_auto_time = GetNextSnapshotTS(last_auto_time);
SleepToNextTS(next_auto_time);
}
PG_CATCH();
{
EmitErrorReport();
FlushErrorState();
ereport(WARNING, (errcode(ERRCODE_WARNING), errmsg("WDR snapshot failed")));
PopActiveSnapshot();
AbortCurrentTransaction();
t_thrd.postgres_cxt.xact_started = false;
pgstat_report_activity(STATE_IDLE, NULL);
t_thrd.perf_snap_cxt.request_snapshot = false;
SnapshotNameSpace::SleepCheckInterrupt(SLEEP_GAP_AFTER_ERROR);
}
PG_END_TRY();
}
}