* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1994, Regents of the University of California
*
*
*
* gds_stream.cpp
*
* IDENTIFICATION
* src/gausskernel/optimizer/commands/gds_stream.cpp
*
* -------------------------------------------------------------------------
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include "commands/gds_stream.h"
#include "libpq/ip.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "storage/smgr/fd.h"
#include "miscadmin.h"
#include "port.h"
#include "libpq/pqformat.h"
#include "pgstat.h"
#define GDS_STREAM_BUF_EXTEND_SIZE 8192
extern const char* GSFS_PREFIX;
extern const char* GSFSS_PREFIX;
void GDSUri::Trim(char* str)
{
int len;
char* end = NULL;
char* cur = NULL;
char* begin = str;
int maxlen;
if (!str)
return;
maxlen = strlen(str);
while (begin != NULL && isspace((int)*begin))
begin++;
for (end = cur = begin; *cur != '\0'; cur++) {
if (!isspace((int)*cur))
end = cur;
}
len = end - begin + 1;
errno_t rc;
rc = memmove_s(str, maxlen, begin, len);
securec_check(rc, "", "");
str[len] = '\0';
}
void GDSUri::Parse(const char* uri)
{
char* delimPos = NULL;
char* ptr = NULL;
errno_t rc = 0;
if (!uri)
return;
m_uri = pstrdup(uri);
GDSUri::Trim(m_uri);
ptr = m_uri;
* Get protocal
* if no protocal found, we suppose that the url is a local path
*/
delimPos = strstr(ptr, "://");
if (delimPos != NULL) {
m_protocol = pnstrdup(ptr, delimPos - ptr);
ptr = delimPos + 3;
} else {
m_path = pstrdup(m_uri);
return;
}
delimPos = strstr(ptr, "/");
if (delimPos != NULL) {
m_path = pstrdup(delimPos);
*delimPos = '\0';
}
delimPos = strstr(ptr, ":");
if (delimPos != NULL) {
char* port = delimPos + 1;
m_port = atoi(port);
*delimPos = '\0';
}
if (*ptr != '\0')
m_host = pstrdup(ptr);
rc = strncpy_s(m_uri, strlen(uri) + 1, uri, strlen(uri));
securec_check(rc, "\0", "\0");
}
GDSStream::GDSStream() : m_uri(NULL), m_fd(-1), m_ssl_enable(false), m_ssl(NULL), m_inBuf(NULL), m_outBuf(NULL)
{
m_read = &GDSStream::InternalRead;
}
GDSStream::~GDSStream()
{
Close();
}
* @Description: init SSL evn
* @See also:
*/
void GDSStream::InitSSL(void)
{
gs_openssl_cli_init_system();
char ssl_dir[MAXPGPATH] = {0};
* GDSStream will load the certificate files if using SSL,
* including root(cacert.pem) and client (client.crt) certificate
* files. All the files are loacated in $GAUSSHOME/share/sslcert/gds/
*/
char* homedir = gs_getenv_r("GAUSSHOME");
if (NULL == homedir) {
ereport(ERROR,
(errmodule(MOD_SSL), errcode_for_file_access(), errmsg("env $GAUSSHOME not found, please set it first")));
}
char real_homedir[PATH_MAX + 1] = {'\0'};
if (realpath(homedir, real_homedir) == NULL) {
ereport(ERROR,
(errmodule(MOD_EXECUTOR), errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
errmsg("Failed to obtain environment value $GAUSSHOME!"),
errdetail("N/A"),
errcause("Incorrect environment value."),
erraction("Please refer to backend log for more details.")));
}
homedir = NULL;
if (backend_env_valid(real_homedir, "GAUSSHOME") == false) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Incorrect backend environment variable $GAUSSHOME"),
errdetail("Please refer to the backend instance log for the detail")));
}
int nRet = snprintf_s(ssl_dir, MAXPGPATH, MAXPGPATH - 1, "%s/share/sslcert/gds/", real_homedir);
securec_check_ss_c(nRet, "\0", "\0");
if (m_ssl == NULL) {
m_ssl = gs_openssl_cli_create();
}
gs_openssl_cli_setfiles(m_ssl, ssl_dir, "cacert.pem", "server.key", "server.crt");
if (gs_openssl_cli_initialize_SSL(m_ssl, m_fd) < 0) {
ereport(ERROR, (errmodule(MOD_SSL), errcode_for_file_access(), errmsg("failed to bind fd(%d) to ssl", m_fd)));
}
}
void GDSStream::Initialize(const char* uri)
{
m_inBuf = makeStringInfo();
m_outBuf = makeStringInfo();
m_uri = New(CurrentMemoryContext) GDSUri;
m_uri->Parse(uri);
VerifyAddr();
m_fd = AllocateSocket(m_uri->m_host, m_uri->m_port);
m_ssl_enable = false;
if (strncmp(m_uri->m_protocol, GSFSS_PREFIX, 5) == 0) {
InitSSL();
m_ssl_enable = true;
m_read = &GDSStream::InternalReadSSL;
}
if (!pg_set_noblock(m_fd)) {
ereport(WARNING, (errmsg("could not set socket to non-blocking mode: %m")));
}
}
void GDSStream::Close()
{
if (m_fd > 0) {
(void)FreeSocket(m_fd);
m_fd = -1;
}
gs_openssl_cli_destroy(m_ssl);
m_ssl = NULL;
if (m_uri) {
delete m_uri;
m_uri = NULL;
}
if (m_inBuf) {
pfree_ext(m_inBuf->data);
pfree_ext(m_inBuf);
m_inBuf = NULL;
}
if (m_outBuf) {
pfree_ext(m_outBuf->data);
pfree_ext(m_outBuf);
m_outBuf = NULL;
}
}
int GDSStream::Read()
{
int retval;
#ifdef HAVE_POLL
pollfd ufds[2];
errno_t rc = memset_s(ufds, sizeof(ufds), 0, sizeof(ufds));
securec_check(rc, "\0", "\0");
#else
fd_set readfds;
FD_ZERO(&readfds);
#endif
retry:
t_thrd.int_cxt.ImmediateInterruptOK = true;
CHECK_FOR_INTERRUPTS();
#ifdef HAVE_POLL
ufds[0].fd = m_fd;
ufds[0].events = POLLIN | POLLPRI;
retval = poll(ufds, 1, -1);
#else
if ((m_fd + 1) > FD_SETSIZE) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("m_fd + 1 cannot be greater than FD_SETSIZE")));
}
FD_SET(m_fd, &readfds);
retval = select(m_fd + 1, &readfds, NULL, NULL, -1);
#endif
t_thrd.int_cxt.ImmediateInterruptOK = false;
if (retval < 0) {
if (errno == EINTR || errno == EAGAIN)
goto retry;
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("Unexpected EOF on GDS connection \"%s\": %m", m_uri->ToString()),
errhint("The peer GDS may run in safety mode with SSL certification.")));
return EOF;
}
* parse and display all the other functions.
*/
#ifdef HAVE_POLL
if ((uint32)ufds[0].revents & POLLIN) {
#else
if (FD_ISSET(m_fd, &readfds)) {
#endif
#ifdef HAVE_POLL
ufds[0].revents = 0;
#endif
int read_status = (this->*m_read)();
if (read_status == EOF || read_status < 0) {
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("The peer GDS has performed an orderly shutdown on current connection."),
errhint("Please refer to GDS log for more details.")));
return EOF;
}
}
return 0;
}
int GDSStream::InternalRead()
{
int nread = 0;
PrepareReadBuf();
if (m_fd < 0)
ereport(ERROR, (errcode_for_socket_access(), errmsg("Bad socket.")));
retry:
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
nread = recv(m_fd, m_inBuf->data + m_inBuf->len, m_inBuf->maxlen - m_inBuf->len, 0);
END_NET_RECV_INFO(nread);
if (nread < 0) {
if (errno == EINTR)
goto retry;
#ifdef EAGAIN
if (errno == EAGAIN)
return 0;
#endif
#if defined(EWOULDBLOCK) && (!defined(EAGAIN) || (EWOULDBLOCK != EAGAIN))
if (errno == EWOULDBLOCK)
return 0;
#endif
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_RESET_BY_PEER),
errmsg("Unexpected EOF on GDS connection \"%s\": %m", m_uri->ToString())));
return -1;
} else if (nread == 0)
return EOF;
m_inBuf->len += nread;
Assert(m_inBuf->len <= m_inBuf->maxlen);
return 1;
}
* @Description: read input data from OPENSSL buffer
* @Return: 1, read data successfully;
* 0, no data read, try it again;
* <0, exception found;
* @See also:
*/
int GDSStream::InternalReadSSL(void)
{
int nread = 0;
int times = 0;
int maxreq = 0;
PrepareReadBuf();
maxreq = m_inBuf->maxlen - m_inBuf->len;
Assert(maxreq > 0);
nread = gs_openssl_cli_read(m_ssl, m_inBuf->data + m_inBuf->len, maxreq);
if (nread > 0) {
++times;
m_inBuf->len += nread;
Assert(m_inBuf->len <= m_inBuf->maxlen);
while (nread == maxreq) {
enlargeStringInfo(m_inBuf, GDS_STREAM_BUF_EXTEND_SIZE);
maxreq = m_inBuf->maxlen - m_inBuf->len;
Assert(maxreq > 0);
nread = gs_openssl_cli_read(m_ssl, m_inBuf->data + m_inBuf->len, maxreq);
if (nread > 0) {
++times;
m_inBuf->len += nread;
Assert(m_inBuf->len <= m_inBuf->maxlen);
}
}
}
Assert(nread < maxreq);
if (0 == times) {
if (nread == OPENSSL_CLI_EAGAIN) {
return 0;
}
if (nread == 0) {
ereport(LOG,
(errmodule(MOD_SSL),
errcode_for_socket_access(),
errmsg(
"read no data at the first time of poll() from GDS connection \"%s\": %m", m_uri->ToString())));
return EOF;
}
return nread;
}
Assert(times > 0);
if (nread >= 0) {
return 1;
}
if (nread == OPENSSL_CLI_EAGAIN) {
ereport(LOG,
(errmodule(MOD_SSL),
errmsg("read no data in ssl buffer from GDS connection \"%s\": %m", m_uri->ToString())));
return 1;
}
return nread;
}
* @Description: call this method before reading. it will check
* 1. vacuum its reading buffer by moving data ahead
* 2. extend input buffer if needed
* @See also:
*/
void GDSStream::PrepareReadBuf(void)
{
Assert(m_inBuf != NULL);
if (m_inBuf->cursor < m_inBuf->len) {
if (m_inBuf->cursor > 0) {
errno_t rc = 0;
rc = memmove_s(
m_inBuf->data, m_inBuf->maxlen, m_inBuf->data + m_inBuf->cursor, m_inBuf->len - m_inBuf->cursor);
securec_check(rc, "", "");
m_inBuf->len -= m_inBuf->cursor;
m_inBuf->cursor = 0;
}
} else {
m_inBuf->cursor = m_inBuf->len = 0;
}
if (m_inBuf->maxlen - m_inBuf->len < GDS_STREAM_BUF_EXTEND_SIZE) {
enlargeStringInfo(m_inBuf, GDS_STREAM_BUF_EXTEND_SIZE);
}
}
int GDSStream::Write(void* src, Size len)
{
Assert(m_uri);
char* bufptr = (char*)src;
int ret = 0;
if (!pg_set_block(m_fd)) {
ereport(WARNING, (errmsg("could not set socket to blocking mode: %m")));
}
while ((Size)(bufptr - (char*)src) < len) {
if (!m_ssl_enable) {
t_thrd.int_cxt.ImmediateInterruptOK = true;
ret = send(m_fd, bufptr, len - (bufptr - (char*)src), 0);
t_thrd.int_cxt.ImmediateInterruptOK = false;
if (ret < 0) {
if (errno == EINTR)
continue;
ereport(ERROR,
(errmodule(MOD_SSL),
errcode_for_socket_access(),
errmsg("Unexpected connection EOF from \"%s\":%m", m_uri->ToString())));
}
} else {
ret = gs_openssl_cli_write(m_ssl, bufptr, len - (bufptr - (char*)src));
if (ret < 0) {
ereport(ERROR,
(errmodule(MOD_SSL),
errcode_for_socket_access(),
errmsg("exception from GDS \"%s\":%m", m_uri->ToString())));
}
}
bufptr += ret;
}
if (!pg_set_noblock(m_fd)) {
ereport(WARNING, (errmsg("could not set socket to non-blocking mode: %m")));
}
return len;
}
void GDSStream::Flush()
{
if (m_outBuf->cursor < m_outBuf->len) {
Write(m_outBuf->data + m_outBuf->cursor, m_outBuf->len - m_outBuf->cursor);
resetStringInfo(m_outBuf);
}
}
#define HAS_DATA_BUFFERED(buf) \
((buf)->cursor + 4 < (buf)->len && \
(size_t)((buf)->cursor + 4) + (size_t)ntohl(*(uint32*)((buf)->data + (buf)->cursor + 1)) < (size_t)(buf)->len)
int GDSStream::ReadMessage(StringInfoData& dst)
{
int retval = 0;
resetStringInfo(&dst);
Assert(NULL != m_inBuf);
while (!HAS_DATA_BUFFERED(m_inBuf) && (retval = this->Read()) != EOF)
;
if (retval != EOF) {
int length = ntohl(*(uint32*)(m_inBuf->data + m_inBuf->cursor + 1));
if (length > m_inBuf->len - m_inBuf->cursor) {
ereport(ERROR, (errmodule(MOD_GDS), errcode(ERRCODE_LOG),
errmsg("Unexpected length of data coming supposedly from GDS. Could be an forged attack package.")));
}
if (length < 0 || (uint32)length > MaxAllocSize + 4) {
ereport(ERROR,
(errcode(ERRCODE_OPERATE_RESULT_NOT_EXPECTED),
errmsg("Message size exceeds the maximum allowed (%d)", (int)MaxAllocSize + 4)));
}
length += GDSCmdHeaderSize;
appendBinaryStringInfo(&dst, m_inBuf->data + m_inBuf->cursor, length);
m_inBuf->cursor += length;
} else {
ereport(ERROR, (errcode(ERRCODE_OPERATE_RESULT_NOT_EXPECTED), errmsg("Incomplete Message from GDS .")));
}
return dst.len;
}
void GDSStream::VerifyAddr()
{
Assert(m_uri);
if (!(m_uri->m_protocol != NULL &&
(strncmp(m_uri->m_protocol, GSFS_PREFIX, 4) == 0 || strncmp(m_uri->m_protocol, GSFSS_PREFIX, 5) == 0) &&
m_uri->m_host != NULL && m_uri->m_port >= 0)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid URI \"%s\"", m_uri->ToString())));
}
}
static char* _WriteBegin(CmdBegin* cmd)
{
WRITE_JSON_START(CmdBegin, cmd);
WRITE_JSON_STRING(m_url);
WRITE_JSON_UINT64(m_id);
WRITE_JSON_INT(m_format);
WRITE_JSON_INT(m_nodeType);
WRITE_JSON_INT(m_escape);
WRITE_JSON_STRING(m_eol);
WRITE_JSON_INT(m_quote);
WRITE_JSON_BOOL(m_header);
WRITE_JSON_INT(m_nodeNum);
WRITE_JSON_STRING(m_nodeName);
WRITE_JSON_INT(m_fixSize);
WRITE_JSON_STRING(m_prefix);
WRITE_JSON_STRING(m_fileheader);
WRITE_JSON_END();
}
void SerializeCmd(CmdBase* cmd, StringInfo buf)
{
char cmdHeader[GDSCmdHeaderSize];
uint32* lenPtr = (uint32*)&cmdHeader[1];
char* body = NULL;
uint32 len = 0;
const int cmdtypeSize = 30;
char cmdtype[cmdtypeSize];
errno_t ret = 0;
Assert(CurrentMemoryContext);
cmdHeader[0] = cmd->m_type;
switch (cmd->m_type) {
case CMD_TYPE_BEGIN:
body = _WriteBegin((CmdBegin*)cmd);
if (body != NULL)
len = strlen(body);
else
ereport(ERROR, (errcode(ERRCODE_OPERATE_RESULT_NOT_EXPECTED), errmsg("Invalid command to serialize.")));
Assert(CurrentMemoryContext);
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_BEGIN_v1");
securec_check(ret, "\0", "\0");
break;
case CMD_TYPE_REQ:
len = 0;
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_REQ");
securec_check(ret, "\0", "\0");
break;
case CMD_TYPE_END:
len = 0;
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_END");
securec_check(ret, "\0", "\0");
break;
case CMD_TYPE_REMOTELOG: {
CmdRemoteLog* rdata = (CmdRemoteLog*)cmd;
body = rdata->m_data;
len = rdata->m_datasize;
len += strlen(rdata->m_name);
len += sizeof(rdata->m_datasize);
*lenPtr = htonl(len);
appendBinaryStringInfo(buf, cmdHeader, GDSCmdHeaderSize);
pq_sendint32(buf, rdata->m_datasize);
appendBinaryStringInfo(buf, rdata->m_name, strlen(rdata->m_name));
appendBinaryStringInfo(buf, rdata->m_data, rdata->m_datasize);
Assert(CurrentMemoryContext);
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_REMOTELOG");
securec_check(ret, "\0", "\0");
}
return;
default:
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid GDS command: %d", cmd->m_type)));
}
Assert(CurrentMemoryContext);
*lenPtr = htonl(len);
appendBinaryStringInfo(buf, cmdHeader, GDSCmdHeaderSize);
if (body != NULL)
appendBinaryStringInfo(buf, body, len);
Assert(CurrentMemoryContext);
cmdtype[cmdtypeSize - 1] = '\0';
#ifdef ENABLE_MULTIPLE_NODES
if (u_sess->attr.attr_storage.gds_debug_mod)
ereport(LOG, (errmodule(MOD_GDS), (errmsg("Sending a %s to GDS", cmdtype))));
#endif
}
void PackData(StringInfo dst, StringInfo data)
{
char cmdHeader[GDSCmdHeaderSize];
uint32* lenPtr = (uint32*)&cmdHeader[1];
cmdHeader[0] = CMD_TYPE_DATA;
*lenPtr = htonl(data->len);
appendBinaryStringInfo(dst, cmdHeader, GDSCmdHeaderSize);
appendBinaryStringInfo(dst, data->data, data->len);
}
static int _ReadError(CmdError* cmd, const char* msg)
{
READ_JSON_START(CmdError, cmd, msg);
READ_JSON_INT(m_level);
READ_JSON_STRING(m_detail);
READ_JSON_END();
}
static int _ReadFileSwitch(CmdFileSwitch* cmd, const char* msg)
{
READ_JSON_START(CmdFileSwitch, cmd, msg);
READ_JSON_STRING(m_fileName);
READ_JSON_END();
}
static int _ReadResponse(CmdResponse* cmd, const char* msg)
{
READ_JSON_START(CmdResponse, cmd, msg);
READ_JSON_INT(m_result);
READ_JSON_STRING(m_reason);
READ_JSON_END();
}
static int _ReadResult(CmdQueryResult* cmd, const char* msg)
{
READ_JSON_START(CmdQueryResult, cmd, msg);
READ_JSON_INT(m_result);
READ_JSON_STRING(m_version_num);
READ_JSON_END();
}
CmdBase* DeserializeCmd(StringInfo buf)
{
char type = *(buf->data + buf->cursor++);
uint32 length = *(uint32*)(buf->data + buf->cursor);
CmdBase* cmd = NULL;
char* strcmd = NULL;
buf->cursor += sizeof(uint32);
length = ntohl(length);
if (length > (uint32)(buf->len - buf->cursor)) {
ereport(ERROR, (errmodule(MOD_GDS), errcode(ERRCODE_LOG),
errmsg("Unexpected length of data coming supposedly from GDS. Could be an forged attack package.")));
}
const int cmdtypeSize = 30;
char cmdtype[cmdtypeSize];
errno_t ret = 0;
int errorCheck = 0;
switch (type) {
case CMD_TYPE_ERROR:
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_ERROR");
securec_check(ret, "\0", "\0");
strcmd = pnstrdup(buf->data + buf->cursor, length);
cmd = (CmdError*)palloc(sizeof(CmdError));
errorCheck = _ReadError((CmdError*)cmd, strcmd);
if (unlikely(errorCheck) != 0) {
ereport(ERROR, (errmodule(MOD_GDS), errcode(ERRCODE_LOG),
errmsg("Failed to _ReadError in CMD_TYPE_ERROR, could be a forged package.")));
}
pfree_ext(strcmd);
cmd->m_type = CMD_TYPE_ERROR;
break;
case CMD_TYPE_FILE_SWITCH:
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_FILE_SWITCH");
securec_check(ret, "\0", "\0");
strcmd = pnstrdup(buf->data + buf->cursor, length);
cmd = (CmdFileSwitch*)palloc(sizeof(CmdFileSwitch));
errorCheck = _ReadFileSwitch((CmdFileSwitch*)cmd, strcmd);
if (unlikely(errorCheck) != 0) {
ereport(ERROR, (errmodule(MOD_GDS), errcode(ERRCODE_LOG),
errmsg("Failed to _ReadFileSwitch in CMD_TYPE_FILE_SWITCH, could be a forged package.")));
}
pfree_ext(strcmd);
cmd->m_type = CMD_TYPE_FILE_SWITCH;
break;
case CMD_TYPE_RESPONSE:
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_RESPONSE");
securec_check(ret, "\0", "\0");
strcmd = pnstrdup(buf->data + buf->cursor, length);
cmd = (CmdResponse*)palloc(sizeof(CmdResponse));
errorCheck = _ReadResponse((CmdResponse*)cmd, strcmd);
if (unlikely(errorCheck) != 0) {
ereport(ERROR, (errmodule(MOD_GDS), errcode(ERRCODE_LOG),
errmsg("Failed to _ReadResponse in CMD_TYPE_RESPONSE, could be a forged package.")));
}
pfree_ext(strcmd);
cmd->m_type = CMD_TYPE_RESPONSE;
break;
case CMD_TYPE_END:
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_END");
securec_check(ret, "\0", "\0");
cmd = (CmdBase*)palloc(sizeof(CmdBase));
cmd->m_type = CMD_TYPE_END;
break;
case CMD_TYPE_DATA:
case CMD_TYPE_DATA_SEG:
* this branch needn't call pnstrdup() to copy this message data,
* because we don't care message body. we think the two message types
* are the most, and it's worthy of this optimization.
*/
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_DATA");
securec_check(ret, "\0", "\0");
cmd = (CmdData*)palloc(sizeof(CmdData));
((CmdData*)cmd)->m_data = buf->data + buf->cursor;
((CmdData*)cmd)->m_len = length;
cmd->m_type = type;
break;
case CMD_TYPE_QUERY_RESULT:
ereport(ERROR,
(errmodule(MOD_GDS),
errcode(ERRCODE_LOG),
errmsg("The corresponding GDS is of an older version. Please upgrade GDS to match the server "
"version.")));
break;
case CMD_TYPE_QUERY_RESULT_V1:
ret = strcpy_s(cmdtype, cmdtypeSize, "CMD_TYPE_QUERY_RESULT");
securec_check(ret, "\0", "\0");
strcmd = pnstrdup(buf->data + buf->cursor, length);
cmd = (CmdQueryResult*)palloc(sizeof(CmdQueryResult));
errorCheck = _ReadResult((CmdQueryResult*)cmd, strcmd);
if (unlikely(errorCheck) != 0) {
ereport(ERROR, (errmodule(MOD_GDS), errcode(ERRCODE_LOG),
errmsg("Failed to _ReadResult in CMD_TYPE_QUERY_RESULT_V1, could be a forged package.")));
}
pfree_ext(strcmd);
cmd->m_type = CMD_TYPE_QUERY_RESULT_V1;
break;
default: {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Failed to deserialize command, which type is %d", type)));
} break;
}
buf->cursor += length;
cmdtype[cmdtypeSize - 1] = '\0';
#ifdef ENABLE_MULTIPLE_NODES
if (u_sess->attr.attr_storage.gds_debug_mod)
ereport(LOG, (errmodule(MOD_GDS), (errmsg("Receiving a %s from GDS", cmdtype))));
#endif
return cmd;
}