/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2021, openGauss Contributors
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* pg_job.cpp
*
* IDENTIFICATION
* src/common/backend/catalog/catalog/pg_job.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <limits.h>
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "commands/alter.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "commands/extension.h"
#include "commands/schemacmds.h"
#include "executor/spi.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgxc/pgxc.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/dbe_scheduler.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/snapmgr.h"
#include "access/heapam.h"
#include "access/tableam.h"
#include "catalog/pg_job.h"
#include "catalog/pg_job_proc.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "catalog/gs_job_attribute.h"
#include "fmgr.h"
#include "utils/syscache.h"
#include "utils/timestamp.h"
#include "pgxc/execRemote.h"
/* Mask for the job id part (low 16 bits) of a mixed node/job id. */
#define JOBID_MASK 0xffff
/* Mask for the node id part (48 bits) of a mixed node/job id. */
#define NODEID_MASK 0xffffffffffff
/* Pack node_id (masked to 48 bits) above job_id (masked to 16 bits) into one int64. */
#define GENERATE_MIX_ID(node_id, job_id) ((int64)((((node_id)&NODEID_MASK) << 16) | ((uint4)(job_id)&JOBID_MASK)))
/* Render a boolean as the string "true" / "false". */
#define booltostr(x) ((x) ? "true" : "false")
/* After this many consecutive failures a non-scheduler job is set to PGJOB_ABORT_STATUS (see update_pg_job_info). */
#define JOB_MAX_FAIL_COUNT 16
/* Job id 0 is never a valid job id (see check_job_id). */
#define InvalidJobId 0
/* Defined elsewhere; presumably forward a job command to the other nodes — NOTE(review): confirm. */
extern void syn_command_to_other_node(int4 job_id, Pgjob_Command_Type command, Datum what = 0, Datum next_date = 0,
Datum job_interval = 0, bool broken = 0);
extern void syn_command_to_other_node_internal(Datum node_name, Datum database, int4 job_id, Pgjob_Command_Type command,
Datum what, Datum next_date, Datum job_interval, bool broken);
/* Forward declarations of file-local helpers defined further down in this file. */
static void elog_job_detail(int4 job_id, char* what, Update_Pgjob_Status status, char* errmsg);
static char* query_with_update_job(int4 job_id, Datum job_status, int64 pid, Datum last_start_date, Datum last_end_date,
Datum last_suc_date, Datum this_run_date, Datum next_run_date, int2 failure_count, Datum node_name, Datum fail_msg);
static bool is_internal_perf_job(int64 job_id);
static bool is_job_aborted(Datum job_status);
static HeapTuple get_job_tup_from_rel(Relation job_rel, int job_id);
static char* get_job_what(int4 job_id, bool throw_not_found_error = true);
* @brief is_scheduler_job_id
* Check if a job is scheduler job by searching its job name.
*/
bool is_scheduler_job_id(Relation relation, int64 job_id)
{
bool is_regular_job = true;
HeapTuple tuple = NULL;
tuple = get_job_tup_from_rel(relation, job_id);
if (!HeapTupleIsValid(tuple)) {
return false;
}
(void)heap_getattr(tuple, Anum_pg_job_job_name, relation->rd_att, &is_regular_job);
return !is_regular_job;
}
* @Description: Insert a new record to pg_job_proc.
* @in job_id - Job id.
* @in what - Job task.
* @returns - void
*/
static void insert_pg_job_proc(int4 job_id, Datum what)
{
Datum values[Natts_pg_job_proc];
bool nulls[Natts_pg_job_proc];
HeapTuple tuple = NULL;
Relation rel = NULL;
errno_t rc = 0;
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
nulls[Anum_pg_job_proc_job_name - 1] = true;
rel = heap_open(PgJobProcRelationId, RowExclusiveLock);
values[Anum_pg_job_proc_job_id - 1] = ObjectIdGetDatum(job_id);
values[Anum_pg_job_proc_what - 1] = what;
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
(void)simple_heap_insert(rel, tuple);
CatalogUpdateIndexes(rel, tuple);
heap_freetuple_ext(tuple);
heap_close(rel, RowExclusiveLock);
}
* Encapsulating a function with the job interface will cause the function's schema be
* inserted into the pg_job system table.
* In order to avoid the situation that the job function in the dbe_task schema can only
* call functions or procedures under the dbe_task schema, convert the dbe_task to public.
*/
char* get_real_search_schema()
{
char *cur_schema = get_namespace_name(SchemaNameGetSchemaOid(NULL));
if (strcmp(cur_schema, "dbe_task") == 0) {
pfree_ext(cur_schema);
cur_schema = "public";
}
return cur_schema;
}
* Description: Insert a new record to pg_job.
*
* Parameters:
* @in rel: pg_job relation.
* @in job_id: Job id.
* @in next_date: Next execute time.
* @in job_interval: Time interval.
* @in node_id: If local cn received isubmit from original cn, the node_id identify original cn.
* Returns: void
*/
static void insert_pg_job(Relation rel, int job_id, Datum next_date, Datum job_interval, int4 node_id,
Datum database_name, Datum node_name, char* job_nspname = NULL)
{
if (!(IsConnFromCoord() && IS_PGXC_DATANODE) && !IS_SINGLE_NODE && !IS_PGXC_COORDINATOR &&
!(isSingleMode && IS_PGXC_DATANODE)) {
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("Submit or Isubmit job only can be operate on coordinator.")));
}
HeapTuple tuple = NULL;
Datum values[Natts_pg_job];
bool nulls[Natts_pg_job];
errno_t rc = 0;
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
values[Anum_pg_job_job_id - 1] = Int64GetDatum(job_id);
values[Anum_pg_job_log_user - 1] = DirectFunctionCall1(namein, CStringGetDatum(GetUserNameFromId(GetUserId())));
values[Anum_pg_job_priv_user - 1] = DirectFunctionCall1(namein, CStringGetDatum(GetUserNameFromId(GetUserId())));
if (database_name == 0) {
values[Anum_pg_job_dbname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(get_and_check_db_name(u_sess->proc_cxt.MyDatabaseId, true)));
} else {
values[Anum_pg_job_dbname - 1] = database_name;
}
if (PointerIsValid(job_nspname)) {
values[Anum_pg_job_nspname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(job_nspname));
} else {
values[Anum_pg_job_nspname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(get_real_search_schema()));
}
if (node_name != 0) {
values[Anum_pg_job_node_name - 1] = node_name;
} else {
if (!IsConnFromCoord())
values[Anum_pg_job_node_name - 1] =
DirectFunctionCall1(namein, CStringGetDatum(g_instance.attr.attr_common.PGXCNodeName));
else if (IS_PGXC_COORDINATOR)
values[Anum_pg_job_node_name - 1] =
DirectFunctionCall1(namein, CStringGetDatum(get_pgxc_node_name_by_node_id(node_id)));
else {
values[Anum_pg_job_node_name - 1] =
DirectFunctionCall1(namein, CStringGetDatum(""));
}
}
values[Anum_pg_job_start_date - 1] = next_date;
values[Anum_pg_job_job_status - 1] = CharGetDatum(PGJOB_SUCC_STATUS);
values[Anum_pg_job_current_postgres_pid - 1] = Int64GetDatum(-1);
* Create a job for the first time, we should set
* last_start_date/last_end_date/last_suc_date/this_run_date as null .
*/
nulls[Anum_pg_job_last_start_date - 1] = true;
nulls[Anum_pg_job_last_end_date - 1] = true;
nulls[Anum_pg_job_last_suc_date - 1] = true;
nulls[Anum_pg_job_this_run_date - 1] = true;
values[Anum_pg_job_next_run_date - 1] = next_date;
values[Anum_pg_job_interval - 1] = job_interval;
values[Anum_pg_job_failure_count - 1] = Int16GetDatum(0);
* These entries are exclusive for dbms package extensions.
*/
nulls[Anum_pg_job_job_name - 1] = true;
nulls[Anum_pg_job_end_date - 1] = true;
nulls[Anum_pg_job_enable - 1] = true;
nulls[Anum_pg_job_failure_msg - 1] = PointerGetDatum(cstring_to_text(""));
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
(void)simple_heap_insert(rel, tuple);
CatalogUpdateIndexes(rel, tuple);
heap_freetuple_ext(tuple);
}
* Description: Get all values of job tuple from pg_job by job_id.
*
* Parameters:
* @in job_id: Job id.
* @in tup: HeapTuple for pg_job.
* @in relation: Relation for pg_job.
* @in values: Column values for pg_job.
* @in visnull: Identify the each value is null or not.
* Returns: HeapTuple
*/
void get_job_values(int4 job_id, HeapTuple tup, Relation relation, Datum* values, bool* visnull)
{
errno_t rc = 0;
rc = memset_s(values, sizeof(Datum) * Natts_pg_job, 0, sizeof(Datum) * Natts_pg_job);
securec_check(rc, "\0", "\0");
rc = memset_s(visnull, sizeof(bool) * Natts_pg_job, false, sizeof(bool) * Natts_pg_job);
securec_check_c(rc, "\0", "\0");
for (int i = 0; i < Natts_pg_job; i++) {
values[i] = heap_getattr(tup, i + 1, relation->rd_att, &visnull[i]);
}
}
* Description: Get job tuple from pg_job by job_id.
*
* Parameters:
* @in job_id: Job id.
* Returns: HeapTuple
*/
static HeapTuple get_job_tup(int job_id)
{
HeapTuple tup = NULL;
char* myrolename = NULL;
tup = SearchSysCache1(PGJOBID, Int64GetDatum(job_id));
if (!HeapTupleIsValid(tup)) {
if (IS_PGXC_DATANODE && IsConnFromCoord())
return NULL;
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("Can not find job id %d in system table pg_job.", job_id)));
}
Form_pg_job job = ((Form_pg_job)GETSTRUCT(tup));
myrolename = GetUserNameFromId(GetUserId());
if (!(superuser_arg(GetUserId()) || systemDBA_arg(GetUserId())) &&
0 != strcmp(NameStr(job->log_user), myrolename)) {
ReleaseSysCache(tup);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("Permission for current user to get this job. current_user: %s, job_user: %s, job_id: %ld",
myrolename,
NameStr(job->log_user),
job->job_id)));
}
return tup;
}
* Description: Delete job from pg_job_proc.
*
* Parameters:
* @in job_id: Job id.
* Returns: void
*/
static void delete_job_proc(int4 job_id)
{
Relation relation = NULL;
HeapTuple tup = NULL;
relation = heap_open(PgJobProcRelationId, RowExclusiveLock);
tup = SearchSysCache1(PGJOBPROCID, Int32GetDatum(job_id));
if (!HeapTupleIsValid(tup)) {
heap_close(relation, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("Can not find jobid %d in system table pg_job_proc.", job_id)));
}
simple_heap_delete(relation, &tup->t_self);
ReleaseSysCache(tup);
heap_close(relation, RowExclusiveLock);
}
* Description: Get job task info from pg_job_proc.
*
* Parameters:
* @in job_id: Job id.
* Returns: char*
*/
static char* get_job_what(int4 job_id, bool throw_not_found_error)
{
Relation job_proc_rel = NULL;
HeapTuple proc_tup = NULL;
char* what = NULL;
Datum what_datum;
bool isnull = false;
job_proc_rel = heap_open(PgJobProcRelationId, AccessShareLock);
proc_tup = SearchSysCache1(PGJOBPROCID, Int32GetDatum(job_id));
if (!HeapTupleIsValid(proc_tup)) {
heap_close(job_proc_rel, AccessShareLock);
if (throw_not_found_error) {
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("Can not find jobid %d in system table pg_job_proc.", job_id)));
} else {
return NULL;
}
}
what_datum = heap_getattr(proc_tup, Anum_pg_job_proc_what, job_proc_rel->rd_att, &isnull);
what = text_to_cstring(DatumGetTextP(what_datum));
ReleaseSysCache(proc_tup);
heap_close(job_proc_rel, AccessShareLock);
return what;
}
static void update_pg_job_on_remote(const char* update_query, int64 job_id, MemoryContext current_context)
{
PG_TRY();
{
ExecUtilityStmtOnNodes(update_query, NULL, false, false, EXEC_ON_COORDS, false);
}
PG_CATCH();
{
(void)MemoryContextSwitchTo(current_context);
ErrorData* edata = CopyErrorData();
FlushErrorState();
ereport(WARNING,
(errcode(ERRCODE_OPERATE_FAILED),
errmsg("Synchronize job info to other coordinator failed, job_id: %ld.", job_id),
errdetail("Synchronize fail reason: %s.", edata->message)));
}
PG_END_TRY();
}
/*
 * Rewrite the dbname column of the pg_job row for jobid.
 *
 * The row is fetched with get_job_tup(), which enforces ownership.
 * NOTE(review): get_job_tup() may return NULL on a datanode reached from a
 * coordinator; this function assumes a valid tuple.
 * The relation is closed with NoLock, so RowExclusiveLock is held until transaction end.
 */
void update_pg_job_dbname(Oid jobid, const char* dbname)
{
    Datum values[Natts_pg_job];
    bool nulls[Natts_pg_job];
    bool replaces[Natts_pg_job];

    errno_t rc = memset_s(values, sizeof(values), 0, sizeof(values));
    securec_check(rc, "\0", "\0");
    rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
    securec_check_c(rc, "\0", "\0");
    rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
    securec_check_c(rc, "\0", "\0");

    const int dbname_idx = Anum_pg_job_dbname - 1;
    replaces[dbname_idx] = true;
    values[dbname_idx] = DirectFunctionCall1(namein, CStringGetDatum(dbname));

    Relation job_rel = heap_open(PgJobRelationId, RowExclusiveLock);
    HeapTuple old_tup = get_job_tup(jobid);
    HeapTuple new_tup = heap_modify_tuple(old_tup, RelationGetDescr(job_rel), values, nulls, replaces);
    simple_heap_update(job_rel, &new_tup->t_self, new_tup);
    CatalogUpdateIndexes(job_rel, new_tup);

    ReleaseSysCache(old_tup);
    heap_freetuple_ext(new_tup);
    heap_close(job_rel, NoLock);
}
/*
 * Set log_user, priv_user and nspname of the pg_job row for jobid to username.
 *
 * NOTE(review): nspname is set to the user name as well, presumably because the
 * job's schema is expected to follow a same-named user schema; confirm with callers.
 * The row is fetched with get_job_tup(), which enforces ownership.
 */
void update_pg_job_username(Oid jobid, const char* username)
{
    Datum values[Natts_pg_job];
    bool nulls[Natts_pg_job];
    bool replaces[Natts_pg_job];

    errno_t rc = memset_s(values, sizeof(values), 0, sizeof(values));
    securec_check(rc, "\0", "\0");
    rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
    securec_check_c(rc, "\0", "\0");
    rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
    securec_check_c(rc, "\0", "\0");
    Assert(username != NULL);

    const int user_columns[] = {Anum_pg_job_log_user, Anum_pg_job_priv_user, Anum_pg_job_nspname};
    for (size_t i = 0; i < sizeof(user_columns) / sizeof(user_columns[0]); i++) {
        const int idx = user_columns[i] - 1;
        replaces[idx] = true;
        values[idx] = DirectFunctionCall1(namein, CStringGetDatum(username));
    }

    Relation job_rel = heap_open(PgJobRelationId, RowExclusiveLock);
    HeapTuple old_tup = get_job_tup(jobid);
    HeapTuple new_tup = heap_modify_tuple(old_tup, RelationGetDescr(job_rel), values, nulls, replaces);
    simple_heap_update(job_rel, &new_tup->t_self, new_tup);
    CatalogUpdateIndexes(job_rel, new_tup);

    ReleaseSysCache(old_tup);
    heap_freetuple_ext(new_tup);
    heap_close(job_rel, RowExclusiveLock);
}
* Description: Update job info to pg_job according to job execute status.
*
* Parameters:
* @in job_id: Job id.
* @in status: Job status.
* @in start_date: Job start time.
* @in next_date: Job next time.
* Returns: void
*/
static void update_pg_job_info(int job_id, Update_Pgjob_Status status, Datum start_date, Datum next_date,
const char* failure_msg, bool is_scheduler_job)
{
Relation relation = NULL;
HeapTuple tup = NULL;
HeapTuple newtuple = NULL;
Datum curtime;
Datum values[Natts_pg_job], old_value[Natts_pg_job];
bool nulls[Natts_pg_job], visnull[Natts_pg_job];
bool replaces[Natts_pg_job];
errno_t rc = 0;
int2 failure_count;
char* update_query = NULL;
bool is_job_abort = false;
bool is_perf_job = false;
MemoryContext current_context = CurrentMemoryContext;
ResourceOwner save = t_thrd.utils_cxt.CurrentResourceOwner;
if (Pgjob_Fail == status) {
* Abort transaction or subtransaction when execute job fail be
* ensure the state of transaction is ok.
*/
AbortOutOfAnyTransaction();
}
StartTransactionCommand();
is_perf_job = is_internal_perf_job(job_id);
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check_c(rc, "\0", "\0");
relation = heap_open(PgJobRelationId, RowExclusiveLock);
tup = get_job_tup(job_id);
get_job_values(job_id, tup, relation, old_value, visnull);
is_job_abort = is_job_aborted(old_value[Anum_pg_job_job_status - 1]);
curtime = DirectFunctionCall1(timestamptz_timestamp, GetCurrentTimestamp());
replaces[Anum_pg_job_job_status - 1] = !is_job_abort;
replaces[Anum_pg_job_current_postgres_pid - 1] = true;
failure_count = DatumGetInt16(old_value[Anum_pg_job_failure_count - 1]);
switch (status) {
case Pgjob_Run: {
replaces[Anum_pg_job_this_run_date - 1] = true;
values[Anum_pg_job_job_status - 1] = CharGetDatum(is_job_abort ? PGJOB_ABORT_STATUS : PGJOB_RUN_STATUS);
values[Anum_pg_job_current_postgres_pid - 1] = Int64GetDatum(t_thrd.proc_cxt.MyProcPid);
values[Anum_pg_job_this_run_date - 1] = start_date;
if (visnull[Anum_pg_job_this_run_date - 1]) {
replaces[Anum_pg_job_start_date - 1] = true;
values[Anum_pg_job_start_date - 1] = start_date;
}
update_query = query_with_update_job(job_id,
values[Anum_pg_job_job_status - 1],
values[Anum_pg_job_current_postgres_pid - 1],
visnull[Anum_pg_job_last_start_date - 1] ? 0 : old_value[Anum_pg_job_last_start_date - 1],
visnull[Anum_pg_job_last_end_date - 1] ? 0 : old_value[Anum_pg_job_last_end_date - 1],
visnull[Anum_pg_job_last_suc_date - 1] ? 0 : old_value[Anum_pg_job_last_suc_date - 1],
values[Anum_pg_job_this_run_date - 1],
old_value[Anum_pg_job_next_run_date - 1],
failure_count,
values[Anum_pg_job_node_name - 1],
visnull[Anum_pg_job_failure_msg - 1] ? 0 : old_value[Anum_pg_job_failure_msg - 1]);
break;
}
case Pgjob_Succ: {
replaces[Anum_pg_job_last_start_date - 1] = true;
replaces[Anum_pg_job_last_end_date - 1] = true;
replaces[Anum_pg_job_last_suc_date - 1] = true;
replaces[Anum_pg_job_failure_count - 1] = true;
replaces[Anum_pg_job_next_run_date - 1] = true;
replaces[Anum_pg_job_failure_msg - 1] = true;
values[Anum_pg_job_job_status - 1] = CharGetDatum(is_job_abort ? PGJOB_ABORT_STATUS : PGJOB_SUCC_STATUS);
values[Anum_pg_job_current_postgres_pid - 1] = Int64GetDatum(-1);
values[Anum_pg_job_last_start_date - 1] = start_date;
values[Anum_pg_job_last_end_date - 1] = curtime;
values[Anum_pg_job_last_suc_date - 1] = start_date;
failure_count = 0;
values[Anum_pg_job_failure_count - 1] = Int16GetDatum(failure_count);
if (next_date == 0) {
values[Anum_pg_job_job_status - 1] = is_scheduler_job ? values[Anum_pg_job_job_status - 1] : \
CharGetDatum(PGJOB_ABORT_STATUS);
values[Anum_pg_job_next_run_date - 1] = DirectFunctionCall2(to_timestamp,
DirectFunctionCall1(textin, CStringGetDatum("4000-1-1")),
DirectFunctionCall1(textin, CStringGetDatum("yyyy-mm-dd")));
} else {
values[Anum_pg_job_next_run_date - 1] = next_date;
}
values[Anum_pg_job_failure_msg - 1] = PointerGetDatum(cstring_to_text(""));
update_query = query_with_update_job(job_id,
values[Anum_pg_job_job_status - 1],
values[Anum_pg_job_current_postgres_pid - 1],
values[Anum_pg_job_last_start_date - 1],
values[Anum_pg_job_last_end_date - 1],
values[Anum_pg_job_last_suc_date - 1],
visnull[Anum_pg_job_this_run_date - 1] ? 0 : old_value[Anum_pg_job_this_run_date - 1],
values[Anum_pg_job_next_run_date - 1],
failure_count,
values[Anum_pg_job_node_name - 1],
values[Anum_pg_job_failure_msg - 1]);
break;
}
case Pgjob_Fail: {
replaces[Anum_pg_job_last_start_date - 1] = true;
replaces[Anum_pg_job_last_end_date - 1] = true;
replaces[Anum_pg_job_failure_count - 1] = true;
replaces[Anum_pg_job_next_run_date - 1] = true;
replaces[Anum_pg_job_failure_msg - 1] = true;
values[Anum_pg_job_job_status - 1] = CharGetDatum(is_job_abort ? PGJOB_ABORT_STATUS : PGJOB_FAIL_STATUS);
values[Anum_pg_job_current_postgres_pid - 1] = Int64GetDatum(-1);
values[Anum_pg_job_last_start_date - 1] = start_date;
values[Anum_pg_job_last_end_date - 1] = curtime;
failure_count = failure_count + 1;
values[Anum_pg_job_failure_count - 1] = Int16GetDatum(failure_count);
* Set job_status as 'd' if failure_count more or equal to 16 or
* execute the job once if interval is 'null'.
* Then set next_run_date as default date "4000-1-1".
*
* Scheduler jobs is an exception, since scheduler has proprietary enable flag which does
* not rely on PGJOB_ABORT_STATUS.
*/
if (next_date == 0) {
values[Anum_pg_job_job_status - 1] = is_scheduler_job ? values[Anum_pg_job_job_status - 1] : \
CharGetDatum(PGJOB_ABORT_STATUS);
values[Anum_pg_job_next_run_date - 1] = DirectFunctionCall2(to_timestamp,
DirectFunctionCall1(textin, CStringGetDatum("4000-1-1")),
DirectFunctionCall1(textin, CStringGetDatum("yyyy-mm-dd")));
} else if (failure_count >= JOB_MAX_FAIL_COUNT) {
ereport(WARNING,
(errcode(ERRCODE_OPERATE_FAILED),
errmsg("job with id % is abnormal, fail exceeds %d times", job_id, JOB_MAX_FAIL_COUNT)));
values[Anum_pg_job_job_status - 1] = is_scheduler_job ? values[Anum_pg_job_job_status - 1] : \
CharGetDatum(PGJOB_ABORT_STATUS);
values[Anum_pg_job_next_run_date - 1] = next_date;
} else {
values[Anum_pg_job_next_run_date - 1] = next_date;
}
values[Anum_pg_job_failure_msg - 1] = PointerGetDatum(cstring_to_text(failure_msg));
update_query = query_with_update_job(job_id,
values[Anum_pg_job_job_status - 1],
values[Anum_pg_job_current_postgres_pid - 1],
values[Anum_pg_job_last_start_date - 1],
values[Anum_pg_job_last_end_date - 1],
visnull[Anum_pg_job_last_suc_date - 1] ? 0 : old_value[Anum_pg_job_last_suc_date - 1],
visnull[Anum_pg_job_this_run_date - 1] ? 0 : old_value[Anum_pg_job_this_run_date - 1],
values[Anum_pg_job_next_run_date - 1],
failure_count,
values[Anum_pg_job_node_name - 1],
values[Anum_pg_job_failure_msg - 1]);
break;
}
default: {
ReleaseSysCache(tup);
heap_close(relation, is_perf_job ? NoLock : RowExclusiveLock);
AbortOutOfAnyTransaction();
ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), errmsg("Invalid job status, job_id: %d.", job_id)));
}
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
ReleaseSysCache(tup);
heap_freetuple_ext(newtuple);
if (IS_PGXC_COORDINATOR && update_query != NULL) {
Assert(!IS_SINGLE_NODE);
* If execute job in local success and only synchronize to other coordinator fail,
* we should consider the worker success. Because worker thread should finish this
* transaction to update job_status and failure_count local. Otherwise, the job will
* always restart again and couldn't finish.
*/
update_pg_job_on_remote(update_query, job_id, current_context);
pfree_ext(update_query);
}
heap_close(relation, is_perf_job ? NoLock : RowExclusiveLock);
CommitTransactionCommand();
(void)MemoryContextSwitchTo(current_context);
t_thrd.utils_cxt.CurrentResourceOwner = save;
}
/*
 * Raise ERROR unless job_id lies in the inclusive range [1, job_max_number].
 * InvalidJobId (0) and negative ids are always rejected.
 */
static void check_job_id(int64 job_id, int64 job_max_number = JOBID_MAX_NUMBER)
{
    const bool id_in_range = (job_id > InvalidJobId) && (job_id <= job_max_number);
    if (id_in_range) {
        return;
    }

    ereport(ERROR,
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
            errmsg("Invalid job_id: %ld", job_id),
            errdetail("The scope of jobid should between 1 and %ld", job_max_number)));
}
/*
 * Raise ERROR unless job_status is one of the four known pg_job status codes:
 * PGJOB_RUN_STATUS, PGJOB_ABORT_STATUS, PGJOB_FAIL_STATUS or PGJOB_SUCC_STATUS.
 */
void check_job_status(Datum job_status)
{
    const char status = DatumGetChar(job_status);
    switch (status) {
        case PGJOB_RUN_STATUS:
        case PGJOB_ABORT_STATUS:
        case PGJOB_FAIL_STATUS:
        case PGJOB_SUCC_STATUS:
            return;
        default:
            ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                    errmsg("Invalid job_status: \'%c\' ", status),
                    errdetail("The job status should be 'r','f','s','d'.")));
            break;
    }
}
* Description: Update pg_job synchronize from original coordinator.
*
* Parameters:
* @in PG_FUNCTION_ARGS: Arguments passed to function.
* Returns: void
*/
void syn_update_pg_job(PG_FUNCTION_ARGS)
{
int64 job_id = PG_GETARG_INT64(0);
Datum job_status = PG_GETARG_DATUM(1);
Datum pid = PG_GETARG_DATUM(2);
Datum last_start_date = PG_GETARG_DATUM(3);
Datum last_end_date = PG_GETARG_DATUM(4);
Datum last_suc_date = PG_GETARG_DATUM(5);
Datum this_run_date = PG_GETARG_DATUM(6);
Datum next_run_date = PG_GETARG_DATUM(7);
Datum failure_count = PG_GETARG_DATUM(8);
Datum failure_msg = PG_GETARG_DATUM(9);
Relation relation;
HeapTuple tup = NULL;
HeapTuple newtuple = NULL;
Datum values[Natts_pg_job], old_value[Natts_pg_job];
bool nulls[Natts_pg_job], visnull[Natts_pg_job];
bool replaces[Natts_pg_job];
errno_t rc = 0;
check_job_id(job_id);
check_job_status(job_status);
tup = get_job_tup(job_id);
if (!HeapTupleIsValid(tup)) {
return;
}
check_job_permission(tup);
relation = heap_open(PgJobRelationId, RowExclusiveLock);
if (is_scheduler_job_id(relation, job_id)) {
heap_close(relation, RowExclusiveLock);
ereport(ERROR, (errmodule(MOD_JOB), errcode(ERRCODE_INVALID_STATUS),
errmsg("Cannot modify job with jobid:%ld.", job_id),
errdetail("Cannot modify scheduler job with current method."), errcause("Forbidden operation."),
erraction("Please use scheduler interface to operate this action.")));
}
get_job_values(job_id, tup, relation, old_value, visnull);
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check_c(rc, "\0", "\0");
replaces[Anum_pg_job_job_status - 1] = true;
replaces[Anum_pg_job_current_postgres_pid - 1] = true;
replaces[Anum_pg_job_next_run_date - 1] = true;
replaces[Anum_pg_job_failure_count - 1] = true;
values[Anum_pg_job_job_status - 1] = job_status;
values[Anum_pg_job_current_postgres_pid - 1] = pid;
values[Anum_pg_job_next_run_date - 1] = next_run_date;
values[Anum_pg_job_failure_count - 1] = failure_count;
values[Anum_pg_job_failure_msg - 1] = failure_msg;
if (!PG_ARGISNULL(3)) {
replaces[Anum_pg_job_last_start_date - 1] = true;
values[Anum_pg_job_last_start_date - 1] = last_start_date;
}
if (!PG_ARGISNULL(4)) {
replaces[Anum_pg_job_last_end_date - 1] = true;
values[Anum_pg_job_last_end_date - 1] = last_end_date;
}
if (!PG_ARGISNULL(5)) {
replaces[Anum_pg_job_last_suc_date - 1] = true;
values[Anum_pg_job_last_suc_date - 1] = last_suc_date;
}
if (!PG_ARGISNULL(6)) {
replaces[Anum_pg_job_this_run_date - 1] = true;
values[Anum_pg_job_this_run_date - 1] = this_run_date;
if (visnull[Anum_pg_job_this_run_date - 1]) {
replaces[Anum_pg_job_start_date - 1] = true;
values[Anum_pg_job_start_date - 1] = this_run_date;
}
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
ReleaseSysCache(tup);
heap_freetuple_ext(newtuple);
heap_close(relation, RowExclusiveLock);
}
* Description: Get interval and next_date by call spi interface.
*
* Parameters:
* @in job_id: Job id.
* @in ischeck: We don't need get value if only check the interval is valid.
* @in job_interval: character of job_interval.
* @in start_date: Old start_date value.
* @out new_interval: New interval value compute by call spi interface.
* @out new_next_date: New next_date value compute by interval.
* @in current_context: Current memory context.
* Returns: void
*/
static void get_interval_nextdate_by_spi(int4 job_id, bool ischeck, const char* job_interval, Datum start_date,
Datum* new_next_date, MemoryContext current_context)
{
int ret, interval_len;
bool isnull = false;
char* exec_job_interval = NULL;
Datum current_date;
TimestampTz result;
struct timeval tp;
gettimeofday(&tp, NULL);
result = (TimestampTz)tp.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
#ifdef HAVE_INT64_TIMESTAMP
result = result * USECS_PER_SEC;
#endif
current_date = DirectFunctionCall1(timestamptz_timestamp, result);
interval_len = strlen(job_interval) + strlen("select ") + 1;
exec_job_interval = (char*)palloc(interval_len);
ret = snprintf_s(exec_job_interval, interval_len, interval_len - 1, "select %s", job_interval);
securec_check_ss_c(ret, "\0", "\0");
SPI_STACK_LOG("connect", NULL, NULL);
if (SPI_OK_CONNECT != SPI_connect()) {
ereport(ERROR,
(errcode(ERRCODE_SPI_CONNECTION_FAILURE),
errmsg("Unable to connect to execute internal query, job_id: %d.", job_id)));
}
ret = SPI_execute(exec_job_interval, true, 1);
if (ret < 0) {
ereport(ERROR,
(errcode(ERRCODE_SPI_EXECUTE_FAILURE),
errmsg("Call SPI_execute execute job interval fail, job_id: %d.", job_id)));
}
pfree_ext(exec_job_interval);
exec_job_interval = NULL;
if (!(SPI_tuptable && SPI_tuptable->tupdesc &&
(SPI_tuptable->tupdesc->attrs[0].atttypid == TIMESTAMPOID ||
SPI_tuptable->tupdesc->attrs[0].atttypid == INTERVALOID))) {
ereport(ERROR,
(errcode(ERRCODE_SPI_ERROR), errmsg("Execute job interval for get next_date error, job_id: %d.", job_id)));
}
if (!ischeck) {
if (INTERVALOID == SPI_tuptable->tupdesc->attrs[0].atttypid) {
Datum new_interval = heap_getattr(SPI_tuptable->vals[0], 1, SPI_tuptable->tupdesc, &isnull);
MemoryContext oldcontext = MemoryContextSwitchTo(current_context);
new_interval = datumCopy(
new_interval, SPI_tuptable->tupdesc->attrs[0].attbyval, SPI_tuptable->tupdesc->attrs[0].attlen);
*new_next_date = DirectFunctionCall2(timestamp_pl_interval, start_date, new_interval);
(void)MemoryContextSwitchTo(oldcontext);
} else {
*new_next_date = heap_getattr(SPI_tuptable->vals[0], 1, SPI_tuptable->tupdesc, &isnull);
}
} else {
if (TIMESTAMPOID == SPI_tuptable->tupdesc->attrs[0].atttypid) {
Datum check_next_date;
check_next_date = heap_getattr(SPI_tuptable->vals[0], 1, SPI_tuptable->tupdesc, &isnull);
if (DatumGetBool(DirectFunctionCall2(timestamp_lt, check_next_date, current_date))) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("Interval: %s must evaluate to a time in the future for job_id: %d.",
quote_literal_cstr(job_interval),
job_id)));
}
}
}
SPI_STACK_LOG("finish", NULL, NULL);
SPI_finish();
}
* @brief get_interval_nextdate
* Get next job run date base on interval string (can either be a INTERVAL type or a calendaring syntax)
*/
static Datum get_interval_nextdate(int4 job_id, bool ischeck, const char* interval, Datum start_date, Datum base_date,
MemoryContext current_context)
{
Datum new_next_date;
if (pg_strncasecmp(interval, "freq=", strlen("freq=")) != 0) {
get_interval_nextdate_by_spi(job_id, ischeck, interval, start_date, &new_next_date, current_context);
} else {
Datum start_date_tz = DirectFunctionCall1(timestamp_timestamptz, start_date);
Datum base_date_tz = DirectFunctionCall1(timestamp_timestamptz, base_date);
Datum next_date_tz = evaluate_repeat_interval(CStringGetTextDatum(interval), base_date_tz, start_date_tz);
new_next_date = DirectFunctionCall1(timestamptz_timestamp, next_date_tz);
}
return new_next_date;
}
/*
 * Description: Begin to execute the job.
*
* Parameters:
* @in job_id: Job id.
* Returns: void
*/
void execute_job(int4 job_id)
{
Datum start_date = 0;
Datum base_date = 0;
Datum old_next_date = 0;
* In RELEASE version, some of these variables will be optimized into
* registers. Be extra careful with updating the variables inside PG_TRY(),
* since PG_CATCH() will always flush and restore those registers to its
* original state(right before PG_CATCH()). An unexpected optimization would
* change how these codes behaves.
*/
volatile Datum new_next_date = 0;
Datum job_name = 0;
char* what = NULL;
char* job_interval = NULL;
char* nspname = NULL;
HeapTuple job_tup = NULL;
Relation job_rel = NULL;
Datum values[Natts_pg_job];
bool nulls[Natts_pg_job];
bool is_scheduler_job = false;
ResourceOwner save = t_thrd.utils_cxt.CurrentResourceOwner;
MemoryContext current_context = CurrentMemoryContext;
StringInfoData buf;
initStringInfo(&buf);
job_tup = get_job_tup(job_id);
job_rel = heap_open(PgJobRelationId, AccessShareLock);
get_job_values(job_id, job_tup, job_rel, values, nulls);
is_scheduler_job = !nulls[Anum_pg_job_job_name - 1];
job_name = is_scheduler_job ? PointerGetDatum(PG_DETOAST_DATUM_COPY(values[Anum_pg_job_job_name - 1])) : Datum(0);
job_interval = text_to_cstring(DatumGetTextP(values[Anum_pg_job_interval - 1]));
old_next_date = values[Anum_pg_job_next_run_date - 1];
new_next_date = old_next_date;
what = get_job_what(job_id);
start_date = DirectFunctionCall1(timestamptz_timestamp, GetCurrentTimestamp());
base_date = values[Anum_pg_job_start_date - 1];
if (!nulls[Anum_pg_job_nspname - 1]) {
nspname = DatumGetCString(DirectFunctionCall1(nameout, NameGetDatum(values[Anum_pg_job_nspname - 1])));
}
heap_close(job_rel, AccessShareLock);
ReleaseSysCache(job_tup);
PG_TRY();
{
update_pg_job_info(job_id, Pgjob_Run, start_date, 0, NULL, is_scheduler_job);
* Compute next_date if interval is not 'null'.
* Only execute the job once and set job_status as 'd' if interval is 'null'.
*/
if (0 != pg_strcasecmp(job_interval, "null")) {
save = t_thrd.utils_cxt.CurrentResourceOwner;
StartTransactionCommand();
new_next_date = get_interval_nextdate(job_id, false, job_interval, start_date, base_date, current_context);
CommitTransactionCommand();
t_thrd.utils_cxt.CurrentResourceOwner = save;
* There is a condition: we create a new job which the interval is a fixed timestamp.
* we should execute the job when the first times, and we don't need execute the job
* later because the new next_date always equal to old next_date.
*/
if (DatumGetBool(DirectFunctionCall2(timestamp_eq, new_next_date, old_next_date)) &&
!nulls[Anum_pg_job_last_end_date - 1]) {
pfree_ext(job_interval);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("It is invalid job interval for the reason that it is a fixed timestamp, job_id: %d, "
"interval: %s.",
job_id,
job_interval)));
}
} else {
new_next_date = 0;
elog(WARNING,
"The interval of current job is \'null\', we can execute this job only one times and set job_status as "
"\'d\', job_id: %d.",
job_id);
}
save = t_thrd.utils_cxt.CurrentResourceOwner;
if (nspname != NULL)
appendStringInfo(&buf, "set current_schema=%s;", quote_identifier(nspname));
if (!execute_backend_scheduler_job(job_name, &buf)) {
appendStringInfo(&buf, "%s", what);
execute_simple_query(buf.data);
}
pfree_ext(buf.data);
pfree_ext(job_interval);
t_thrd.utils_cxt.CurrentResourceOwner = save;
}
PG_CATCH();
{
MemoryContext ecxt = MemoryContextSwitchTo(current_context);
ErrorData* edata = CopyErrorData();
char* autoDrop;
FlushErrorState();
if (t_thrd.postgres_cxt.xact_started) {
t_thrd.postgres_cxt.xact_started = false;
}
t_thrd.utils_cxt.CurrentResourceOwner = save;
ResourceOwnerRelease(save, RESOURCE_RELEASE_BEFORE_LOCKS, false, true);
update_pg_job_info(job_id, Pgjob_Fail, start_date, new_next_date, edata->message, is_scheduler_job);
elog_job_detail(job_id, what, Pgjob_Fail, edata->message);
autoDrop = get_attribute_value_str(job_name, "auto_drop", AccessShareLock);
if (0 == pg_strcasecmp(job_interval, "null") && 0 == pg_strcasecmp(autoDrop, "true")) {
expire_backend_job(job_name, true);
}
(void)MemoryContextSwitchTo(ecxt);
pfree_ext(job_interval);
pfree_ext(what);
pfree_ext(autoDrop);
ereport(ERROR,
(errcode(ERRCODE_OPERATE_FAILED),
errmsg("Execute job failed, job_id: %d.", job_id),
errdetail("%s", edata->message)));
}
PG_END_TRY();
update_pg_job_info(job_id, Pgjob_Succ, start_date, new_next_date, NULL, is_scheduler_job);
elog_job_detail(job_id, what, Pgjob_Succ, NULL);
pfree_ext(what);
(void)MemoryContextSwitchTo(current_context);
}
* Description: Check the interval is valid or not.
*
* Parameters:
* @in job_id: Job id.
* @in rel: Relation for pg_job.
* @in interval: Interval for the job.
* @in next_date: Next_date for the job.
* Returns: void
*/
void check_interval_valid(int4 job_id, Relation rel, Datum interval)
{
char* job_interval = text_to_cstring(DatumGetTextP(interval));
MemoryContext current_context = CurrentMemoryContext;
ResourceOwner current_resource = t_thrd.utils_cxt.CurrentResourceOwner;
PG_TRY();
{
(void)get_interval_nextdate(job_id, true, job_interval, 0, 0, CurrentMemoryContext);
}
PG_CATCH();
{
t_thrd.utils_cxt.CurrentResourceOwner = current_resource;
heap_close(rel, RowExclusiveLock);
MemoryContext ecxt = MemoryContextSwitchTo(current_context);
ErrorData* edata = CopyErrorData();
FlushErrorState();
(void)MemoryContextSwitchTo(ecxt);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Invalid parameter interval: \'%s\'.", job_interval),
errdetail("%s", edata->message)));
}
PG_END_TRY();
}
* Description: Add an job to pg_job.
*
* Parameters:
* @in what: Task string.
* @in next_date: Next execute time.
* @in interval_time: Time interval.
* @in job_id: job id. default null.
* Returns: int
*/
Datum job_submit(PG_FUNCTION_ARGS)
{
Datum what = PG_GETARG_DATUM(1);
Datum next_date = PG_GETARG_DATUM(2);
Datum interval_time = PG_GETARG_DATUM(3);
int64 job_id = 0;
int4 real_job_id = 0;
int4 node_id = 0;
Relation rel = NULL;
char *c_what = NULL;
char *c_interval_time = NULL;
if (PG_ARGISNULL(1)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter what can not be null.")));
}
if (PG_ARGISNULL(2)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter next date can not be null.")));
}
rel = heap_open(PgJobRelationId, RowExclusiveLock);
if (PG_ARGISNULL(0)) {
uint16 id = 0;
int ret = jobid_alloc(&id);
if (JOBID_ALLOC_ERROR == ret) {
heap_close(rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("All 32768 jobids have alloc, and there is no free jobid")));
}
job_id = id;
} else {
job_id = PG_GETARG_INT64(0);
if ((IS_PGXC_DATANODE || IS_PGXC_COORDINATOR) && IsConnFromCoord()) {
real_job_id = (uint64)job_id & JOBID_MASK;
node_id = ((uint64)job_id >> 16) & NODEID_MASK;
job_id = real_job_id;
}
check_job_id(job_id);
}
c_what = text_to_cstring(DatumGetTextP(what));
if (PG_ARGISNULL(3)) {
interval_time = CStringGetTextDatum("null");
}
c_interval_time = text_to_cstring(DatumGetTextP(interval_time));
if (0 != pg_strcasecmp(c_interval_time, "null") && !IsConnFromCoord()) {
check_interval_valid(job_id, rel, interval_time);
}
insert_pg_job(rel, job_id, next_date, interval_time, node_id, 0, 0);
insert_pg_job_proc(job_id, what);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node(job_id, Job_ISubmit, what, next_date, interval_time);
}
#endif
heap_close(rel, RowExclusiveLock);
elog(LOG,
"Success to Submit job, job_id: %ld, what: %s, next_date: %s, job_interval: %s.",
job_id,
quote_literal_cstr(c_what),
quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(next_date)))),
quote_literal_cstr(c_interval_time));
pfree_ext(c_what);
pfree_ext(c_interval_time);
PG_RETURN_INT32(job_id);
}
/*
 * Validate the arguments shared by the multi-node job submit functions.
 *
 * Rules, checked in this order (the first violation raises ERROR):
 *  - the caller is superuser or monitor admin;
 *  - node_name is not NULL and is PGJOB_TYPE_ALL or PGJOB_TYPE_CCN;
 *  - database is not NULL, is DEFAULT_DATABASE unless node_name is PGJOB_TYPE_CCN, and exists;
 *  - what and next_date are not NULL.
 * Each *_idx parameter is the position of that argument in fcinfo.
 */
static void check_parameter_for_nodes(PG_FUNCTION_ARGS, int node_name_idx, int database_idx, int what_idx,
    int next_date_idx)
{
    if (!(superuser() || isMonitoradmin(GetUserId()))) {
        ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
            errmsg("only system/monitor admin can submit multi-node jobs!")));
    }

    if (PG_ARGISNULL(node_name_idx)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter node_name can not be null.")));
    }
    char* node_name = DatumGetCString(PG_GETARG_DATUM(node_name_idx));
    bool on_ccn = (strcmp(node_name, PGJOB_TYPE_CCN) == 0);
    if (!on_ccn && strcmp(node_name, PGJOB_TYPE_ALL) != 0) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
            errmsg("Parameter node_name can only be 'ALL_NODE' or 'CCN' for multi-node jobs.")));
    }

    if (PG_ARGISNULL(database_idx)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter Database can not be null.")));
    }
    char* database = DatumGetCString(PG_GETARG_DATUM(database_idx));
    if (!on_ccn && strcmp(database, DEFAULT_DATABASE) != 0) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
            errmsg("Parameter Database can only be 'postgres' except for jobs on CCN.")));
    }
    /* Raises ERROR if the database does not exist. */
    (void)get_database_oid(database, false);

    if (PG_ARGISNULL(what_idx)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter what can not be null.")));
    }
    if (PG_ARGISNULL(next_date_idx)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter next date can not be null.")));
    }
}
/*
 * Normalize and validate the interval argument of a multi-node submit function.
 *
 * @in fcinfo: call info of the SQL-callable function.
 * @in rel: pg_job relation opened with RowExclusiveLock (closed by check_interval_valid on error).
 * @in job_id: job id, used in error reports.
 * @in/out job_interval: interval datum; replaced by the text 'null' when the argument is NULL.
 * @in interval_idx: position of the interval argument in fcinfo.
 *
 * A non-'null' interval is validated only on the node that received the original request,
 * not for commands forwarded from a coordinator.
 */
static void check_job_interval(PG_FUNCTION_ARGS, Relation rel, int job_id, Datum *job_interval, int interval_idx)
{
    /*
     * Use the caller-supplied position: the interval is argument 5 for isubmit_job_on_nodes*
     * and argument 4 for submit_job_on_nodes. A hard-coded index would let a NULL interval
     * reach DatumGetTextP() below.
     */
    if (PG_ARGISNULL(interval_idx)) {
        *job_interval = CStringGetTextDatum("null");
        return;
    }

    char* interval_str = text_to_cstring(DatumGetTextP(*job_interval));
    bool is_null_interval = (pg_strcasecmp(interval_str, "null") == 0);
    pfree_ext(interval_str);

    if (!is_null_interval && !IsConnFromCoord()) {
        check_interval_valid(job_id, rel, *job_interval);
    }
}
* support sumbit job with id on specific CN or DN/ALL_DN/ALL_CN/ALL,
* the job record will be inserted to the pg_job table on all Nodes,
*/
Datum isubmit_job_on_nodes(PG_FUNCTION_ARGS)
{
Datum job_id = PG_GETARG_INT64(0);
Datum node_name = PG_GETARG_DATUM(1);
Datum database = PG_GETARG_DATUM(2);
Datum what = PG_GETARG_DATUM(3);
Datum next_date = PG_GETARG_DATUM(4);
Datum job_interval = PG_GETARG_DATUM(5);
Relation rel = NULL;
check_parameter_for_nodes(fcinfo, 1, 2, 3, 4);
check_job_id(job_id);
rel = heap_open(PgJobRelationId, RowExclusiveLock);
check_job_interval(fcinfo, rel, job_id, &job_interval, 5);
insert_pg_job(rel, job_id, next_date, job_interval, 0, database, node_name, "public");
insert_pg_job_proc(job_id, what);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node_internal(node_name, database, job_id, Job_ISubmit_Node, what, next_date, job_interval,
false);
}
#endif
heap_close(rel, RowExclusiveLock);
PG_RETURN_INT32(job_id);
}
* support sumbit tsdb job with id on ALL NODE ,
* the job record will be inserted to the pg_job table on all Nodes,
*/
Datum isubmit_job_on_nodes_internal(PG_FUNCTION_ARGS)
{
if (PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3) || PG_ARGISNULL(4) || PG_ARGISNULL(5)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Parameter can not be null.")));
}
Datum job_id = PG_GETARG_INT64(0);
Datum node_name = PG_GETARG_DATUM(1);
Datum database = PG_GETARG_DATUM(2);
Datum what = PG_GETARG_DATUM(3);
Datum next_date = PG_GETARG_DATUM(4);
Datum job_interval = PG_GETARG_DATUM(5);
Relation rel = NULL;
check_job_id(job_id);
rel = heap_open(PgJobRelationId, RowExclusiveLock);
check_job_interval(fcinfo, rel, job_id, &job_interval, 5);
insert_pg_job(rel, job_id, next_date, job_interval, 0, database, node_name);
insert_pg_job_proc(job_id, what);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node_internal(node_name, database, job_id, Job_ISubmit_Node_Internal,
what, next_date, job_interval, false);
}
#endif
heap_close(rel, RowExclusiveLock);
PG_RETURN_INT32(job_id);
}
* support sumbit job on specific CN or DN/ALL_DN/ALL_CN/ALL
*
* Parameters:
* @in node: specifc node name/ALL_CN/ALL_DN/ALL
* @in database: database name
* @in what: task string
* @in next_data: next execute time.
* @in job_interval: Time interval(seconds)
*/
Datum submit_job_on_nodes(PG_FUNCTION_ARGS)
{
Datum node_name = PG_GETARG_DATUM(0);
Datum database = PG_GETARG_DATUM(1);
Datum what = PG_GETARG_DATUM(2);
Datum next_date = PG_GETARG_DATUM(3);
Datum job_interval = PG_GETARG_DATUM(4);
uint16 job_id = 0;
int ret = 0;
Relation rel = NULL;
check_parameter_for_nodes(fcinfo, 0, 1, 2, 3);
rel = heap_open(PgJobRelationId, RowExclusiveLock);
ret = jobid_alloc(&job_id);
if (ret == JOBID_ALLOC_ERROR) {
heap_close(rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("All 32768 jobids have alloc, and there is no free jobid")));
}
check_job_interval(fcinfo, rel, job_id, &job_interval, 4);
insert_pg_job(rel, job_id, next_date, job_interval, 0, database, node_name);
insert_pg_job_proc(job_id, what);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node_internal(node_name, database, job_id, Job_ISubmit_Node, what,
next_date, job_interval, false);
}
#endif
heap_close(rel, RowExclusiveLock);
PG_RETURN_INT32(job_id);
}
* Description: Delete job from pg_job and pg_job_proc and tell other cn delete.
*
* Parameters:
* @in pg_job: pg_job relation.
* @in job_id: Job id.
* @in local: remove job locally if true
* Returns: void
*/
static void remove_job_internal(Relation pg_job, int4 job_id, bool ischeck, bool local)
{
HeapTuple tup = NULL;
check_job_id(job_id);
tup = get_job_tup_from_rel(pg_job, job_id);
if (!HeapTupleIsValid(tup)) {
return;
}
if (ischeck) {
check_job_permission(tup, !is_internal_perf_job(job_id));
}
simple_heap_delete(pg_job, &tup->t_self);
heap_freetuple_ext(tup);
delete_job_proc(job_id);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !local) {
syn_command_to_other_node(job_id, Job_Remove);
}
#endif
elog(LOG, "Success to remove job, job_id: %d.", job_id);
}
* Description: Remove job related to Oid, Database Oid or User Oid.
*
* Parameters:
* @in oid: Database Oid or User Oid.
* @in oidFlag: Identify Database Oid or User Oid.
* @in local: remove job locally if true
* Returns: void
*/
void remove_job_by_oid(Oid oid, Delete_Pgjob_Oid oidFlag, bool local)
{
Relation pg_job_tbl = NULL;
TableScanDesc scan = NULL;
HeapTuple tuple = NULL;
bool is_regular_job = true;
bool object_not_found = true;
const char *objname = NULL;
Oid jobid = InvalidOid;
switch (oidFlag) {
case DbOid:
objname = get_database_name(oid);
object_not_found = (objname == NULL);
break;
case UserOid:
objname = GetUserNameFromId(oid);
object_not_found = (objname == NULL);
break;
case RelOid:
jobid = oid;
object_not_found = false;
break;
default:
object_not_found = true;
}
if (object_not_found) {
return;
}
pg_job_tbl = heap_open(PgJobRelationId, ExclusiveLock);
scan = heap_beginscan(pg_job_tbl, SnapshotNow, 0, NULL);
while (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection))) {
(void)heap_getattr(tuple, Anum_pg_job_job_name, pg_job_tbl->rd_att, &is_regular_job);
Form_pg_job pg_job = (Form_pg_job)GETSTRUCT(tuple);
if (((oidFlag == DbOid && 0 == strcmp(NameStr(pg_job->dbname), objname)) ||
(oidFlag == UserOid && 0 == strcmp(NameStr(pg_job->log_user), objname)) ||
(oidFlag == RelOid && pg_job->job_id == jobid)) &&
(oidFlag != UserOid || is_regular_job)) {
remove_job_internal(pg_job_tbl, pg_job->job_id, false, local);
}
}
heap_endscan(scan);
heap_close(pg_job_tbl, ExclusiveLock);
if (oidFlag == UserOid) {
remove_scheduler_objects_from_owner(get_role_name_str(oid));
}
}
* Description: Delete a job by job_id.
*
* Parameters:
* @in job_id: Job id.
* Returns: void
*/
Datum job_cancel(PG_FUNCTION_ARGS)
{
int64 job_id = PG_GETARG_INT64(0);
Relation relation = NULL;
MemoryContext current_context = CurrentMemoryContext;
bool ischeck = (IS_PGXC_COORDINATOR && !IsConnFromCoord()) || IS_SINGLE_NODE;
bool is_perf_job = is_internal_perf_job(job_id);
LOCKMODE lock_mode = is_perf_job ? ExclusiveLock : RowExclusiveLock;
relation = heap_open(PgJobRelationId, lock_mode);
PG_TRY();
{
if (is_scheduler_job_id(relation, job_id)) {
ereport(ERROR, (errmodule(MOD_JOB), errcode(ERRCODE_INVALID_STATUS),
errmsg("Cannot remove job with jobid:%ld.", job_id),
errdetail("Cannot remove scheduler job with job_cancel"), errcause("Forbidden operation."),
erraction("Please use scheduler interface to operate this action.")));
}
remove_job_internal(relation, job_id, ischeck, false);
}
PG_CATCH();
{
ErrorData* edata = NULL;
MemoryContext ecxt = MemoryContextSwitchTo(current_context);
edata = CopyErrorData();
FlushErrorState();
heap_close(relation, lock_mode);
(void)MemoryContextSwitchTo(ecxt);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Remove jobid:%ld failed.", job_id),
errdetail("%s", edata->message)));
}
PG_END_TRY();
heap_close(relation, is_perf_job ? NoLock : lock_mode);
PG_RETURN_VOID();
}
* Remove from pg_job and pg_job_proc by pg_job.
*/
void RemoveJobById(Oid objectId)
{
int64 job_id = (int64)(objectId);
Relation relation;
MemoryContext current_context = CurrentMemoryContext;
bool ischeck = (IS_PGXC_COORDINATOR && !IsConnFromCoord()) || IS_SINGLE_NODE;
relation = heap_open(PgJobRelationId, RowExclusiveLock);
PG_TRY();
{
check_job_id(job_id);
TableScanDesc scan = NULL;
char* myrolename = NULL;
HeapTuple tuple = NULL;
HeapTuple cp_tuple = NULL;
scan = tableam_scan_begin(relation, SnapshotNow, 0, NULL);
while (HeapTupleIsValid(tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection))) {
Form_pg_job job = (Form_pg_job)GETSTRUCT(tuple);
if (job->job_id == job_id) {
myrolename = GetUserNameFromId(GetUserId());
if (!(superuser_arg(GetUserId()) || systemDBA_arg(GetUserId())) &&
0 != strcmp(NameStr(job->log_user), myrolename)) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("Permission for current user to get this job. current_user: %s, job_user: %s, job_id: %ld",
myrolename, NameStr(job->log_user), job->job_id)));
}
cp_tuple = heap_copytuple(tuple);
break;
}
}
heap_endscan(scan);
if (!HeapTupleIsValid(cp_tuple)) {
heap_close(relation, RowExclusiveLock);
PG_TRY_RETURN();
}
if (ischeck) {
check_job_permission(cp_tuple, !is_internal_perf_job(job_id));
}
simple_heap_delete(relation, &cp_tuple->t_self);
heap_freetuple_ext(cp_tuple);
delete_job_proc(job_id);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node(job_id, Job_Remove);
}
#endif
elog(LOG, "Success to remove job, job_id: %ld.", job_id);
}
PG_CATCH();
{
ErrorData* edata = NULL;
MemoryContext ecxt = MemoryContextSwitchTo(current_context);
edata = CopyErrorData();
FlushErrorState();
heap_close(relation, RowExclusiveLock);
(void)MemoryContextSwitchTo(ecxt);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Remove jobid:%ld failed.", job_id),
errdetail("%s", edata->message)));
}
PG_END_TRY();
heap_close(relation, RowExclusiveLock);
}
* Description: Update job status to aborted('d') or successfully('s').
*
* Parameters:
* @in job_id: Job id.
* @in next_date: Next execute time.
* Returns: void
*/
void job_finish(PG_FUNCTION_ARGS)
{
int64 job_id = PG_GETARG_INT64(0);
bool finished = PG_GETARG_BOOL(1);
Datum next_time = PG_GETARG_DATUM(2);
HeapTuple tup = NULL;
HeapTuple newtuple = NULL;
Datum values[Natts_pg_job], oldvalues[Natts_pg_job];
bool nulls[Natts_pg_job], oldvisnulls[Natts_pg_job];
bool replaces[Natts_pg_job];
errno_t rc = 0;
Relation relation = NULL;
bool is_perf_job = is_internal_perf_job(job_id);
LOCKMODE lock_mode = is_perf_job ? ExclusiveLock : RowExclusiveLock;
check_job_id(job_id);
relation = heap_open(PgJobRelationId, lock_mode);
tup = get_job_tup_from_rel(relation, job_id);
if (!HeapTupleIsValid(tup)) {
heap_close(relation, lock_mode);
return;
}
check_job_permission(tup, !is_perf_job);
if (is_scheduler_job_id(relation, job_id)) {
heap_close(relation, RowExclusiveLock);
ereport(ERROR, (errmodule(MOD_JOB), errcode(ERRCODE_INVALID_STATUS),
errmsg("Cannot execute job with jobid:%ld.", job_id),
errdetail("Cannot execute scheduler job with current method."),
errcause("Forbidden operation."),
erraction("Please use scheduler interface.")));
}
get_job_values(job_id, tup, relation, oldvalues, oldvisnulls);
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check_c(rc, "\0", "\0");
if (!PG_ARGISNULL(1)) {
replaces[Anum_pg_job_job_status - 1] = true;
} else {
finished = (PGJOB_ABORT_STATUS == DatumGetChar(oldvalues[Anum_pg_job_job_status - 1]) ? true : false);
}
if (finished) {
replaces[Anum_pg_job_next_run_date - 1] = true;
values[Anum_pg_job_job_status - 1] = CharGetDatum(PGJOB_ABORT_STATUS);
values[Anum_pg_job_next_run_date - 1] = DirectFunctionCall2(to_timestamp,
DirectFunctionCall1(textin, CStringGetDatum("4000-1-1")),
DirectFunctionCall1(textin, CStringGetDatum("yyyy-mm-dd")));
} else {
values[Anum_pg_job_job_status - 1] = CharGetDatum(PGJOB_SUCC_STATUS);
if (next_time != 0) {
replaces[Anum_pg_job_next_run_date - 1] = true;
values[Anum_pg_job_next_run_date - 1] = next_time;
}
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
heap_freetuple_ext(tup);
heap_freetuple_ext(newtuple);
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node(job_id, Job_Finish, 0, next_time, 0, finished);
}
#endif
heap_close(relation, is_perf_job ? NoLock : lock_mode);
elog(LOG,
"Success to Finish job, job_id: %ld, finished: %s, next_time: %s.",
job_id,
booltostr(finished),
quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(next_time)))));
}
* Description: Update pg_job_proc.
*
* Parameters:
* @in job_id: Job id.
* @in task: Job task.
* Returns: void
*/
static void update_job_proc_what(int4 job_id, Datum task)
{
Relation job_pro_relation = NULL;
HeapTuple tup = NULL;
HeapTuple newtuple = NULL;
Datum values[Natts_pg_job_proc];
bool nulls[Natts_pg_job_proc];
bool replaces[Natts_pg_job_proc];
errno_t rc = 0;
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check_c(rc, "\0", "\0");
replaces[Anum_pg_job_proc_what - 1] = true;
values[Anum_pg_job_proc_what - 1] = task;
job_pro_relation = heap_open(PgJobProcRelationId, RowExclusiveLock);
tup = SearchSysCache1(PGJOBPROCID, Int32GetDatum(job_id));
if (!HeapTupleIsValid(tup)) {
heap_close(job_pro_relation, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("can not find jobid in the job queue %d", job_id)));
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(job_pro_relation), values, nulls, replaces);
simple_heap_update(job_pro_relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(job_pro_relation, newtuple);
ReleaseSysCache(tup);
heap_freetuple_ext(newtuple);
heap_close(job_pro_relation, RowExclusiveLock);
}
* Description: Update pg_job what, next_date or interval.
*
* Parameters:
* @in job_id: Job id.
* @in what: Task.
* @in next_date: Next time.
* @in interval_time: Interval.
* Returns: void
*/
void job_update(PG_FUNCTION_ARGS)
{
int64 job_id = PG_GETARG_INT64(0);
Datum next_time = PG_GETARG_DATUM(1);
Datum interval_time = PG_GETARG_DATUM(2);
Datum content = PG_GETARG_DATUM(3);
if (PG_ARGISNULL(0)) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Parameter id for job_update is unexpected.")));
}
Relation relation = NULL;
HeapTuple tup = NULL;
if (PG_ARGISNULL(1) && PG_ARGISNULL(2) && PG_ARGISNULL(3)) {
elog(LOG, "All parameters are NULL for Change, then leave the values.");
return;
}
check_job_id(job_id);
tup = get_job_tup(job_id);
if (!HeapTupleIsValid(tup)) {
return;
}
check_job_permission(tup);
relation = heap_open(PgJobRelationId, RowExclusiveLock);
if (is_scheduler_job_id(relation, job_id)) {
heap_close(relation, RowExclusiveLock);
ereport(ERROR, (errmodule(MOD_JOB), errcode(ERRCODE_INVALID_STATUS),
errmsg("Cannot update job with jobid:%ld.", job_id),
errdetail("Cannot update scheduler job with current method."), errcause("Forbidden operation."),
erraction("Please use scheduler interface.")));
}
if (!PG_ARGISNULL(3)) {
update_job_proc_what(job_id, content);
}
if (!PG_ARGISNULL(1) || !PG_ARGISNULL(2)) {
HeapTuple newtuple = NULL;
Datum values[Natts_pg_job];
bool nulls[Natts_pg_job];
bool replaces[Natts_pg_job];
errno_t rc = 0;
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check_c(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check_c(rc, "\0", "\0");
if (!PG_ARGISNULL(1)) {
replaces[Anum_pg_job_next_run_date - 1] = true;
values[Anum_pg_job_next_run_date - 1] = next_time;
}
if (!PG_ARGISNULL(2)) {
if (0 != pg_strcasecmp(text_to_cstring(DatumGetTextP(interval_time)), "null")
&& !IsConnFromCoord()) {
check_interval_valid(job_id, relation, interval_time);
}
replaces[Anum_pg_job_interval - 1] = true;
values[Anum_pg_job_interval - 1] = interval_time;
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
ReleaseSysCache(tup);
heap_freetuple_ext(newtuple);
} else {
ReleaseSysCache(tup);
}
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
syn_command_to_other_node(job_id, Job_Update, content, next_time, interval_time);
}
#endif
heap_close(relation, RowExclusiveLock);
elog(LOG,
"Success to Update job, job_id: %ld, content: %s, next_time: %s, interval_time: %s.",
job_id,
0 == content ? "null" : quote_literal_cstr(text_to_cstring(DatumGetTextP(content))),
0 == next_time
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(next_time)))),
0 == interval_time ? "null" : quote_literal_cstr(text_to_cstring(DatumGetTextP(interval_time))));
}
* Description: Check job status is 'r' and update to 'f' when start job scheduler thread
*
* Returns: void
*/
void update_run_job_to_fail()
{
Relation pg_job_tbl = NULL;
TableScanDesc scan = NULL;
HeapTuple tuple = NULL;
HeapTuple newtuple = NULL;
Datum values[Natts_pg_job], old_value[Natts_pg_job];
bool nulls[Natts_pg_job], visnull[Natts_pg_job];
bool replaces[Natts_pg_job];
errno_t rc;
MemoryContext current_context = CurrentMemoryContext;
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), 0, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pg_job_job_status - 1] = true;
replaces[Anum_pg_job_failure_count - 1] = true;
pg_job_tbl = heap_open(PgJobRelationId, ExclusiveLock);
scan = heap_beginscan(pg_job_tbl, SnapshotNow, 0, NULL);
while (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection))) {
Form_pg_job pg_job = (Form_pg_job)GETSTRUCT(tuple);
#ifdef ENABLE_MULTIPLE_NODES
if (pg_job->job_status == PGJOB_RUN_STATUS &&
0 == strcmp(pg_job->node_name.data, g_instance.attr.attr_common.PGXCNodeName)) {
#else
if (pg_job->job_status == PGJOB_RUN_STATUS) {
#endif
get_job_values(pg_job->job_id, tuple, pg_job_tbl, old_value, visnull);
values[Anum_pg_job_failure_count - 1] = Int16GetDatum(pg_job->failure_count + 1);
if (pg_job->failure_count + 1 >= JOB_MAX_FAIL_COUNT) {
values[Anum_pg_job_job_status - 1] = CharGetDatum(PGJOB_ABORT_STATUS);
replaces[Anum_pg_job_next_run_date - 1] = true;
values[Anum_pg_job_next_run_date - 1] = DirectFunctionCall2(to_timestamp,
DirectFunctionCall1(textin, CStringGetDatum("4000-1-1")),
DirectFunctionCall1(textin, CStringGetDatum("yyyy-mm-dd")));
} else
values[Anum_pg_job_job_status - 1] = CharGetDatum(PGJOB_FAIL_STATUS);
newtuple = heap_modify_tuple(tuple, RelationGetDescr(pg_job_tbl), values, nulls, replaces);
simple_heap_update(pg_job_tbl, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(pg_job_tbl, newtuple);
if (IS_SINGLE_NODE) {
continue;
}
char* update_query = query_with_update_job(pg_job->job_id,
values[Anum_pg_job_job_status - 1],
pg_job->current_postgres_pid,
visnull[Anum_pg_job_last_start_date - 1] ? 0 : old_value[Anum_pg_job_last_start_date - 1],
visnull[Anum_pg_job_last_end_date - 1] ? 0 : old_value[Anum_pg_job_last_end_date - 1],
visnull[Anum_pg_job_last_suc_date - 1] ? 0 : old_value[Anum_pg_job_last_suc_date - 1],
visnull[Anum_pg_job_this_run_date - 1] ? 0 : old_value[Anum_pg_job_this_run_date - 1],
values[Anum_pg_job_next_run_date - 1],
values[Anum_pg_job_failure_count - 1],
values[Anum_pg_job_node_name - 1],
visnull[Anum_pg_job_failure_msg - 1] ? 0 : old_value[Anum_pg_job_failure_msg - 1]);
* If update job status in local success and only synchronize to other coordinator fail,
* we should consider the worker success. Because it will result in job scheduler thread
* start failed.
*/
if (update_query != NULL) {
update_pg_job_on_remote(update_query, pg_job->job_id, current_context);
pfree_ext(update_query);
}
}
}
heap_endscan(scan);
heap_close(pg_job_tbl, ExclusiveLock);
}
* Used in tsdb. Generate a random job id. pg_job is checked to ensure the generated id is not conflicted.
* CAUTION: the function only tries JOBID_MAX_NUMBER times. So if there have been a lot of jobs in pg_job,
* the function is likely to fail, although there is still valid id to use.
*/
static int get_random_job_id(int64 job_max_number = JOBID_MAX_NUMBER)
{
if (job_max_number <= InvalidJobId) {
ereport(ERROR, (errmodule(MOD_JOB), errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Cannot generate job id."), errdetail("N/A"), errcause("Invalid job id range set."),
erraction("Please recheck job status.")));
}
int job_id = gs_random() % job_max_number;
HeapTuple tup = NULL;
uint32 loop_count = 0;
while (loop_count < job_max_number) {
if (likely(job_id > 0)) {
tup = SearchSysCache1(PGJOBID, Int64GetDatum(job_id));
if (!HeapTupleIsValid(tup)) {
break;
}
loop_count++;
ReleaseSysCache(tup);
}
job_id = gs_random() % job_max_number;
}
if (loop_count == job_max_number) {
ereport(LOG,
(errmodule(MOD_TIMESERIES), errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Cannot find a valid job_id randomly, try to search one by one.")));
return 0;
}
return job_id;
}
* Description: Alloc a valid jobid.
*
* Parameters:
* @in pusJobId: Return job id.
* Returns: int
*/
int jobid_alloc(uint16* pusJobId, int64 job_max_number)
{
int job_id = get_random_job_id(job_max_number);
if (job_id != 0) {
*pusJobId = job_id;
return JOBID_ALLOC_OK;
}
HeapTuple tup = NULL;
for (int job_id = 1; job_id <= job_max_number; job_id++) {
tup = SearchSysCache1(PGJOBID, Int64GetDatum(job_id));
if (!HeapTupleIsValid(tup)) {
*pusJobId = job_id;
return JOBID_ALLOC_OK;
}
ReleaseSysCache(tup);
}
return JOBID_ALLOC_ERROR;
}
* Description: Check whether current user have authority to operate the job.
*
* Parameters:
* @in tuple: pg_job tuple.
* Returns: void
*/
void check_job_permission(HeapTuple tuple, bool check_running)
{
char *mydbname = NULL, *myrolename = NULL;
Form_pg_job job = NULL;
job = (Form_pg_job)GETSTRUCT(tuple);
myrolename = GetUserNameFromId(GetUserId());
if (!(superuser_arg(GetUserId()) || systemDBA_arg(GetUserId())) &&
0 != strcmp(NameStr(job->log_user), myrolename)) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_OPERATION),
errmsg("Permission for current user to operate the job. current_user: %s, job_user: %s, job_id: %ld",
myrolename,
NameStr(job->log_user),
job->job_id)));
}
Oid dboid = get_database_oid(NameStr(job->dbname), true);
if (OidIsValid(dboid)) {
mydbname = get_database_name(u_sess->proc_cxt.MyDatabaseId);
if (mydbname && 0 != strcmp(NameStr(job->dbname), mydbname)) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_OPERATION),
errmsg(
"Permission for current database to operate the job. current_dboid: %s, job_dboid: %s, job_id: %ld",
mydbname,
NameStr(job->dbname),
job->job_id)));
}
} else {
ereport(LOG, (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" of job %ld does not exist", NameStr(job->dbname), job->job_id)));
}
if (check_running && job->job_status == PGJOB_RUN_STATUS) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS),
errmsg("Can not operate this job due to running status, job_id: %ld. ", job->job_id)));
}
}
* Description: Output detail info of job execute to log.
*
* Parameters:
* @in job_id: Job id.
* @in what: Job task info.
* @in status: Job status.
* @in errmsg: The error message if execute job failed.
* Returns: void
*/
static void elog_job_detail(int4 job_id, char* what, Update_Pgjob_Status status, char* errmsg)
{
HeapTuple tup = NULL;
StringInfoData buf;
Relation relation = NULL;
Datum values[Natts_pg_job];
bool nulls[Natts_pg_job];
MemoryContext current_context = CurrentMemoryContext;
ResourceOwner save = t_thrd.utils_cxt.CurrentResourceOwner;
StartTransactionCommand();
tup = get_job_tup(job_id);
relation = heap_open(PgJobRelationId, AccessShareLock);
get_job_values(job_id, tup, relation, values, nulls);
initStringInfo(&buf);
appendStringInfoString(&buf, "Execute Job Detail: \n");
appendStringInfo(&buf, "job_id: %d \n", job_id);
appendStringInfo(&buf, "what: %s \n", what);
appendStringInfo(&buf,
"start_date: %s \n",
DatumGetCString(
DirectFunctionCall1(timestamp_out, DatumGetTimestamp(values[Anum_pg_job_last_start_date - 1]))));
if (Pgjob_Succ == status) {
appendStringInfoString(&buf, "job_status: success \n");
} else {
appendStringInfoString(&buf, "job_status: failed \n");
appendStringInfo(&buf, "detail error msg: %s \n", errmsg);
}
appendStringInfo(&buf,
"end_date: %s \n",
DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(values[Anum_pg_job_last_end_date - 1]))));
appendStringInfo(&buf,
"next_run_date: %s \n",
DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(values[Anum_pg_job_next_run_date - 1]))));
elog(LOG, "%s", buf.data);
ReleaseSysCache(tup);
pfree_ext(buf.data);
heap_close(relation, AccessShareLock);
CommitTransactionCommand();
(void)MemoryContextSwitchTo(current_context);
t_thrd.utils_cxt.CurrentResourceOwner = save;
}
* Description: Construct query with update the job info.
*
* Parameters:
* @in job_id: Job id.
* @in job_status: Job status.
* @in pid: Job belong thread id.
* @in last_start_date: Job last start time.
* @in last_end_date: Job last end time.
* @in last_suc_date: Job last success time.
* @in this_run_date: Job current run time.
* @in next_run_date: Job next run time.
* @in failure_count: Job failure count.
* Returns: char*
*/
static char* query_with_update_job(int4 job_id, Datum job_status, int64 pid, Datum last_start_date, Datum last_end_date,
Datum last_suc_date, Datum this_run_date, Datum next_run_date, int2 failure_count, Datum node_name, Datum fail_msg)
{
StringInfoData queryString;
if (!IS_PGXC_COORDINATOR ||
node_name == 0 ||
strcmp(DatumGetName(node_name)->data, PGJOB_TYPE_ALL) == 0 ||
strcmp(DatumGetName(node_name)->data, PGJOB_TYPE_ALL_CN) == 0 ||
strcmp(DatumGetName(node_name)->data, PGJOB_TYPE_ALL_DN) == 0) {
* ALL_NODE/ALL_CN/ALL_DN won't sync status to other,
* one CN only sync job status to other CNs(specfic job),
* DN won't sync job status to others.
*/
return NULL;
}
initStringInfo(&queryString);
if (t_thrd.proc->workingVersionNum >= 92473) {
appendStringInfo(&queryString,
"select * from pg_catalog.update_pgjob(%d, \'%c\', %ld, %s, %s, %s, %s, %s, %d , %s);",
job_id,
DatumGetChar(job_status),
pid,
last_start_date == 0 ? "null"
: quote_literal_cstr(DatumGetCString(
DirectFunctionCall1(timestamp_out, DatumGetTimestamp(last_start_date)))),
last_end_date == 0
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out,
DatumGetTimestamp(last_end_date)))),
last_suc_date == 0
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out,
DatumGetTimestamp(last_suc_date)))),
this_run_date == 0
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out,
DatumGetTimestamp(this_run_date)))),
quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(next_run_date)))),
failure_count,
fail_msg == 0
? "null"
: quote_literal_cstr(DatumGetCString(fail_msg)));
} else {
appendStringInfo(&queryString,
"select * from pg_catalog.update_pgjob(%d, \'%c\', %ld, %s, %s, %s, %s, %s, %d);",
job_id,
DatumGetChar(job_status),
pid,
last_start_date == 0 ? "null"
: quote_literal_cstr(DatumGetCString(
DirectFunctionCall1(timestamp_out, DatumGetTimestamp(last_start_date)))),
last_end_date == 0
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out,
DatumGetTimestamp(last_end_date)))),
last_suc_date == 0
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out,
DatumGetTimestamp(last_suc_date)))),
this_run_date == 0
? "null"
: quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out,
DatumGetTimestamp(this_run_date)))),
quote_literal_cstr(DatumGetCString(DirectFunctionCall1(timestamp_out, DatumGetTimestamp(next_run_date)))),
failure_count);
}
elog(LOG, "query with update job: %s", queryString.data);
return queryString.data;
}
* check the job is internal perf job or not.
*/
static bool is_internal_perf_job(int64 job_id)
{
bool result = false;
const char *what = get_job_what(job_id, false);
if (what != NULL) {
result = strcasestr(what, " capture_view_to_json(") != NULL;
pfree_ext(what);
}
return result;
}
/*
 * Return true when the job status Datum decodes to PGJOB_ABORT_STATUS.
 */
static bool is_job_aborted(Datum job_status)
{
    const char status_code = DatumGetChar(job_status);
    return status_code == PGJOB_ABORT_STATUS;
}
/*
 * Find the pg_job row with the given job_id by sequentially scanning job_rel.
 *
 * Returns a copy of the matching tuple made with heap_copytuple(). Raises ERROR
 * if the row exists but the current user is neither superuser/system DBA nor
 * the job's log_user. When no row matches, returns NULL on a datanode serving a
 * coordinator connection, and raises ERROR otherwise.
 */
static HeapTuple get_job_tup_from_rel(Relation job_rel, int job_id)
{
    HeapTuple tuple = NULL;
    HeapTuple cp_tuple = NULL;
    bool permission_denied = false;
    char* myrolename = NULL;
    char* job_user = NULL;
    int64 denied_job_id = 0;

    TableScanDesc scan = tableam_scan_begin(job_rel, SnapshotNow, 0, NULL);
    while (HeapTupleIsValid(tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection))) {
        Form_pg_job job = (Form_pg_job)GETSTRUCT(tuple);
        if (job->job_id != job_id) {
            continue;
        }
        Oid userid = GetUserId();
        myrolename = GetUserNameFromId(userid);
        if (!(superuser_arg(userid) || systemDBA_arg(userid)) &&
            strcmp(NameStr(job->log_user), myrolename) != 0) {
            /*
             * Copy what the error message needs now: tuple data returned by the
             * scan must not be read after the scan has been ended below.
             */
            permission_denied = true;
            job_user = pstrdup(NameStr(job->log_user));
            denied_job_id = (int64)job->job_id;
        } else {
            cp_tuple = heap_copytuple(tuple);
        }
        break;
    }
    /* End the scan exactly once, with the call matching tableam_scan_begin(). */
    tableam_scan_end(scan);

    if (permission_denied) {
        ereport(ERROR,
            (errcode(ERRCODE_UNDEFINED_OBJECT),
                errmsg("Permission for current user to get this job. current_user: %s, job_user: %s, job_id: %ld",
                    myrolename,
                    job_user,
                    denied_job_id)));
    }
    if (cp_tuple != NULL) {
        return cp_tuple;
    }
    /* A datanode serving a coordinator connection treats a missing job as "not found", not an error. */
    if (IS_PGXC_DATANODE && IsConnFromCoord()) {
        return NULL;
    }
    ereport(ERROR,
        (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("Can not find job id %d in system table pg_job.", job_id)));
    return NULL; /* keep compiler quiet */
}