Commit 97ee604d authored by Peter Eisentraut's avatar Peter Eisentraut

Some refactoring of logical/worker.c

This moves the main operations of apply_handle_{insert|update|delete},
that of inserting, updating, deleting a tuple into/from a given
relation, into corresponding
apply_handle_{insert|update|delete}_internal functions.  This allows
performing those operations on relations that are not directly the
targets of replication, which is something a later patch will use for
targeting partitioned tables.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: default avatarRafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: default avatarPeter Eisentraut <peter.eisentraut@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
parent d40d564c
...@@ -113,6 +113,16 @@ static void store_flush_position(XLogRecPtr remote_lsn); ...@@ -113,6 +113,16 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void); static void maybe_reread_subscription(void);
static void apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry);
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
TupleTableSlot *remoteslot,
LogicalRepRelation *remoterel);
/* /*
* Should this worker apply changes for given relation. * Should this worker apply changes for given relation.
* *
...@@ -582,6 +592,7 @@ GetRelationIdentityOrPK(Relation rel) ...@@ -582,6 +592,7 @@ GetRelationIdentityOrPK(Relation rel)
/* /*
* Handle INSERT message. * Handle INSERT message.
*/ */
static void static void
apply_handle_insert(StringInfo s) apply_handle_insert(StringInfo s)
{ {
...@@ -621,13 +632,10 @@ apply_handle_insert(StringInfo s) ...@@ -621,13 +632,10 @@ apply_handle_insert(StringInfo s)
slot_fill_defaults(rel, estate, remoteslot); slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
ExecOpenIndices(estate->es_result_relation_info, false); Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
apply_handle_insert_internal(estate->es_result_relation_info, estate,
remoteslot);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot(); PopActiveSnapshot();
/* Handle queued AFTER triggers. */ /* Handle queued AFTER triggers. */
...@@ -641,6 +649,20 @@ apply_handle_insert(StringInfo s) ...@@ -641,6 +649,20 @@ apply_handle_insert(StringInfo s)
CommandCounterIncrement(); CommandCounterIncrement();
} }
/* Workhorse for apply_handle_insert() */
static void
apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot)
{
ExecOpenIndices(relinfo, false);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(relinfo);
}
/* /*
* Check if the logical replication relation is updatable and throw * Check if the logical replication relation is updatable and throw
* appropriate error if it isn't. * appropriate error if it isn't.
...@@ -684,16 +706,12 @@ apply_handle_update(StringInfo s) ...@@ -684,16 +706,12 @@ apply_handle_update(StringInfo s)
{ {
LogicalRepRelMapEntry *rel; LogicalRepRelMapEntry *rel;
LogicalRepRelId relid; LogicalRepRelId relid;
Oid idxoid;
EState *estate; EState *estate;
EPQState epqstate;
LogicalRepTupleData oldtup; LogicalRepTupleData oldtup;
LogicalRepTupleData newtup; LogicalRepTupleData newtup;
bool has_oldtup; bool has_oldtup;
TupleTableSlot *localslot;
TupleTableSlot *remoteslot; TupleTableSlot *remoteslot;
RangeTblEntry *target_rte; RangeTblEntry *target_rte;
bool found;
MemoryContext oldctx; MemoryContext oldctx;
ensure_transaction(); ensure_transaction();
...@@ -719,9 +737,6 @@ apply_handle_update(StringInfo s) ...@@ -719,9 +737,6 @@ apply_handle_update(StringInfo s)
remoteslot = ExecInitExtraTupleSlot(estate, remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
localslot = table_slot_create(rel->localrel,
&estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
/* /*
* Populate updatedCols so that per-column triggers can fire. This could * Populate updatedCols so that per-column triggers can fire. This could
...@@ -741,7 +756,6 @@ apply_handle_update(StringInfo s) ...@@ -741,7 +756,6 @@ apply_handle_update(StringInfo s)
fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel)); fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false);
/* Build the search tuple. */ /* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
...@@ -749,20 +763,57 @@ apply_handle_update(StringInfo s) ...@@ -749,20 +763,57 @@ apply_handle_update(StringInfo s)
has_oldtup ? oldtup.values : newtup.values); has_oldtup ? oldtup.values : newtup.values);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
apply_handle_update_internal(estate->es_result_relation_info, estate,
remoteslot, &newtup, rel);
PopActiveSnapshot();
/* Handle queued AFTER triggers. */
AfterTriggerEndQuery(estate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
/* Workhorse for apply_handle_update() */
static void
apply_handle_update_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry)
{
Relation localrel = relinfo->ri_RelationDesc;
LogicalRepRelation *remoterel = &relmapentry->remoterel;
Oid idxoid;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
MemoryContext oldctx;
localslot = table_slot_create(localrel, &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
/* /*
* Try to find tuple using either replica identity index, primary key or * Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan. * if needed, sequential scan.
*/ */
idxoid = GetRelationIdentityOrPK(rel->localrel); idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) || Assert(OidIsValid(idxoid) ||
(rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup)); (remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid)) if (OidIsValid(idxoid))
found = RelationFindReplTupleByIndex(rel->localrel, idxoid, found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive, LockTupleExclusive,
remoteslot, localslot); remoteslot, localslot);
else else
found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, localslot); remoteslot, localslot);
ExecClearTuple(remoteslot); ExecClearTuple(remoteslot);
...@@ -776,8 +827,8 @@ apply_handle_update(StringInfo s) ...@@ -776,8 +827,8 @@ apply_handle_update(StringInfo s)
{ {
/* Process and store remote tuple in the slot */ /* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_modify_cstrings(remoteslot, localslot, rel, slot_modify_cstrings(remoteslot, localslot, relmapentry,
newtup.values, newtup.changed); newtup->values, newtup->changed);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
EvalPlanQualSetSlot(&epqstate, remoteslot); EvalPlanQualSetSlot(&epqstate, remoteslot);
...@@ -795,23 +846,12 @@ apply_handle_update(StringInfo s) ...@@ -795,23 +846,12 @@ apply_handle_update(StringInfo s)
elog(DEBUG1, elog(DEBUG1,
"logical replication did not find row for update " "logical replication did not find row for update "
"in replication target relation \"%s\"", "in replication target relation \"%s\"",
RelationGetRelationName(rel->localrel)); RelationGetRelationName(localrel));
} }
/* Cleanup. */ /* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info); ExecCloseIndices(relinfo);
PopActiveSnapshot();
/* Handle queued AFTER triggers. */
AfterTriggerEndQuery(estate);
EvalPlanQualEnd(&epqstate); EvalPlanQualEnd(&epqstate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
} }
/* /*
...@@ -825,12 +865,8 @@ apply_handle_delete(StringInfo s) ...@@ -825,12 +865,8 @@ apply_handle_delete(StringInfo s)
LogicalRepRelMapEntry *rel; LogicalRepRelMapEntry *rel;
LogicalRepTupleData oldtup; LogicalRepTupleData oldtup;
LogicalRepRelId relid; LogicalRepRelId relid;
Oid idxoid;
EState *estate; EState *estate;
EPQState epqstate;
TupleTableSlot *remoteslot; TupleTableSlot *remoteslot;
TupleTableSlot *localslot;
bool found;
MemoryContext oldctx; MemoryContext oldctx;
ensure_transaction(); ensure_transaction();
...@@ -855,33 +891,64 @@ apply_handle_delete(StringInfo s) ...@@ -855,33 +891,64 @@ apply_handle_delete(StringInfo s)
remoteslot = ExecInitExtraTupleSlot(estate, remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel), RelationGetDescr(rel->localrel),
&TTSOpsVirtual); &TTSOpsVirtual);
localslot = table_slot_create(rel->localrel,
&estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false);
/* Find the tuple using the replica identity index. */ /* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, oldtup.values); slot_store_cstrings(remoteslot, rel, oldtup.values);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
apply_handle_delete_internal(estate->es_result_relation_info, estate,
remoteslot, &rel->remoterel);
PopActiveSnapshot();
/* Handle queued AFTER triggers. */
AfterTriggerEndQuery(estate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
/* Workhorse for apply_handle_delete() */
static void
apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
TupleTableSlot *remoteslot,
LogicalRepRelation *remoterel)
{
Relation localrel = relinfo->ri_RelationDesc;
Oid idxoid;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
localslot = table_slot_create(localrel, &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
/* /*
* Try to find tuple using either replica identity index, primary key or * Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan. * if needed, sequential scan.
*/ */
idxoid = GetRelationIdentityOrPK(rel->localrel); idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) || Assert(OidIsValid(idxoid) ||
(rel->remoterel.replident == REPLICA_IDENTITY_FULL)); (remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid)) if (OidIsValid(idxoid))
found = RelationFindReplTupleByIndex(rel->localrel, idxoid, found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive, LockTupleExclusive,
remoteslot, localslot); remoteslot, localslot);
else else
found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, localslot); remoteslot, localslot);
/* If found delete it. */ /* If found delete it. */
if (found) if (found)
{ {
...@@ -896,23 +963,12 @@ apply_handle_delete(StringInfo s) ...@@ -896,23 +963,12 @@ apply_handle_delete(StringInfo s)
elog(DEBUG1, elog(DEBUG1,
"logical replication could not find row for delete " "logical replication could not find row for delete "
"in replication target relation \"%s\"", "in replication target relation \"%s\"",
RelationGetRelationName(rel->localrel)); RelationGetRelationName(localrel));
} }
/* Cleanup. */ /* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info); ExecCloseIndices(relinfo);
PopActiveSnapshot();
/* Handle queued AFTER triggers. */
AfterTriggerEndQuery(estate);
EvalPlanQualEnd(&epqstate); EvalPlanQualEnd(&epqstate);
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment