* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "postgres.h"
#include "access/sysattr.h"
#include "access/htup.h"
#include "access/xact.h"
#include "storage/buf/bufmgr.h"
#include "executor/tuptable.h"
#include "nodes/execnodes.h"
#include "nodes/ag_extensible.h"
#include "nodes/nodes.h"
#include "nodes/plannodes.h"
#include "parser/parsetree.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "utils/rel.h"
#include "executor/executor.h"
#include "catalog/ag_label.h"
#include "commands/label_commands.h"
#include "executor/cypher_executor.h"
#include "executor/cypher_utils.h"
#include "parser/cypher_parse_node.h"
#include "nodes/cypher_nodes.h"
#include "utils/agtype.h"
#include "utils/graphid.h"
static void begin_cypher_set(ExtensiblePlanState *node, EState *estate,
int eflags);
static TupleTableSlot *exec_cypher_set(ExtensiblePlanState *node);
static void end_cypher_set(ExtensiblePlanState *node);
static void rescan_cypher_set(ExtensiblePlanState *node);
static void process_update_list(ExtensiblePlanState *node);
static HeapTuple update_entity_tuple(ResultRelInfo *resultRelInfo,
TupleTableSlot *elemTupleSlot,
EState *estate, HeapTuple old_tuple);
const ExtensibleExecMethods cypher_set_exec_methods = {SET_SCAN_STATE_NAME,
begin_cypher_set,
exec_cypher_set,
end_cypher_set,
rescan_cypher_set,
NULL};
static void begin_cypher_set(ExtensiblePlanState *node, EState *estate,
int eflags)
{
cypher_set_custom_scan_state *css =
(cypher_set_custom_scan_state *)node;
Plan *subplan;
Assert(list_length(css->cs->extensible_plans) == 1);
subplan = (Plan*) linitial(css->cs->extensible_plans);
node->ss.ps.lefttree = ExecInitNode(subplan, estate, eflags);
ExecAssignExprContext(estate, &node->ss.ps);
TupleDesc tupledesc = ExecGetResultType(node->ss.ps.lefttree);
ExecInitScanTupleSlot(estate, &node->ss);
ExecAssignScanType(&node->ss, tupledesc);
if (!CYPHER_CLAUSE_IS_TERMINAL(css->flags))
{
TupleDesc tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
ExecAssignProjectionInfo(&node->ss.ps, tupdesc);
}
* Postgres does not assign the es_output_cid in queries that do
* not write to disk, ie: SELECT commands. We need the command id
* for our clauses, and we may need to initialize it. We cannot use
* GetCurrentCommandId because there may be other cypher clauses
* that have modified the command id.
*/
if (estate->es_output_cid == 0)
estate->es_output_cid = estate->es_snapshot->curcid;
Increment_Estate_CommandId(estate);
}
static HeapTuple update_entity_tuple(ResultRelInfo *resultRelInfo,
TupleTableSlot *elemTupleSlot,
EState *estate, HeapTuple old_tuple)
{
HeapTuple tuple = NULL;
LockTupleMode lockmode;
TM_FailureData hufd;
TM_Result lock_result;
TM_Result update_result;
Buffer buffer;
bool errFlag = false;
ResultRelInfo *saved_resultRelInfo = estate->es_result_relation_info;
estate->es_result_relation_info = resultRelInfo;
lockmode = LockTupleExclusive;
PG_TRY();
{
lock_result = heap_lock_tuple(resultRelInfo->ri_RelationDesc, old_tuple,
&buffer, GetCurrentCommandId(false), lockmode,
LockWaitBlock, false, &hufd);
}
PG_CATCH();
{
lock_result = TM_Invisible;
errFlag = true;
t_thrd.log_cxt.errordata_stack_depth = -1;
}
PG_END_TRY();
if (lock_result == TM_Ok)
{
ExecStoreVirtualTuple(elemTupleSlot);
tuple = ExecMaterializeSlot(elemTupleSlot);
tuple->t_self = old_tuple->t_self;
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
if (resultRelInfo->ri_RelationDesc->rd_att->constr != NULL)
ExecConstraints(resultRelInfo, elemTupleSlot, estate);
update_result = heap_update(resultRelInfo->ri_RelationDesc, NULL,
&(tuple->t_self), tuple,
GetCurrentCommandId(true),
estate->es_crosscheck_snapshot, true, &hufd,
&lockmode);
if (update_result != TM_Ok)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Entity failed to be updated: %i",
update_result)));
if (resultRelInfo->ri_NumIndices > 0)
ExecInsertIndexTuples(elemTupleSlot, &(tuple->t_self), estate,
NULL, NULL, InvalidBktId, NULL, NULL);
}
if (!errFlag)
{
ReleaseBuffer(buffer);
}
estate->es_result_relation_info = saved_resultRelInfo;
return tuple;
}
* When the CREATE clause is the last cypher clause, consume all input from the
* previous clause(s) in the first call of exec_cypher_create.
*/
static void process_all_tuples(ExtensiblePlanState *node)
{
cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node;
TupleTableSlot *slot;
EState *estate = css->css.ss.ps.state;
do
{
process_update_list(node);
Decrement_Estate_CommandId(estate)
slot = ExecProcNode(node->ss.ps.lefttree);
Increment_Estate_CommandId(estate)
} while (!TupIsNull(slot));
}
static bool check_path(agtype_value *path, graphid updated_id)
{
int i;
for (i = 0; i < path->val.array.num_elems; i++)
{
agtype_value *elem = &path->val.array.elems[i];
agtype_value *id = GET_AGTYPE_VALUE_OBJECT_VALUE(elem, "id");
if (updated_id == id->val.int_value)
return true;
}
return false;
}
static agtype_value *replace_entity_in_path(agtype_value *path,
graphid updated_id,
agtype *updated_entity)
{
agtype_iterator *it;
agtype_iterator_token tok = WAGT_DONE;
agtype_parse_state *parse_state = NULL;
agtype_value *r;
agtype_value *parsed_agtype_value = NULL;
agtype *prop_agtype;
int i;
r = (agtype_value*)palloc(sizeof(agtype_value));
prop_agtype = agtype_value_to_agtype(path);
it = agtype_iterator_init(&prop_agtype->root);
tok = agtype_iterator_next(&it, r, true);
parsed_agtype_value = push_agtype_value(&parse_state, tok,
tok < WAGT_BEGIN_ARRAY ? r : NULL);
for (i = 0; i < path->val.array.num_elems; i++)
{
agtype_value *id, *elem;
elem = &path->val.array.elems[i];
if (elem->type != AGTV_VERTEX && elem->type != AGTV_EDGE)
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported agtype found in a path")));
id = GET_AGTYPE_VALUE_OBJECT_VALUE(elem, "id");
if (updated_id == id->val.int_value)
parsed_agtype_value = push_agtype_value(&parse_state, WAGT_ELEM,
get_ith_agtype_value_from_container(&updated_entity->root, 0));
else
parsed_agtype_value = push_agtype_value(&parse_state, WAGT_ELEM,
elem);
}
parsed_agtype_value = push_agtype_value(&parse_state, WAGT_END_ARRAY, NULL);
parsed_agtype_value->type = AGTV_PATH;
return parsed_agtype_value;
}
static void update_all_paths(ExtensiblePlanState *node, graphid id,
agtype *updated_entity)
{
cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
int i;
for (i = 0; i < scanTupleSlot->tts_tupleDescriptor->natts; i++) {
agtype *original_entity;
agtype_value *original_entity_value;
if (scanTupleSlot->tts_tupleDescriptor->attrs[i].atttypid != AGTYPEOID)
continue;
if (scanTupleSlot->tts_isnull[i])
continue;
original_entity = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[i]);
if (!AGTYPE_CONTAINER_IS_ARRAY(&original_entity->root))
{
continue;
}
original_entity_value = get_ith_agtype_value_from_container(&original_entity->root, 0);
if (original_entity_value->type == AGTV_PATH)
{
if (check_path(original_entity_value, id))
{
agtype_value *new_path = replace_entity_in_path(original_entity_value, id, updated_entity);
scanTupleSlot->tts_values[i] = AGTYPE_P_GET_DATUM(agtype_value_to_agtype(new_path));
}
}
}
}
static void process_update_list(ExtensiblePlanState *node)
{
cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
ListCell *lc;
EState *estate = css->css.ss.ps.state;
int *luindex = NULL;
int lidx = 0;
luindex = (int*)palloc0(sizeof(int) * scanTupleSlot->tts_nvalid);
* Iterate through the SET items list and store the loop index of each
* 'entity' update. As there is only one entry for each entity, this will
* have the effect of overwriting the previous loop index stored - if this
* 'entity' is used more than once. This will create an array of the last
* loop index for the update of that particular 'entity'. This will allow us
* to correctly update an 'entity' after all other previous updates to that
* 'entity' have been done.
*/
foreach (lc, css->set_list->set_items)
{
cypher_update_item *update_item = NULL;
update_item = (cypher_update_item *)lfirst(lc);
luindex[update_item->entity_position - 1] = lidx;
lidx++;
}
lidx = 0;
foreach (lc, css->set_list->set_items)
{
agtype_value *altered_properties;
agtype_value *original_entity_value;
agtype_value *original_properties;
agtype_value *id;
agtype_value *label;
agtype *original_entity;
agtype *new_property_value;
TupleTableSlot *slot;
ResultRelInfo *resultRelInfo;
ScanKeyData scan_keys[1];
TableScanDesc scan_desc;
bool remove_property;
char *label_name;
cypher_update_item *update_item;
Datum new_entity;
HeapTuple heap_tuple;
char *clause_name = css->set_list->clause_name;
update_item = (cypher_update_item *)lfirst(lc);
* If the entity is null, we can skip this update. this will be
* possible when the OPTIONAL MATCH clause is implemented.
*/
if (scanTupleSlot->tts_isnull[update_item->entity_position - 1])
continue;
if (scanTupleSlot->tts_tupleDescriptor->attrs[update_item->entity_position -1].atttypid != AGTYPEOID)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("age %s clause can only update agtype",
clause_name)));
original_entity = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[update_item->entity_position - 1]);
original_entity_value = get_ith_agtype_value_from_container(&original_entity->root, 0);
if (original_entity_value->type != AGTV_VERTEX &&
original_entity_value->type != AGTV_EDGE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("age %s clause can only update vertex and edges",
clause_name)));
id = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "id");
label = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "label");
label_name = pnstrdup(label->val.string.val, label->val.string.len);
original_properties = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value,
"properties");
* Determine if the property should be removed. This will be because
* this is a REMOVE clause or the variable references a variable that is
* NULL. It will be possible for a variable to be NULL when OPTIONAL
* MATCH is implemented.
*/
if(update_item->remove_item)
remove_property = true;
else
remove_property = scanTupleSlot->tts_isnull[update_item->prop_position - 1];
* If we need to remove the property, set the value to NULL. Otherwise
* fetch the evaluated expression from the tuble slot.
*/
if (remove_property)
new_property_value = NULL;
else
new_property_value = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[update_item->prop_position - 1]);
* Alter the properties Agtype value to contain or remove the updated
* property.
*/
altered_properties = alter_property_value(original_properties,
update_item->prop_name,
new_property_value,
remove_property);
resultRelInfo = create_entity_result_rel_info(estate,
css->set_list->graph_name,
label_name);
slot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(slot,
RelationGetDescr(resultRelInfo->ri_RelationDesc));
* Now that we have the updated properties, create a either a vertex or
* edge Datum for the in-memory update, and setup the tupleTableSlot
* for the on-disc update.
*/
if (original_entity_value->type == AGTV_VERTEX)
{
new_entity = make_vertex(GRAPHID_GET_DATUM(id->val.int_value),
CStringGetDatum(label_name),
AGTYPE_P_GET_DATUM(agtype_value_to_agtype(altered_properties)));
slot = populate_vertex_tts(slot, id, altered_properties);
} else if (original_entity_value->type == AGTV_EDGE) {
agtype_value *startid = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "start_id");
agtype_value *endid = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "end_id");
new_entity = make_edge(GRAPHID_GET_DATUM(id->val.int_value),
GRAPHID_GET_DATUM(startid->val.int_value),
GRAPHID_GET_DATUM(endid->val.int_value),
CStringGetDatum(label_name),
AGTYPE_P_GET_DATUM(agtype_value_to_agtype(altered_properties)));
slot = populate_edge_tts(slot, id, startid, endid,
altered_properties);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("age %s clause can only update vertex and edges",
clause_name)));
}
scanTupleSlot->tts_values[update_item->entity_position - 1] = new_entity;
* If the tuple table slot has paths, we need to inspect them to see if
* the updated entity is contained within them and replace the entity
* if it is.
*/
update_all_paths(node,
id->val.int_value, DATUM_GET_AGTYPE_P(new_entity));
* If the last update index for the entity is equal to the current loop
* index, then update this tuple.
*/
if (luindex[update_item->entity_position - 1] == lidx)
{
* Setup the scan key to require the id field on-disc to match the
* entity's graphid.
*/
ScanKeyInit(&scan_keys[0], 1, BTEqualStrategyNumber, F_GRAPHIDEQ,
GRAPHID_GET_DATUM(id->val.int_value));
* Setup the scan description, with the correct snapshot and scan
* keys.
*/
scan_desc = heap_beginscan(resultRelInfo->ri_RelationDesc,
estate->es_snapshot, 1, scan_keys);
heap_tuple = heap_getnext(scan_desc, ForwardScanDirection);
* If the heap tuple still exists (It wasn't deleted between the
* match and this SET/REMOVE) update the heap_tuple.
*/
if(HeapTupleIsValid(heap_tuple))
{
heap_tuple = update_entity_tuple(resultRelInfo, slot, estate,
heap_tuple);
}
heap_endscan(scan_desc);
}
ExecCloseIndices(resultRelInfo);
heap_close(resultRelInfo->ri_RelationDesc, RowExclusiveLock);
lidx++;
}
pfree(luindex);
}
static TupleTableSlot *exec_cypher_set(ExtensiblePlanState *node)
{
cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node;
ResultRelInfo *saved_resultRelInfo;
EState *estate = css->css.ss.ps.state;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
TupleTableSlot *slot;
saved_resultRelInfo = estate->es_result_relation_info;
Decrement_Estate_CommandId(estate);
slot = ExecProcNode(node->ss.ps.lefttree);
Increment_Estate_CommandId(estate);
if (TupIsNull(slot))
return NULL;
econtext->ecxt_scantuple =slot;
if (CYPHER_CLAUSE_IS_TERMINAL(css->flags))
{
estate->es_result_relation_info = saved_resultRelInfo;
process_all_tuples(node);
CommandCounterIncrement();
return NULL;
}
process_update_list(node);
CommandCounterIncrement();
estate->es_result_relation_info = saved_resultRelInfo;
econtext->ecxt_scantuple = ExecProject(node->ss.ps.ps_ProjInfo, NULL);
return econtext->ecxt_scantuple;
}
static void end_cypher_set(ExtensiblePlanState *node)
{
ExecEndNode(node->ss.ps.lefttree);
}
static void rescan_cypher_set(ExtensiblePlanState *node)
{
cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node;
char *clause_name = css->set_list->clause_name;
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cypher %s clause cannot be rescaned",
clause_name),
errhint("its unsafe to use joins in a query with a Cypher %s clause", clause_name)));
}
Node *create_cypher_set_plan_state(ExtensiblePlan *cscan)
{
cypher_set_custom_scan_state *cypher_css = (cypher_set_custom_scan_state*)palloc0(
sizeof(cypher_set_custom_scan_state));
cypher_update_information *set_list;
char *serialized_data;
Const *c;
cypher_css->cs = cscan;
c = (Const*)linitial(cscan->extensible_private);
serialized_data = (char *)c->constvalue;
set_list = (cypher_update_information*)stringToAGNode(serialized_data);
Assert(is_ag_node(set_list, cypher_update_information));
cypher_css->set_list = set_list;
cypher_css->flags = set_list->flags;
cypher_css->css.ss.ps.type = T_ExtensiblePlanState;
cypher_css->css.methods = &cypher_set_exec_methods;
return (Node *)cypher_css;
}