Commit b39630fd authored by Tom Lane's avatar Tom Lane

Fix access to no-longer-open relcache entry in logical-rep worker.

If we redirected a replicated tuple operation into a partition child
table, and then tried to fire AFTER triggers for that event, the
relation cache entry for the child table was already closed.  This has
no visible ill effects as long as the entry is still there and still
valid, but an unluckily-timed cache flush could result in a crash or
other misbehavior.

To fix, postpone the ExecCleanupTupleRouting call (which is what
closes the child table) until after we've fired triggers.  This
requires a bit of refactoring so that the cleanup function can
have access to the necessary state.

In HEAD, I took the opportunity to simplify some of worker.c's
function APIs based on use of the new ApplyExecutionData struct.
However, it doesn't seem safe/practical to back-patch that aspect,
at least not without a lot of analysis of possible interactions
with a04daa97.

In passing, add an Assert to afterTriggerInvokeEvents to catch
such cases.  This seems worthwhile because we've grown a number
of fairly unstructured ways of calling AfterTriggerEndQuery.

Back-patch to v13, where worker.c grew the ability to deal with
partitioned target tables.

Discussion: https://postgr.es/m/3382681.1621381328@sss.pgh.pa.us
parent 7ce7d07e
......@@ -4219,6 +4219,8 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
{
rInfo = ExecGetTriggerResultRel(estate, evtshared->ats_relid);
rel = rInfo->ri_RelationDesc;
/* Catch calls with insufficient relcache refcounting */
Assert(!RelationHasReferenceCountZero(rel));
trigdesc = rInfo->ri_TrigDesc;
finfo = rInfo->ri_TrigFunctions;
instr = rInfo->ri_TrigInstrument;
......
......@@ -136,6 +136,18 @@ typedef struct SlotErrCallbackArg
int remote_attnum;
} SlotErrCallbackArg;
typedef struct ApplyExecutionData
{
EState *estate; /* executor state, used to track resources */
LogicalRepRelMapEntry *targetRel; /* replication target rel */
ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
/* These fields are used when the target relation is partitioned: */
ModifyTableState *mtstate; /* dummy ModifyTable state */
PartitionTupleRouting *proute; /* partition routing info */
} ApplyExecutionData;
/*
* Stream xid hash entry. Whenever we see a new xid we create this entry in the
* xidhash and along with it create the streaming file and store the fileset handle.
......@@ -226,24 +238,23 @@ static void apply_dispatch(StringInfo s);
static void apply_handle_commit_internal(StringInfo s,
LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry);
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
LogicalRepRelation *remoterel);
LogicalRepTupleData *newtup);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
EState *estate,
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry,
CmdType operation);
/*
......@@ -336,27 +347,29 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
/*
* Executor state preparation for evaluation of constraint expressions,
* indexes and triggers.
* indexes and triggers for the specified relation.
*
* resultRelInfo is a ResultRelInfo for the relation to be passed to the
* executor routines. The caller must open and close any indexes to be
* updated independently of the relation registered here.
* Note that the caller must open and close any indexes to be updated.
*/
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel,
ResultRelInfo **resultRelInfo)
static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
ApplyExecutionData *edata;
EState *estate;
RangeTblEntry *rte;
ResultRelInfo *resultRelInfo;
/*
* Input functions may need an active snapshot, as may AFTER triggers
* invoked during finish_estate. For safety, ensure an active snapshot
* invoked during finish_edata. For safety, ensure an active snapshot
* exists throughout all our usage of the executor.
*/
PushActiveSnapshot(GetTransactionSnapshot());
estate = CreateExecutorState();
edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
edata->targetRel = rel;
edata->estate = estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
......@@ -365,48 +378,62 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel,
rte->rellockmode = AccessShareLock;
ExecInitRangeTable(estate, list_make1(rte));
*resultRelInfo = makeNode(ResultRelInfo);
edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
/*
* Use Relation opened by logicalrep_rel_open() instead of opening it
* again.
*/
InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0);
InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
/*
* We put the ResultRelInfo in the es_opened_result_relations list, even
* though we don't populate the es_result_relations array. That's a bit
* bogus, but it's enough to make ExecGetTriggerResultRel() find them.
* Also, because we did not open the Relation ourselves here, there is no
* need to worry about closing it.
*
* ExecOpenIndices() is not called here either, each execution path doing
* an apply operation being responsible for that.
*/
estate->es_opened_result_relations =
lappend(estate->es_opened_result_relations, *resultRelInfo);
lappend(estate->es_opened_result_relations, resultRelInfo);
estate->es_output_cid = GetCurrentCommandId(true);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
return estate;
/* other fields of edata remain NULL for now */
return edata;
}
/*
* Finish any operations related to the executor state created by
* create_estate_for_relation().
* create_edata_for_relation().
*/
static void
finish_estate(EState *estate)
finish_edata(ApplyExecutionData *edata)
{
EState *estate = edata->estate;
/* Handle any queued AFTER triggers. */
AfterTriggerEndQuery(estate);
/* Cleanup. */
/* Shut down tuple routing, if any was done. */
if (edata->proute)
ExecCleanupTupleRouting(edata->mtstate, edata->proute);
/*
* Cleanup. It might seem that we should call ExecCloseResultRelations()
* here, but we intentionally don't. It would close the rel we added to
* es_opened_result_relations above, which is wrong because we took no
* corresponding refcount. We rely on ExecCleanupTupleRouting() to close
* any other relations opened during execution.
*/
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
pfree(edata);
PopActiveSnapshot();
}
......@@ -1189,10 +1216,10 @@ GetRelationIdentityOrPK(Relation rel)
static void
apply_handle_insert(StringInfo s)
{
ResultRelInfo *resultRelInfo;
LogicalRepRelMapEntry *rel;
LogicalRepTupleData newtup;
LogicalRepRelId relid;
ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
......@@ -1215,7 +1242,8 @@ apply_handle_insert(StringInfo s)
}
/* Initialize the executor state. */
estate = create_estate_for_relation(rel, &resultRelInfo);
edata = create_edata_for_relation(rel);
estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
......@@ -1228,24 +1256,32 @@ apply_handle_insert(StringInfo s)
/* For a partitioned table, insert the tuple into a partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(resultRelInfo, estate,
remoteslot, NULL, rel, CMD_INSERT);
apply_handle_tuple_routing(edata,
remoteslot, NULL, CMD_INSERT);
else
apply_handle_insert_internal(resultRelInfo, estate,
apply_handle_insert_internal(edata, edata->targetRelInfo,
remoteslot);
finish_estate(estate);
finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
/* Workhorse for apply_handle_insert() */
/*
* Workhorse for apply_handle_insert()
* relinfo is for the relation we're actually inserting into
* (could be a child partition of edata->targetRelInfo)
*/
static void
apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot)
apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot)
{
EState *estate = edata->estate;
/* We must open indexes here. */
ExecOpenIndices(relinfo, false);
/* Do the insert. */
......@@ -1296,9 +1332,9 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
static void
apply_handle_update(StringInfo s)
{
ResultRelInfo *resultRelInfo;
LogicalRepRelMapEntry *rel;
LogicalRepRelId relid;
ApplyExecutionData *edata;
EState *estate;
LogicalRepTupleData oldtup;
LogicalRepTupleData newtup;
......@@ -1329,7 +1365,8 @@ apply_handle_update(StringInfo s)
check_relation_updatable(rel);
/* Initialize the executor state. */
estate = create_estate_for_relation(rel, &resultRelInfo);
edata = create_edata_for_relation(rel);
estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
......@@ -1369,26 +1406,32 @@ apply_handle_update(StringInfo s)
/* For a partitioned table, apply update to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(resultRelInfo, estate,
remoteslot, &newtup, rel, CMD_UPDATE);
apply_handle_tuple_routing(edata,
remoteslot, &newtup, CMD_UPDATE);
else
apply_handle_update_internal(resultRelInfo, estate,
remoteslot, &newtup, rel);
apply_handle_update_internal(edata, edata->targetRelInfo,
remoteslot, &newtup);
finish_estate(estate);
finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
/* Workhorse for apply_handle_update() */
/*
* Workhorse for apply_handle_update()
* relinfo is for the relation we're actually updating in
* (could be a child partition of edata->targetRelInfo)
*/
static void
apply_handle_update_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry)
apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup)
{
EState *estate = edata->estate;
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
TupleTableSlot *localslot;
......@@ -1447,10 +1490,10 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
static void
apply_handle_delete(StringInfo s)
{
ResultRelInfo *resultRelInfo;
LogicalRepRelMapEntry *rel;
LogicalRepTupleData oldtup;
LogicalRepRelId relid;
ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
......@@ -1476,7 +1519,8 @@ apply_handle_delete(StringInfo s)
check_relation_updatable(rel);
/* Initialize the executor state. */
estate = create_estate_for_relation(rel, &resultRelInfo);
edata = create_edata_for_relation(rel);
estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
......@@ -1488,26 +1532,32 @@ apply_handle_delete(StringInfo s)
/* For a partitioned table, apply delete to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(resultRelInfo, estate,
remoteslot, NULL, rel, CMD_DELETE);
apply_handle_tuple_routing(edata,
remoteslot, NULL, CMD_DELETE);
else
apply_handle_delete_internal(resultRelInfo, estate,
remoteslot, &rel->remoterel);
apply_handle_delete_internal(edata, edata->targetRelInfo,
remoteslot);
finish_estate(estate);
finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
/* Workhorse for apply_handle_delete() */
/*
* Workhorse for apply_handle_delete()
* relinfo is for the relation we're actually deleting from
* (could be a child partition of edata->targetRelInfo)
*/
static void
apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
TupleTableSlot *remoteslot,
LogicalRepRelation *remoterel)
apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot)
{
EState *estate = edata->estate;
Relation localrel = relinfo->ri_RelationDesc;
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
......@@ -1577,16 +1627,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
* This handles insert, update, delete on a partitioned table.
*/
static void
apply_handle_tuple_routing(ResultRelInfo *relinfo,
EState *estate,
apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry,
CmdType operation)
{
EState *estate = edata->estate;
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
ResultRelInfo *relinfo = edata->targetRelInfo;
Relation parentrel = relinfo->ri_RelationDesc;
ModifyTableState *mtstate = NULL;
PartitionTupleRouting *proute = NULL;
ModifyTableState *mtstate;
PartitionTupleRouting *proute;
ResultRelInfo *partrelinfo;
Relation partrel;
TupleTableSlot *remoteslot_part;
......@@ -1594,12 +1645,14 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
MemoryContext oldctx;
/* ModifyTableState is needed for ExecFindPartition(). */
mtstate = makeNode(ModifyTableState);
edata->mtstate = mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = operation;
mtstate->resultRelInfo = relinfo;
proute = ExecSetupPartitionTupleRouting(estate, parentrel);
/* ... as is PartitionTupleRouting. */
edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
/*
* Find the partition to which the "search tuple" belongs.
......@@ -1633,14 +1686,13 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
switch (operation)
{
case CMD_INSERT:
apply_handle_insert_internal(partrelinfo, estate,
apply_handle_insert_internal(edata, partrelinfo,
remoteslot_part);
break;
case CMD_DELETE:
apply_handle_delete_internal(partrelinfo, estate,
remoteslot_part,
&relmapentry->remoterel);
apply_handle_delete_internal(edata, partrelinfo,
remoteslot_part);
break;
case CMD_UPDATE:
......@@ -1752,9 +1804,8 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
Assert(partrelinfo_new != partrelinfo);
/* DELETE old tuple found in the old partition. */
apply_handle_delete_internal(partrelinfo, estate,
localslot,
&relmapentry->remoterel);
apply_handle_delete_internal(edata, partrelinfo,
localslot);
/* INSERT new tuple into the new partition. */
......@@ -1782,7 +1833,7 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
apply_handle_insert_internal(partrelinfo_new, estate,
apply_handle_insert_internal(edata, partrelinfo_new,
remoteslot_part);
}
}
......@@ -1792,8 +1843,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
elog(ERROR, "unrecognized CmdType: %d", (int) operation);
break;
}
ExecCleanupTupleRouting(mtstate, proute);
}
/*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment