/* -------------------------------------------------------------------------
 *
 * remotecopy.c
 *		Implements an extension of COPY command for remote management
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 2010-2012, Postgres-XC Development Group
 *
 *
 * IDENTIFICATION
 *		src/backend/pgxc/copy/remotecopy.c
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "miscadmin.h"
#include "lib/stringinfo.h"
#include "optimizer/pgxcship.h"
#include "optimizer/planner.h"
#include "pgxc/pgxcnode.h"
#include "pgxc/remotecopy.h"
#include "rewrite/rewriteHandler.h"
#include "utils/builtins.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/lsyscache.h"
#include "commands/copy.h"
#include "utils/elog.h"
#include "mb/pg_wchar.h"
static void RemoteCopy_QuoteStr(StringInfo query_buf, char* value);
* RemoteCopy_GetRelationLoc
* Get relation node list based on COPY data involved. An empty list is
* returned to caller if relation involved has no locator information
* as it is the case of a system relation.
*/
void RemoteCopy_GetRelationLoc(RemoteCopyData* state, Relation rel, List* attnums)
{
#ifndef ENABLE_MULTIPLE_NODES
Assert(false);
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
return;
#else
ExecNodes* exec_nodes = NULL;
* If target table does not exists on nodes (e.g. system table)
* the location info returned is NULL. This is the criteria, when
* we need to run COPY on Coordinator
*/
state->rel_loc = GetRelationLocInfo(RelationGetRelid(rel));
if (state->rel_loc) {
* Pick up one node only
* This case corresponds to a replicated table with COPY TO
*
*/
exec_nodes = makeNode(ExecNodes);
if (!state->is_from && IsRelationReplicated(state->rel_loc))
exec_nodes->nodeList = GetPreferredReplicationNode(state->rel_loc->nodeList);
else {
exec_nodes->nodeList = list_concat(exec_nodes->nodeList, state->rel_loc->nodeList);
}
}
state->idx_dist_by_col = NULL;
if (state->rel_loc && state->rel_loc->partAttrNum != NULL) {
* Find the column used as key for data distribution.
* First scan attributes of tuple descriptor with the list
* of attributes used in COPY if any list is specified.
* If no list is specified, set this value to the one of
* locator information.
*/
if (attnums != NIL) {
ListCell* cur = NULL;
foreach (cur, state->rel_loc->partAttrNum) {
int attnum = lfirst_int(cur);
if (list_member_int(attnums, attnum)) {
state->idx_dist_by_col = lappend_int(state->idx_dist_by_col, attnum - 1);
}
}
} else {
ListCell* cell = NULL;
AttrNumber num;
foreach (cell, state->rel_loc->partAttrNum) {
num = lfirst_int(cell);
state->idx_dist_by_col = lappend_int(state->idx_dist_by_col, num - 1);
}
}
}
state->exec_nodes = exec_nodes;
#endif
}
* RemoteCopy_BuildStatement
* Build a COPY query for remote management
*/
void RemoteCopy_BuildStatement(
RemoteCopyData* state, Relation rel, RemoteCopyOptions* options, List* attnamelist, List* attnums)
{
int attnum;
TupleDesc tupDesc = RelationGetDescr(rel);
* Build up query string for the Datanodes, it should match
* to original string, but should have STDIN/STDOUT instead
* of filename.
*/
initStringInfo(&state->query_buf);
appendStringInfoString(&state->query_buf, "COPY ");
* The table name should be qualified, unless if the table is
* a temporary one
*/
if (RelationIsLocalTemp(rel))
appendStringInfo(&state->query_buf, "%s", quote_identifier(RelationGetRelationName(rel)));
else
appendStringInfo(&state->query_buf,
"%s",
quote_qualified_identifier(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel)));
if (attnamelist != NIL) {
appendStringInfoString(&state->query_buf, " (");
* For COPY FROM, we need to append unspecified attributes that have
* default expressions associated.
*/
if (state->is_from) {
for (attnum = 1; attnum <= tupDesc->natts; attnum++) {
if (tupDesc->attrs[attnum - 1].attisdropped)
continue;
if (!list_member_int(attnums, attnum)) {
Expr* defexpr = (Expr*)build_column_default(rel, attnum);
if (defexpr && ((!pgxc_is_expr_shippable(expression_planner(defexpr), NULL)) ||
(list_member_int(state->idx_dist_by_col, attnum - 1)))) {
appendStringInfoString(
&state->query_buf, quote_identifier(NameStr(tupDesc->attrs[attnum - 1].attname)));
appendStringInfoString(&state->query_buf, ", ");
}
}
}
}
ListCell* cell = NULL;
foreach (cell, attnamelist) {
appendStringInfoString(&state->query_buf, quote_identifier(strVal(lfirst(cell))));
appendStringInfoString(&state->query_buf, ", ");
}
int blankPos = 1;
int delimPos = 1;
state->query_buf.data[state->query_buf.len - blankPos] = '\0';
state->query_buf.data[state->query_buf.len - blankPos - delimPos] = '\0';
state->query_buf.len -= (blankPos + delimPos);
appendStringInfoChar(&state->query_buf, ')');
}
if (state->is_from)
appendStringInfoString(&state->query_buf, " FROM STDIN");
else
appendStringInfoString(&state->query_buf, " TO STDOUT");
if (options->rco_format == FORMAT_BINARY)
appendStringInfoString(&state->query_buf, " BINARY");
if (options->rco_oids)
appendStringInfoString(&state->query_buf, " OIDS");
if (options->rco_freeze)
appendStringInfoString(&state->query_buf, " FREEZE");
if (options->rco_without_escaping)
appendStringInfoString(&state->query_buf, " WITHOUT ESCAPING ");
if (options->rco_delim) {
if ((options->rco_format != FORMAT_CSV && strcmp(options->rco_delim, "\t") != 0) ||
(options->rco_format == FORMAT_CSV && strcmp(options->rco_delim, ",") != 0)) {
appendStringInfoString(&state->query_buf, " DELIMITER AS ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_delim);
}
}
if (options->rco_null_print) {
if ((options->rco_format != FORMAT_CSV && strcmp(options->rco_null_print, "\\N")) ||
(options->rco_format == FORMAT_CSV && strcmp(options->rco_null_print, ""))) {
appendStringInfoString(&state->query_buf, " NULL AS ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_null_print);
}
}
if (options->rco_format == FORMAT_CSV)
appendStringInfoString(&state->query_buf, " CSV");
else if (options->rco_format == FORMAT_FIXED) {
FixFormatter* formatter = (FixFormatter*)(options->rco_formatter);
FieldDesc* descs = formatter->fieldDesc;
int nfields = formatter->nfield;
appendStringInfoString(&state->query_buf, " FORMATTER( ");
for (int i = 0; i < nfields; i++) {
char* fieldname = descs[i].fieldname;
appendStringInfo(&state->query_buf, "%s(%d,%d)", fieldname, descs[i].fieldPos, descs[i].fieldSize);
if (i != nfields - 1)
appendStringInfoString(&state->query_buf, ",");
}
appendStringInfoString(&state->query_buf, " ) ");
appendStringInfoString(&state->query_buf, "FIXED");
}
if (options->rco_eol_type == EOL_CRNL)
appendStringInfoString(&state->query_buf, " eol '0x0D0A'");
else if (options->rco_eol_type == EOL_CR)
appendStringInfoString(&state->query_buf, " eol '0x0D'");
else if (options->rco_eol_type == EOL_UD) {
appendStringInfoString(&state->query_buf, " eol ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_eol);
}
* It is not necessary to send the HEADER part to Datanodes.
* Sending data is sufficient.
*/
if (options->rco_quote && options->rco_quote[0] != '"') {
appendStringInfoString(&state->query_buf, " QUOTE AS ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_quote);
}
if (options->rco_escape && options->rco_quote && options->rco_escape[0] != options->rco_quote[0]) {
appendStringInfoString(&state->query_buf, " ESCAPE AS ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_escape);
}
if (options->rco_force_quote) {
ListCell* cell = NULL;
ListCell* prev = NULL;
appendStringInfoString(&state->query_buf, " FORCE QUOTE ");
foreach (cell, options->rco_force_quote) {
if (prev != NULL)
appendStringInfoString(&state->query_buf, ", ");
appendStringInfoString(&state->query_buf, quote_identifier(strVal(lfirst(cell))));
prev = cell;
}
}
if (options->rco_force_notnull) {
ListCell* cell = NULL;
ListCell* prev = NULL;
appendStringInfoString(&state->query_buf, " FORCE NOT NULL ");
foreach (cell, options->rco_force_notnull) {
if (prev != NULL)
appendStringInfoString(&state->query_buf, ", ");
appendStringInfoString(&state->query_buf, quote_identifier(strVal(lfirst(cell))));
prev = cell;
}
}
if (options->rco_date_format) {
appendStringInfoString(&state->query_buf, " date_format ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_date_format);
}
if (options->rco_time_format) {
appendStringInfoString(&state->query_buf, " time_format ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_time_format);
}
if (options->rco_timestamp_format) {
appendStringInfoString(&state->query_buf, " timestamp_format ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_timestamp_format);
}
if (options->rco_smalldatetime_format) {
appendStringInfoString(&state->query_buf, " smalldatetime_format ");
RemoteCopy_QuoteStr(&state->query_buf, options->rco_smalldatetime_format);
}
if (options->rco_compatible_illegal_chars)
appendStringInfoString(&state->query_buf, " COMPATIBLE_ILLEGAL_CHARS");
if (options->rco_ignore_extra_data)
appendStringInfoString(&state->query_buf, " IGNORE_EXTRA_DATA");
if (options->rco_fill_missing_fields)
appendStringInfoString(&state->query_buf, " FILL_MISSING_FIELDS");
if (options->rco_fixedEncoding != -1) {
appendStringInfoString(&state->query_buf, " ENCODING ");
char* encoding = const_cast<char *>(pg_encoding_to_char(options->rco_fixedEncoding));
RemoteCopy_QuoteStr(&state->query_buf, encoding);
}
if (options->transform_query_string) {
appendStringInfoChar(&state->query_buf, ' ');
appendStringInfoString(&state->query_buf, options->transform_query_string);
}
}
* Build a default set for RemoteCopyOptions
*/
RemoteCopyOptions* makeRemoteCopyOptions(void)
{
#ifndef ENABLE_MULTIPLE_NODES
Assert(false);
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
return NULL;
#else
RemoteCopyOptions* res = (RemoteCopyOptions*)palloc(sizeof(RemoteCopyOptions));
res->rco_format = FORMAT_UNKNOWN;
res->rco_oids = false;
res->rco_without_escaping = false;
res->rco_delim = NULL;
res->rco_null_print = NULL;
res->rco_quote = NULL;
res->rco_escape = NULL;
res->rco_eol = NULL;
res->rco_force_quote = NIL;
res->rco_force_notnull = NIL;
res->rco_eol_type = EOL_NL;
res->rco_date_format = NULL;
res->rco_time_format = NULL;
res->rco_timestamp_format = NULL;
res->rco_smalldatetime_format = NULL;
res->rco_compatible_illegal_chars = false;
res->rco_ignore_extra_data = false;
res->rco_fill_missing_fields = false;
res->rco_fixedEncoding = -1;
return res;
#endif
}
* FreeRemoteCopyOptions
* Free remote COPY options structure
*/
void FreeRemoteCopyOptions(RemoteCopyOptions* options)
{
if (options == NULL)
return;
if (options->rco_delim)
pfree(options->rco_delim);
if (options->rco_null_print)
pfree(options->rco_null_print);
if (options->rco_quote)
pfree(options->rco_quote);
if (options->rco_escape)
pfree(options->rco_escape);
if (options->rco_eol)
pfree(options->rco_eol);
if (options->rco_force_quote)
list_free(options->rco_force_quote);
if (options->rco_force_notnull)
list_free(options->rco_force_notnull);
pfree(options);
}
* FreeRemoteCopyData
* Free remote COPY state data structure
*/
void FreeRemoteCopyData(RemoteCopyData* state)
{
#ifndef ENABLE_MULTIPLE_NODES
Assert(false);
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
return;
#else
if (state == NULL)
return;
if (state->connections)
pfree(state->connections);
if (state->query_buf.data)
pfree(state->query_buf.data);
FreeRelationLocInfo(state->rel_loc);
pfree(state);
#endif
}
/*
 * APPENDSOFAR
 *		Append the bytes in [start, current) to query_buf, if there are any.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement; a bare "if" here would capture the "else" of an enclosing
 * if/else written without braces. Note that "start" and "current" are
 * evaluated more than once, so they must be free of side effects.
 */
#define APPENDSOFAR(query_buf, start, current)                                 \
    do {                                                                       \
        if ((current) > (start))                                               \
            appendBinaryStringInfo((query_buf), (start), (current) - (start)); \
    } while (0)
* RemoteCopy_QuoteStr
* Append quoted value to the query buffer. Value is escaped if needed
* When rewriting query to be sent down to nodes we should escape special
* characters, that may present in the value. The characters are backslash(\)
* and single quote ('). These characters are escaped by doubling. We do not
* have to escape characters like \t, \v, \b, etc. because Datanode interprets
* them properly.
* We use E'...' syntax for literals containing backslashes.
*/
static void RemoteCopy_QuoteStr(StringInfo query_buf, char* value)
{
char* start = value;
char* current = value;
char c;
bool has_backslash = (strchr(value, '\\') != NULL);
if (has_backslash)
appendStringInfoChar(query_buf, 'E');
appendStringInfoChar(query_buf, '\'');
while ((c = *current) != '\0') {
switch (c) {
case '\\':
case '\'':
APPENDSOFAR(query_buf, start, current);
appendStringInfoChar(query_buf, c);
start = current;
default:
current++;
}
}
APPENDSOFAR(query_buf, start, current);
appendStringInfoChar(query_buf, '\'');
}