* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* fe-protocol.cpp
*
*
* IDENTIFICATION
* src/cm_communication/cm_feconnect/fe-protocol.cpp
*
* -------------------------------------------------------------------------
*/
#include "cm/cm_c.h"
#include <fcntl.h>
#include "cm/libpq-fe.h"
#include "cm/libpq-int.h"
#include <unistd.h>
#include <netinet/in.h>
* This macro lists the backend message types that could be "long" (more
* than a couple of kilobytes).
*/
#define VALID_LONG_MESSAGE_TYPE(id) ((id) == 'S' || (id) == 'E')
static void handleSyncLoss(CM_Conn *conn, char id, int msgLength);
static CM_Result *pqParseInput(CM_Conn *conn);
static int cmpqParseSuccess(CM_Conn *conn, CM_Result *result);
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
*/
static CM_Result *pqParseInput(CM_Conn *conn)
{
char id;
int msgLength;
int avail;
if (conn->result == NULL) {
errno_t rc;
conn->result = (CM_Result*)malloc(sizeof(CM_Result));
if (conn->result == NULL) {
return NULL;
}
rc = memset_s(conn->result, sizeof(CM_Result), 0, sizeof(CM_Result));
securec_check_c(rc, (char*)conn->result, "");
}
CM_Result *result = conn->result;
* Try to read a message. First get the type code and length. Return
* if not enough data.
*/
conn->inCursor = conn->inStart;
if (cmpqGetc(&id, conn)) {
return NULL;
}
if (cmpqGetInt(&msgLength, 4, conn)) {
return NULL;
}
* Try to validate message type/length here. A length less than 4 is
* definitely broken. Large lengths should only be believed for a few
* message types.
*/
if (msgLength < 4) {
handleSyncLoss(conn, id, msgLength);
return NULL;
}
if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id)) {
handleSyncLoss(conn, id, msgLength);
return NULL;
}
* Can't process if message body isn't all here yet.
*/
conn->result->gr_msglen = msgLength -= 4;
avail = conn->inEnd - conn->inCursor;
if (avail < msgLength) {
* Before returning, enlarge the input buffer if needed to hold
* the whole message. This is better than leaving it to
* cmpqReadData because we can avoid multiple cycles of realloc()
* when the message is large; also, we can implement a reasonable
* recovery strategy if we are unable to make the buffer big
* enough.
*/
if (cmpqCheckInBufferSpace((size_t)(conn->inCursor + msgLength), conn)) {
* XXX add some better recovery code... plan is to skip over
* the message using its length, then report an error. For the
* moment, just treat this like loss of sync (which indeed it
* might be!)
*/
handleSyncLoss(conn, id, msgLength);
}
return NULL;
}
switch (id) {
case 'S':
if (cmpqParseSuccess(conn, result)) {
return NULL;
}
break;
case 'E':
if (cmpqGetError(conn, result)) {
return NULL;
}
result->gr_status = CM_RESULT_ERROR;
break;
default:
printfCMPQExpBuffer(
&conn->errorMessage, "unexpected response from server; first received character was \"%c\"\n", id);
conn->inCursor += msgLength;
break;
}
if (conn->inCursor == conn->inStart + 5 + msgLength) {
conn->inStart = conn->inCursor;
} else {
printfCMPQExpBuffer(
&conn->errorMessage, "message contents do not agree with length in message type \"%c\"\n", id);
conn->inStart += 5 + msgLength;
}
return result;
}
* handleSyncLoss: clean up after loss of message-boundary sync
*
* There isn't really a lot we can do here except abandon the connection.
*/
static void handleSyncLoss(CM_Conn *conn, char id, int msgLength)
{
printfCMPQExpBuffer(
&conn->errorMessage, "lost synchronization with server: got message type \"%c\", length %d\n", id, msgLength);
(void)close(conn->sock);
conn->sock = -1;
conn->status = CONNECTION_BAD;
}
* Attempt to read an Error or Notice response message.
* This is possible in several places, so we break it out as a subroutine.
* Entry: 'E' message type and length have already been consumed.
* Exit: returns 0 if successfully consumed message.
* returns EOF if not enough data.
*/
int cmpqGetError(CM_Conn *conn, CM_Result *result)
{
char id;
* Read the fields and save into res.
*/
for (;;) {
if (cmpqGetc(&id, conn)) {
goto fail;
}
if (id == '\0') {
break;
}
if (cmpqGets(&conn->errorMessage, conn)) {
goto fail;
}
}
return 0;
fail:
return EOF;
}
* CMPQgetResult
* Get the next CM_Result produced. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
* memory).
*/
CM_Result *cmpqGetResult(CM_Conn *conn)
{
if (conn == NULL) {
return NULL;
}
CM_Result *res;
while ((res = pqParseInput(conn)) == NULL) {
int flushResult;
* If data remains unsent, send it. Else we might be waiting for the
* result of a command the backend hasn't even got yet.
*/
while ((flushResult = cmpqFlush(conn)) > 0) {
if (cmpqWait(0, 1, conn) != 0) {
flushResult = -1;
break;
}
}
if (flushResult ||
cmpqReadData(conn) <= 0) {
* conn->errorMessage has been set by cmpqWait or cmpqReadData.
*/
return NULL;
}
}
return res;
}
* return 0 if parsing command is totally completed.
* return 1 if it needs to be read continuously.
*/
static int cmpqParseSuccess(CM_Conn *conn, CM_Result *result)
{
errno_t rc;
result->gr_status = CM_RESULT_OK;
rc = memcpy_s(&(result->gr_resdata), CM_MSG_MAX_LENGTH, conn->inBuffer + conn->inCursor, result->gr_msglen);
securec_check_c(rc, "", "");
return (result->gr_status);
}
void cmpqResetResultData(CM_Result *result)
{
if (result != NULL) {
result->gr_msglen = 0;
result->gr_status = 0;
result->gr_type = 0;
}
}