Commit d8653f46 authored by Peter Eisentraut's avatar Peter Eisentraut

Refactor code to look up local replication tuple

This unifies some duplicate code.

Author: Amit Langote <amitlangote09@gmail.com>
Discussion: https://www.postgresql.org/message-id/CA+HiwqFjYE5anArxvkjr37AQMd52L-LZtz9Ld2QrLQ3YfcYhTw@mail.gmail.com
parent 36962349
......@@ -122,6 +122,10 @@ static void apply_handle_update_internal(ResultRelInfo *relinfo,
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
TupleTableSlot *remoteslot,
LogicalRepRelation *remoterel);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
/*
* Should this worker apply changes for given relation.
......@@ -788,33 +792,17 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
LogicalRepRelMapEntry *relmapentry)
{
Relation localrel = relinfo->ri_RelationDesc;
Oid idxoid;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
MemoryContext oldctx;
localslot = table_slot_create(localrel, &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
/*
* Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) ||
(relmapentry->remoterel.replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid))
found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive,
remoteslot, localslot);
else
found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, localslot);
found = FindReplTupleInLocalRel(estate, localrel,
&relmapentry->remoterel,
remoteslot, &localslot);
ExecClearTuple(remoteslot);
/*
......@@ -922,31 +910,15 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
LogicalRepRelation *remoterel)
{
Relation localrel = relinfo->ri_RelationDesc;
Oid idxoid;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
localslot = table_slot_create(localrel, &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
/*
* Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) ||
(remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid))
found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive,
remoteslot, localslot);
else
found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, localslot);
found = FindReplTupleInLocalRel(estate, localrel, remoterel,
remoteslot, &localslot);
/* If found delete it. */
if (found)
......@@ -970,6 +942,39 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
EvalPlanQualEnd(&epqstate);
}
/*
* Try to find a tuple received from the publication side (in 'remoteslot') in
* the corresponding local relation using either replica identity index,
* primary key or if needed, sequential scan.
*
* Local tuple, if found, is returned in '*localslot'.
*/
static bool
FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot)
{
Oid idxoid;
bool found;
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) ||
(remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid))
found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive,
remoteslot, *localslot);
else
found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, *localslot);
return found;
}
/*
* Handle TRUNCATE message.
*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment