Commit 644ea35f authored by Peter Eisentraut's avatar Peter Eisentraut

Fix updating of pg_subscription_rel from workers

A logical replication worker should not insert new rows into
pg_subscription_rel, only update existing rows, so that there are no
races if a concurrent refresh removes rows.  Adjust the API to be able
to choose that behavior.

Author: Masahiko Sawada <sawada.mshk@gmail.com>
Reported-by: default avatartushar <tushar.ahuja@enterprisedb.com>
parent 15ce775f
...@@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray) ...@@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray)
/* /*
* Set the state of a subscription table. * Set the state of a subscription table.
* *
* If update_only is true and the record for given table doesn't exist, do
* nothing. This can be used to avoid inserting a new record that was deleted
* by someone else. Generally, subscription DDL commands should use false,
* workers should use true.
*
* The insert-or-update logic in this function is not concurrency safe so it * The insert-or-update logic in this function is not concurrency safe so it
* might raise an error in rare circumstances. But if we took a stronger lock * might raise an error in rare circumstances. But if we took a stronger lock
* such as ShareRowExclusiveLock, we would risk more deadlocks. * such as ShareRowExclusiveLock, we would risk more deadlocks.
*/ */
Oid Oid
SetSubscriptionRelState(Oid subid, Oid relid, char state, SetSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn) XLogRecPtr sublsn, bool update_only)
{ {
Relation rel; Relation rel;
HeapTuple tup; HeapTuple tup;
Oid subrelid; Oid subrelid = InvalidOid;
bool nulls[Natts_pg_subscription_rel]; bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel];
...@@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, ...@@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
* If the record for given table does not exist yet create new record, * If the record for given table does not exist yet create new record,
* otherwise update the existing one. * otherwise update the existing one.
*/ */
if (!HeapTupleIsValid(tup)) if (!HeapTupleIsValid(tup) && !update_only)
{ {
/* Form the tuple. */ /* Form the tuple. */
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
...@@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, ...@@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
heap_freetuple(tup); heap_freetuple(tup);
} }
else else if (HeapTupleIsValid(tup))
{ {
bool replaces[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel];
......
...@@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ...@@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
rv->schemaname, rv->relname); rv->schemaname, rv->relname);
SetSubscriptionRelState(subid, relid, table_state, SetSubscriptionRelState(subid, relid, table_state,
InvalidXLogRecPtr); InvalidXLogRecPtr, false);
} }
ereport(NOTICE, ereport(NOTICE,
...@@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) ...@@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{ {
SetSubscriptionRelState(sub->oid, relid, SetSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr); InvalidXLogRecPtr, false);
ereport(NOTICE, ereport(NOTICE,
(errmsg("added subscription for table %s.%s", (errmsg("added subscription for table %s.%s",
quote_identifier(rv->schemaname), quote_identifier(rv->schemaname),
......
...@@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) ...@@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SetSubscriptionRelState(MyLogicalRepWorker->subid, SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn); MyLogicalRepWorker->relstate_lsn,
true);
walrcv_endstreaming(wrconn, &tli); walrcv_endstreaming(wrconn, &tli);
finish_sync_worker(); finish_sync_worker();
...@@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ...@@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
} }
SetSubscriptionRelState(MyLogicalRepWorker->subid, SetSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state, rstate->relid, rstate->state,
rstate->lsn); rstate->lsn, true);
} }
} }
else else
...@@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SetSubscriptionRelState(MyLogicalRepWorker->subid, SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn); MyLogicalRepWorker->relstate_lsn,
true);
CommitTransactionCommand(); CommitTransactionCommand();
pgstat_report_stat(false); pgstat_report_stat(false);
...@@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SetSubscriptionRelState(MyLogicalRepWorker->subid, SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
SUBREL_STATE_SYNCDONE, SUBREL_STATE_SYNCDONE,
*origin_startpos); *origin_startpos,
true);
finish_sync_worker(); finish_sync_worker();
} }
break; break;
......
...@@ -71,7 +71,7 @@ typedef struct SubscriptionRelState ...@@ -71,7 +71,7 @@ typedef struct SubscriptionRelState
} SubscriptionRelState; } SubscriptionRelState;
extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn); XLogRecPtr sublsn, bool update_only);
extern char GetSubscriptionRelState(Oid subid, Oid relid, extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok); XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern void RemoveSubscriptionRel(Oid subid, Oid relid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment