Commit 4b2d4403 authored by Simon Riggs's avatar Simon Riggs

MERGE post-commit review

Review comments from Andres Freund

* Consolidate code into AfterTriggerGetTransitionTable()
* Rename nodeMerge.c to execMerge.c
* Rename nodeMerge.h to execMerge.h
* Move MERGE handling in ExecInitModifyTable()
  into a execMerge.c ExecInitMerge()
* Move mt_merge_subcommands flags into execMerge.h
* Rename opt_and_condition to opt_merge_when_and_condition
* Wordsmith various comments

Author: Pavan Deolasee
Reviewer: Simon Riggs
parent 1fd86906
......@@ -96,6 +96,12 @@ static HeapTuple ExecCallTriggerFunc(TriggerData *trigdata,
FmgrInfo *finfo,
Instrumentation *instr,
MemoryContext per_tuple_context);
static Tuplestorestate *AfterTriggerGetTransitionTable(int event,
HeapTuple oldtup,
HeapTuple newtup,
TransitionCaptureState *transition_capture);
static void TransitionTableAddTuple(HeapTuple heaptup, Tuplestorestate *tuplestore,
TupleConversionMap *map);
static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int event, bool row_trigger,
HeapTuple oldtup, HeapTuple newtup,
......@@ -3846,6 +3852,14 @@ struct AfterTriggersTableData
bool before_trig_done; /* did we already queue BS triggers? */
bool after_trig_done; /* did we already queue AS triggers? */
AfterTriggerEventList after_trig_events; /* if so, saved list pointer */
/*
* We maintain separate transaction tables for UPDATE/INSERT/DELETE since
* MERGE can run all three actions in a single statement. Note that UPDATE
* needs both old and new transition tables whereas INSERT needs only new
* and DELETE needs only old.
*/
/* "old" transition table for UPDATE, if any */
Tuplestorestate *old_upd_tuplestore;
/* "new" transition table for UPDATE, if any */
......@@ -5716,6 +5730,84 @@ AfterTriggerPendingOnRel(Oid relid)
return false;
}
/*
* Get the transition table for the given event and depending on whether we are
* processing the old or the new tuple.
*/
static Tuplestorestate *
AfterTriggerGetTransitionTable(int event,
HeapTuple oldtup,
HeapTuple newtup,
TransitionCaptureState *transition_capture)
{
Tuplestorestate *tuplestore = NULL;
bool delete_old_table = transition_capture->tcs_delete_old_table;
bool update_old_table = transition_capture->tcs_update_old_table;
bool update_new_table = transition_capture->tcs_update_new_table;
bool insert_new_table = transition_capture->tcs_insert_new_table;;
/*
* For INSERT events newtup should be non-NULL, for DELETE events
* oldtup should be non-NULL, whereas for UPDATE events normally both
* oldtup and newtup are non-NULL. But for UPDATE events fired for
* capturing transition tuples during UPDATE partition-key row
* movement, oldtup is NULL when the event is for a row being inserted,
* whereas newtup is NULL when the event is for a row being deleted.
*/
Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table &&
oldtup == NULL));
Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
newtup == NULL));
/*
* We're called either for the newtup or the oldtup, but not both at the
* same time.
*/
Assert((oldtup != NULL) ^ (newtup != NULL));
if (oldtup != NULL)
{
if (event == TRIGGER_EVENT_DELETE && delete_old_table)
tuplestore = transition_capture->tcs_private->old_del_tuplestore;
else if (event == TRIGGER_EVENT_UPDATE && update_old_table)
tuplestore = transition_capture->tcs_private->old_upd_tuplestore;
}
if (newtup != NULL)
{
if (event == TRIGGER_EVENT_INSERT && insert_new_table)
tuplestore = transition_capture->tcs_private->new_ins_tuplestore;
else if (event == TRIGGER_EVENT_UPDATE && update_new_table)
tuplestore = transition_capture->tcs_private->new_upd_tuplestore;
}
return tuplestore;
}
/*
* Add the given heap tuple to the given tuplestore, applying the conversion
* map if necessary.
*/
static void
TransitionTableAddTuple(HeapTuple heaptup, Tuplestorestate *tuplestore,
TupleConversionMap *map)
{
/*
* Nothing needs to be done if we don't have a tuplestore.
*/
if (tuplestore == NULL)
return;
if (map != NULL)
{
HeapTuple converted = do_convert_tuple(heaptup, map);
tuplestore_puttuple(tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(tuplestore, heaptup);
}
/* ----------
* AfterTriggerSaveEvent()
......@@ -5777,95 +5869,37 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
{
HeapTuple original_insert_tuple = transition_capture->tcs_original_insert_tuple;
TupleConversionMap *map = transition_capture->tcs_map;
bool delete_old_table = transition_capture->tcs_delete_old_table;
bool update_old_table = transition_capture->tcs_update_old_table;
bool update_new_table = transition_capture->tcs_update_new_table;
bool insert_new_table = transition_capture->tcs_insert_new_table;;
/*
* For INSERT events newtup should be non-NULL, for DELETE events
* oldtup should be non-NULL, whereas for UPDATE events normally both
* oldtup and newtup are non-NULL. But for UPDATE events fired for
* capturing transition tuples during UPDATE partition-key row
* movement, oldtup is NULL when the event is for a row being inserted,
* whereas newtup is NULL when the event is for a row being deleted.
* Capture the old tuple in the appropriate transition table based on
* the event.
*/
Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table &&
oldtup == NULL));
Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
newtup == NULL));
if (oldtup != NULL &&
(event == TRIGGER_EVENT_DELETE && delete_old_table))
{
Tuplestorestate *old_tuplestore;
old_tuplestore = transition_capture->tcs_private->old_del_tuplestore;
if (map != NULL)
{
HeapTuple converted = do_convert_tuple(oldtup, map);
tuplestore_puttuple(old_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(old_tuplestore, oldtup);
}
if (oldtup != NULL &&
(event == TRIGGER_EVENT_UPDATE && update_old_table))
{
Tuplestorestate *old_tuplestore;
old_tuplestore = transition_capture->tcs_private->old_upd_tuplestore;
if (map != NULL)
if (oldtup != NULL)
{
HeapTuple converted = do_convert_tuple(oldtup, map);
tuplestore_puttuple(old_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(old_tuplestore, oldtup);
Tuplestorestate *tuplestore =
AfterTriggerGetTransitionTable(event,
oldtup,
NULL,
transition_capture);
TransitionTableAddTuple(oldtup, tuplestore, map);
}
if (newtup != NULL &&
(event == TRIGGER_EVENT_INSERT && insert_new_table))
{
Tuplestorestate *new_tuplestore;
new_tuplestore = transition_capture->tcs_private->new_ins_tuplestore;
if (original_insert_tuple != NULL)
tuplestore_puttuple(new_tuplestore, original_insert_tuple);
else if (map != NULL)
{
HeapTuple converted = do_convert_tuple(newtup, map);
tuplestore_puttuple(new_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(new_tuplestore, newtup);
}
if (newtup != NULL &&
(event == TRIGGER_EVENT_UPDATE && update_new_table))
/*
* Capture the new tuple in the appropriate transition table based on
* the event.
*/
if (newtup != NULL)
{
Tuplestorestate *new_tuplestore;
new_tuplestore = transition_capture->tcs_private->new_upd_tuplestore;
Tuplestorestate *tuplestore =
AfterTriggerGetTransitionTable(event,
NULL,
newtup,
transition_capture);
if (original_insert_tuple != NULL)
tuplestore_puttuple(new_tuplestore, original_insert_tuple);
else if (map != NULL)
{
HeapTuple converted = do_convert_tuple(newtup, map);
tuplestore_puttuple(new_tuplestore, converted);
pfree(converted);
}
tuplestore_puttuple(tuplestore, original_insert_tuple);
else
tuplestore_puttuple(new_tuplestore, newtup);
TransitionTableAddTuple(newtup, tuplestore, map);
}
/*
......
......@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
execGrouping.o execIndexing.o execJunk.o \
execMain.o execParallel.o execPartition.o execProcnode.o \
execMain.o execMerge.o execParallel.o execPartition.o execProcnode.o \
execReplication.o execScan.o execSRF.o execTuples.o \
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
nodeBitmapAnd.o nodeBitmapOr.o \
......@@ -22,7 +22,7 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
nodeCustom.o nodeFunctionscan.o nodeGather.o \
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeMerge.o nodeModifyTable.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
nodeValuesscan.o \
......
......@@ -39,13 +39,14 @@ ModifyTable node visits each of those rows and marks the row deleted.
MERGE runs one generic plan that returns candidate target rows. Each row
consists of a super-row that contains all the columns needed by any of the
individual actions, plus a CTID and a TABLEOID junk columns. The CTID column is
individual actions, plus CTID and TABLEOID junk columns. The CTID column is
required to know if a matching target row was found or not and the TABLEOID
column is needed to find the underlying target partition, in case when the
target table is a partition table. If the CTID column is set we attempt to
activate WHEN MATCHED actions, or if it is NULL then we will attempt to
activate WHEN NOT MATCHED actions. Once we know which action is activated we
form the final result row and apply only those changes.
target table is a partition table. When a matching target tuple is found, the
CTID column identifies the matching target tuple and we attempt to activate
WHEN MATCHED actions. If a matching tuple is not found, then CTID column is
NULL and we attempt to activate WHEN NOT MATCHED actions. Once we know which
action is activated we form the final result row and apply only those changes.
XXX a great deal more documentation needs to be written here...
......
......@@ -313,6 +313,10 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
/*
* Given OID of the partition leaf, return the index of the leaf in the
* partition hierarchy.
*
* NB: This is an O(N) operation. Unfortunately, there are many other problem
* areas with more than a handful partitions, so we don't try to optimise this
* code right now.
*/
int
ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
......@@ -325,7 +329,10 @@ ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
break;
}
Assert(i < proute->num_partitions);
if (i >= proute->num_partitions)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("no partition found for OID %u", partoid)));
return i;
}
......
......@@ -42,7 +42,7 @@
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeMerge.h"
#include "executor/execMerge.h"
#include "executor/nodeModifyTable.h"
#include "foreign/fdwapi.h"
#include "miscadmin.h"
......@@ -69,11 +69,6 @@ static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
int whichplan);
/* flags for mt_merge_subcommands */
#define MERGE_INSERT 0x01
#define MERGE_UPDATE 0x02
#define MERGE_DELETE 0x04
/*
* Verify that the tuples to be produced by INSERT or UPDATE match the
* target relation's rowtype
......@@ -86,7 +81,7 @@ static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
* The plan output is represented by its targetlist, because that makes
* handling the dropped-column case easier.
*/
static void
void
ExecCheckPlanOutput(Relation resultRel, List *targetList)
{
TupleDesc resultDesc = RelationGetDescr(resultRel);
......@@ -2660,104 +2655,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
resultRelInfo = mtstate->resultRelInfo;
if (node->mergeActionList)
{
ListCell *l;
ExprContext *econtext;
List *mergeMatchedActionStates = NIL;
List *mergeNotMatchedActionStates = NIL;
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
mtstate->mt_merge_subcommands = 0;
if (mtstate->ps.ps_ExprContext == NULL)
ExecAssignExprContext(estate, &mtstate->ps);
econtext = mtstate->ps.ps_ExprContext;
/* initialize slot for the existing tuple */
Assert(mtstate->mt_existing == NULL);
mtstate->mt_existing =
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/* initialize slot for merge actions */
Assert(mtstate->mt_mergeproj == NULL);
mtstate->mt_mergeproj =
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/*
* Create a MergeActionState for each action on the mergeActionList
* and add it to either a list of matched actions or not-matched
* actions.
*/
foreach(l, node->mergeActionList)
{
MergeAction *action = (MergeAction *) lfirst(l);
MergeActionState *action_state = makeNode(MergeActionState);
TupleDesc tupDesc;
action_state->matched = action->matched;
action_state->commandType = action->commandType;
action_state->whenqual = ExecInitQual((List *) action->qual,
&mtstate->ps);
/* create target slot for this action's projection */
tupDesc = ExecTypeFromTL((List *) action->targetList,
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
action_state->tupDesc = tupDesc;
/* build action projection state */
action_state->proj =
ExecBuildProjectionInfo(action->targetList, econtext,
mtstate->mt_mergeproj, &mtstate->ps,
resultRelInfo->ri_RelationDesc->rd_att);
/*
* We create two lists - one for WHEN MATCHED actions and one
* for WHEN NOT MATCHED actions - and stick the
* MergeActionState into the appropriate list.
*/
if (action_state->matched)
mergeMatchedActionStates =
lappend(mergeMatchedActionStates, action_state);
else
mergeNotMatchedActionStates =
lappend(mergeNotMatchedActionStates, action_state);
switch (action->commandType)
{
case CMD_INSERT:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
action->targetList);
mtstate->mt_merge_subcommands |= MERGE_INSERT;
break;
case CMD_UPDATE:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
action->targetList);
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
break;
case CMD_DELETE:
mtstate->mt_merge_subcommands |= MERGE_DELETE;
break;
case CMD_NOTHING:
break;
default:
elog(ERROR, "unknown operation");
break;
}
resultRelInfo->ri_mergeState->matchedActionStates =
mergeMatchedActionStates;
resultRelInfo->ri_mergeState->notMatchedActionStates =
mergeNotMatchedActionStates;
}
}
if (mtstate->operation == CMD_MERGE)
ExecInitMerge(mtstate, estate, resultRelInfo);
/* select first subplan */
mtstate->mt_whichplan = 0;
......
......@@ -852,14 +852,14 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
}
/*
* The MERGE produces the target rows by performing a right
* join between the target relation and the source relation
* (which could be a plain relation or a subquery). The INSERT
* and UPDATE actions of the MERGE requires access to the
* columns from the source relation. We arrange things so that
* the source relation attributes are available as INNER_VAR
* and the target relation attributes are available from the
* scan tuple.
* The MERGE statement produces the target rows by performing a
* right join between the target relation and the source
* relation (which could be a plain relation or a subquery).
* The INSERT and UPDATE actions of the MERGE statement
* requires access to the columns from the source relation. We
* arrange things so that the source relation attributes are
* available as INNER_VAR and the target relation attributes
* are available from the scan tuple.
*/
if (splan->mergeActionList != NIL)
{
......
......@@ -585,7 +585,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> hash_partbound partbound_datum_list range_datum_list
%type <defelt> hash_partbound_elem
%type <node> merge_when_clause opt_and_condition
%type <node> merge_when_clause opt_merge_when_and_condition
%type <list> merge_when_list
%type <node> merge_update merge_delete merge_insert
......@@ -11129,7 +11129,7 @@ merge_when_list:
;
merge_when_clause:
WHEN MATCHED opt_and_condition THEN merge_update
WHEN MATCHED opt_merge_when_and_condition THEN merge_update
{
MergeAction *m = makeNode(MergeAction);
......@@ -11140,7 +11140,7 @@ merge_when_clause:
$$ = (Node *)m;
}
| WHEN MATCHED opt_and_condition THEN merge_delete
| WHEN MATCHED opt_merge_when_and_condition THEN merge_delete
{
MergeAction *m = makeNode(MergeAction);
......@@ -11151,7 +11151,7 @@ merge_when_clause:
$$ = (Node *)m;
}
| WHEN NOT MATCHED opt_and_condition THEN merge_insert
| WHEN NOT MATCHED opt_merge_when_and_condition THEN merge_insert
{
MergeAction *m = makeNode(MergeAction);
......@@ -11162,7 +11162,7 @@ merge_when_clause:
$$ = (Node *)m;
}
| WHEN NOT MATCHED opt_and_condition THEN DO NOTHING
| WHEN NOT MATCHED opt_merge_when_and_condition THEN DO NOTHING
{
MergeAction *m = makeNode(MergeAction);
......@@ -11175,7 +11175,7 @@ merge_when_clause:
}
;
opt_and_condition:
opt_merge_when_and_condition:
AND a_expr { $$ = $2; }
| { $$ = NULL; }
;
......
/*-------------------------------------------------------------------------
*
* nodeMerge.h
* execMerge.h
*
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/executor/nodeMerge.h
* src/include/executor/execMerge.h
*
*-------------------------------------------------------------------------
*/
#ifndef NODEMERGE_H
#define NODEMERGE_H
#ifndef EXECMERGE_H
#define EXECMERGE_H
#include "nodes/execnodes.h"
extern void
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot,
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo);
/* flags for mt_merge_subcommands */
#define MERGE_INSERT 0x01
#define MERGE_UPDATE 0x02
#define MERGE_DELETE 0x04
extern void ExecMerge(ModifyTableState *mtstate, EState *estate,
TupleTableSlot *slot, JunkFilter *junkfilter,
ResultRelInfo *resultRelInfo);
extern void ExecInitMerge(ModifyTableState *mtstate,
EState *estate,
ResultRelInfo *resultRelInfo);
#endif /* NODEMERGE_H */
......@@ -39,5 +39,6 @@ extern TupleTableSlot *ExecInsert(ModifyTableState *mtstate,
EState *estate,
MergeActionState *actionState,
bool canSetTag);
extern void ExecCheckPlanOutput(Relation resultRel, List *targetList);
#endif /* NODEMODIFYTABLE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment