Commit f3b141c4 authored by Michael Paquier's avatar Michael Paquier

Fix relation leak for subscribers firing triggers in logical replication

Creating a trigger on a relation to which an apply operation is
triggered would cause a relation leak once the change gets committed,
as the executor would miss that the relation needs to be closed
beforehand.  This issue got introduced with the refactoring done in
1375422c, where it becomes necessary to track relations within
es_opened_result_relations to make sure that they are closed.

We have discussed using ExecInitResultRelation() coupled with
ExecCloseResultRelations() for the relations in need of tracking by the
apply operations in the subscribers, which would simplify greatly the
opening and closing of indexes, but this requires a larger rework and
reorganization of the worker code, particularly for the tuple routing
part.  And that's not really welcome post feature freeze.  So, for now,
settle down to the same solution as TRUNCATE which is to fill in
es_opened_result_relations with the relation opened, to make sure that
ExecGetTriggerResultRel() finds them and that they get closed.

The code is lightly refactored so as a relation is not registered three
times for each DML code path, making the whole a bit easier to follow.

Reported-by: Tang Haiying, Shi Yu, Hou Zhijie
Author: Amit Langote, Masahiko Sawada, Hou Zhijie
Reviewed-by: Amit Kapila, Michael Paquier
Discussion: https://postgr.es/m/OS0PR01MB611383FA0FE92EB9DE21946AFB769@OS0PR01MB6113.jpnprd01.prod.outlook.com
parent 1599e7b3
...@@ -338,10 +338,13 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) ...@@ -338,10 +338,13 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
* Executor state preparation for evaluation of constraint expressions, * Executor state preparation for evaluation of constraint expressions,
* indexes and triggers. * indexes and triggers.
* *
* This is based on similar code in copy.c * resultRelInfo is a ResultRelInfo for the relation to be passed to the
* executor routines. The caller must open and close any indexes to be
* updated independently of the relation registered here.
*/ */
static EState * static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel) create_estate_for_relation(LogicalRepRelMapEntry *rel,
ResultRelInfo **resultRelInfo)
{ {
EState *estate; EState *estate;
RangeTblEntry *rte; RangeTblEntry *rte;
...@@ -355,6 +358,27 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) ...@@ -355,6 +358,27 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
rte->rellockmode = AccessShareLock; rte->rellockmode = AccessShareLock;
ExecInitRangeTable(estate, list_make1(rte)); ExecInitRangeTable(estate, list_make1(rte));
*resultRelInfo = makeNode(ResultRelInfo);
/*
* Use Relation opened by logicalrep_rel_open() instead of opening it
* again.
*/
InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0);
/*
* We put the ResultRelInfo in the es_opened_result_relations list, even
* though we don't populate the es_result_relations array. That's a bit
* bogus, but it's enough to make ExecGetTriggerResultRel() find them.
* Also, because we did not open the Relation ourselves here, there is no
* need to worry about closing it.
*
* ExecOpenIndices() is not called here either, each execution path doing
* an apply operation being responsible for that.
*/
estate->es_opened_result_relations =
lappend(estate->es_opened_result_relations, *resultRelInfo);
estate->es_output_cid = GetCurrentCommandId(true); estate->es_output_cid = GetCurrentCommandId(true);
/* Prepare to catch AFTER triggers. */ /* Prepare to catch AFTER triggers. */
...@@ -363,6 +387,21 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) ...@@ -363,6 +387,21 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
return estate; return estate;
} }
/*
* Finish any operations related to the executor state created by
* create_estate_for_relation().
*/
static void
finish_estate(EState *estate)
{
/* Handle any queued AFTER triggers. */
AfterTriggerEndQuery(estate);
/* Cleanup. */
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
}
/* /*
* Executes default values for columns for which we can't map to remote * Executes default values for columns for which we can't map to remote
* relation columns. * relation columns.
...@@ -1168,12 +1207,10 @@ apply_handle_insert(StringInfo s) ...@@ -1168,12 +1207,10 @@ apply_handle_insert(StringInfo s)
} }
/* Initialize the executor state. */ /* Initialize the executor state. */
estate = create_estate_for_relation(rel); estate = create_estate_for_relation(rel, &resultRelInfo);
remoteslot = ExecInitExtraTupleSlot(estate, remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
resultRelInfo = makeNode(ResultRelInfo);
InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
/* Input functions may need an active snapshot, so get one */ /* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
...@@ -1194,11 +1231,7 @@ apply_handle_insert(StringInfo s) ...@@ -1194,11 +1231,7 @@ apply_handle_insert(StringInfo s)
PopActiveSnapshot(); PopActiveSnapshot();
/* Handle queued AFTER triggers. */ finish_estate(estate);
AfterTriggerEndQuery(estate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
...@@ -1293,12 +1326,10 @@ apply_handle_update(StringInfo s) ...@@ -1293,12 +1326,10 @@ apply_handle_update(StringInfo s)
check_relation_updatable(rel); check_relation_updatable(rel);
/* Initialize the executor state. */ /* Initialize the executor state. */
estate = create_estate_for_relation(rel); estate = create_estate_for_relation(rel, &resultRelInfo);
remoteslot = ExecInitExtraTupleSlot(estate, remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
resultRelInfo = makeNode(ResultRelInfo);
InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
/* /*
* Populate updatedCols so that per-column triggers can fire, and so * Populate updatedCols so that per-column triggers can fire, and so
...@@ -1345,11 +1376,7 @@ apply_handle_update(StringInfo s) ...@@ -1345,11 +1376,7 @@ apply_handle_update(StringInfo s)
PopActiveSnapshot(); PopActiveSnapshot();
/* Handle queued AFTER triggers. */ finish_estate(estate);
AfterTriggerEndQuery(estate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
...@@ -1450,12 +1477,10 @@ apply_handle_delete(StringInfo s) ...@@ -1450,12 +1477,10 @@ apply_handle_delete(StringInfo s)
check_relation_updatable(rel); check_relation_updatable(rel);
/* Initialize the executor state. */ /* Initialize the executor state. */
estate = create_estate_for_relation(rel); estate = create_estate_for_relation(rel, &resultRelInfo);
remoteslot = ExecInitExtraTupleSlot(estate, remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
resultRelInfo = makeNode(ResultRelInfo);
InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
...@@ -1474,11 +1499,7 @@ apply_handle_delete(StringInfo s) ...@@ -1474,11 +1499,7 @@ apply_handle_delete(StringInfo s)
PopActiveSnapshot(); PopActiveSnapshot();
/* Handle queued AFTER triggers. */ finish_estate(estate);
AfterTriggerEndQuery(estate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment