Commit b05fe7b4 authored by Alvaro Herrera's avatar Alvaro Herrera

Review logical replication tablesync code

Most importantly, remove optimization in LogicalRepSyncTableStart that
skips the normal walrcv_startstreaming/endstreaming dance.  The
optimization is not critically important for production uses anyway,
since it only fires in cases with no activity, and saves an
uninteresting amount of work even then.  Critically, it obscures bugs by
hiding the interesting code path from test cases.

Also: in GetSubscriptionRelState, remove pointless relation open; access
pg_subscription_rel->srsubstate with GETSTRUCT as is typical rather than
SysCacheGetAttr; remove unused 'missing_ok' argument.
In wait_for_relation_state_change, use explicit catalog snapshot
invalidation rather than obscurely (and expensively) through
GetLatestSnapshot.
In various places: sprinkle comments more liberally and rewrite a number
of them.  Other cosmetic code improvements.

No backpatch, since no bug is being fixed here.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: default avatarPetr Jelínek <petr.jelinek@2ndquadrant.com>
Discussion: https://postgr.es/m/20201010190637.GA5774@alvherre.pgsql
parent c5b097f8
...@@ -328,20 +328,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, ...@@ -328,20 +328,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
/* /*
* Get state of subscription table. * Get state of subscription table.
* *
* Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true. * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
*/ */
char char
GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
bool missing_ok)
{ {
Relation rel;
HeapTuple tup; HeapTuple tup;
char substate; char substate;
bool isnull; bool isnull;
Datum d; Datum d;
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
/* Try finding the mapping. */ /* Try finding the mapping. */
tup = SearchSysCache2(SUBSCRIPTIONRELMAP, tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid), ObjectIdGetDatum(relid),
...@@ -349,22 +345,14 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, ...@@ -349,22 +345,14 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
if (!HeapTupleIsValid(tup)) if (!HeapTupleIsValid(tup))
{ {
if (missing_ok) *sublsn = InvalidXLogRecPtr;
{ return SUBREL_STATE_UNKNOWN;
table_close(rel, AccessShareLock);
*sublsn = InvalidXLogRecPtr;
return SUBREL_STATE_UNKNOWN;
}
elog(ERROR, "subscription table %u in subscription %u does not exist",
relid, subid);
} }
/* Get the state. */ /* Get the state. */
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
Anum_pg_subscription_rel_srsubstate, &isnull);
Assert(!isnull); /* Get the LSN */
substate = DatumGetChar(d);
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
Anum_pg_subscription_rel_srsublsn, &isnull); Anum_pg_subscription_rel_srsublsn, &isnull);
if (isnull) if (isnull)
...@@ -374,7 +362,6 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, ...@@ -374,7 +362,6 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
/* Cleanup */ /* Cleanup */
ReleaseSysCache(tup); ReleaseSysCache(tup);
table_close(rel, AccessShareLock);
return substate; return substate;
} }
......
...@@ -437,8 +437,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) ...@@ -437,8 +437,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (entry->state != SUBREL_STATE_READY) if (entry->state != SUBREL_STATE_READY)
entry->state = GetSubscriptionRelState(MySubscription->oid, entry->state = GetSubscriptionRelState(MySubscription->oid,
entry->localreloid, entry->localreloid,
&entry->statelsn, &entry->statelsn);
true);
return entry; return entry;
} }
......
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* tablesync.c * tablesync.c
* PostgreSQL logical replication * PostgreSQL logical replication: initial table data synchronization
* *
* Copyright (c) 2012-2020, PostgreSQL Global Development Group * Copyright (c) 2012-2020, PostgreSQL Global Development Group
* *
...@@ -26,26 +26,30 @@ ...@@ -26,26 +26,30 @@
* - It allows us to synchronize any tables added after the initial * - It allows us to synchronize any tables added after the initial
* synchronization has finished. * synchronization has finished.
* *
* The stream position synchronization works in multiple steps. * The stream position synchronization works in multiple steps:
* - Sync finishes copy and sets worker state as SYNCWAIT and waits for * - Apply worker requests a tablesync worker to start, setting the new
* state to change in a loop. * table state to INIT.
* - Apply periodically checks tables that are synchronizing for SYNCWAIT. * - Tablesync worker starts; changes table state from INIT to DATASYNC while
* When the desired state appears, it will set the worker state to * copying.
* CATCHUP and starts loop-waiting until either the table state is set * - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
* to SYNCDONE or the sync worker exits. * waits for state change.
* - Apply worker periodically checks for tables in SYNCWAIT state. When
* any appear, it sets the table state to CATCHUP and starts loop-waiting
* until either the table state is set to SYNCDONE or the sync worker
* exits.
* - After the sync worker has seen the state change to CATCHUP, it will * - After the sync worker has seen the state change to CATCHUP, it will
* read the stream and apply changes (acting like an apply worker) until * read the stream and apply changes (acting like an apply worker) until
* it catches up to the specified stream position. Then it sets the * it catches up to the specified stream position. Then it sets the
* state to SYNCDONE. There might be zero changes applied between * state to SYNCDONE. There might be zero changes applied between
* CATCHUP and SYNCDONE, because the sync worker might be ahead of the * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
* apply worker. * apply worker.
* - Once the state was set to SYNCDONE, the apply will continue tracking * - Once the state is set to SYNCDONE, the apply will continue tracking
* the table until it reaches the SYNCDONE stream position, at which * the table until it reaches the SYNCDONE stream position, at which
* point it sets state to READY and stops tracking. Again, there might * point it sets state to READY and stops tracking. Again, there might
* be zero changes in between. * be zero changes in between.
* *
* So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
* SYNCDONE -> READY. * CATCHUP -> SYNCDONE -> READY.
* *
* The catalog pg_subscription_rel is used to keep information about * The catalog pg_subscription_rel is used to keep information about
* subscribed tables and their state. Some transient state during data * subscribed tables and their state. Some transient state during data
...@@ -67,7 +71,8 @@ ...@@ -67,7 +71,8 @@
* -> continue rep * -> continue rep
* apply:11 * apply:11
* -> set in catalog READY * -> set in catalog READY
* - Sync in front: *
* - Sync is in front:
* sync:10 * sync:10
* -> set in memory SYNCWAIT * -> set in memory SYNCWAIT
* apply:8 * apply:8
...@@ -142,13 +147,14 @@ finish_sync_worker(void) ...@@ -142,13 +147,14 @@ finish_sync_worker(void)
} }
/* /*
* Wait until the relation synchronization state is set in the catalog to the * Wait until the relation sync state is set in the catalog to the expected
* expected one. * one; return true when it happens.
* *
* Used when transitioning from CATCHUP state to SYNCDONE. * Returns false if the table sync worker or the table itself have
* disappeared, or the table state has been reset.
* *
* Returns false if the synchronization worker has disappeared or the table state * Currently, this is used in the apply worker when transitioning from
* has been reset. * CATCHUP state to SYNCDONE.
*/ */
static bool static bool
wait_for_relation_state_change(Oid relid, char expected_state) wait_for_relation_state_change(Oid relid, char expected_state)
...@@ -162,28 +168,23 @@ wait_for_relation_state_change(Oid relid, char expected_state) ...@@ -162,28 +168,23 @@ wait_for_relation_state_change(Oid relid, char expected_state)
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* XXX use cache invalidation here to improve performance? */ InvalidateCatalogSnapshot();
PushActiveSnapshot(GetLatestSnapshot());
state = GetSubscriptionRelState(MyLogicalRepWorker->subid, state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
relid, &statelsn, true); relid, &statelsn);
PopActiveSnapshot();
if (state == SUBREL_STATE_UNKNOWN) if (state == SUBREL_STATE_UNKNOWN)
return false; break;
if (state == expected_state) if (state == expected_state)
return true; return true;
/* Check if the sync worker is still running and bail if not. */ /* Check if the sync worker is still running and bail if not. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
/* Check if the opposite worker is still running and bail if not. */
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
am_tablesync_worker() ? InvalidOid : relid,
false); false);
LWLockRelease(LogicalRepWorkerLock); LWLockRelease(LogicalRepWorkerLock);
if (!worker) if (!worker)
return false; break;
(void) WaitLatch(MyLatch, (void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
...@@ -810,6 +811,9 @@ copy_table(Relation rel) ...@@ -810,6 +811,9 @@ copy_table(Relation rel)
/* /*
* Start syncing the table in the sync worker. * Start syncing the table in the sync worker.
* *
* If nothing needs to be done to sync the table, we exit the worker without
* any further action.
*
* The returned slot name is palloc'ed in current memory context. * The returned slot name is palloc'ed in current memory context.
*/ */
char * char *
...@@ -819,12 +823,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -819,12 +823,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
char *err; char *err;
char relstate; char relstate;
XLogRecPtr relstate_lsn; XLogRecPtr relstate_lsn;
Relation rel;
WalRcvExecResult *res;
/* Check the state of the table synchronization. */ /* Check the state of the table synchronization. */
StartTransactionCommand(); StartTransactionCommand();
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
&relstate_lsn, true); &relstate_lsn);
CommitTransactionCommand(); CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex); SpinLockAcquire(&MyLogicalRepWorker->relmutex);
...@@ -832,6 +838,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -832,6 +838,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
MyLogicalRepWorker->relstate_lsn = relstate_lsn; MyLogicalRepWorker->relstate_lsn = relstate_lsn;
SpinLockRelease(&MyLogicalRepWorker->relmutex); SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
* If synchronization is already done or no longer necessary, exit now
* that we've updated shared memory state.
*/
switch (relstate)
{
case SUBREL_STATE_SYNCDONE:
case SUBREL_STATE_READY:
case SUBREL_STATE_UNKNOWN:
finish_sync_worker(); /* doesn't return */
}
/* /*
* To build a slot name for the sync work, we are limited to NAMEDATALEN - * To build a slot name for the sync work, we are limited to NAMEDATALEN -
* 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
...@@ -856,134 +874,87 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -856,134 +874,87 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errmsg("could not connect to the publisher: %s", err)));
switch (MyLogicalRepWorker->relstate) Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
{ MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
case SUBREL_STATE_INIT:
case SUBREL_STATE_DATASYNC:
{
Relation rel;
WalRcvExecResult *res;
SpinLockAcquire(&MyLogicalRepWorker->relmutex); SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
SpinLockRelease(&MyLogicalRepWorker->relmutex); SpinLockRelease(&MyLogicalRepWorker->relmutex);
/* Update the state and make it visible to others. */
StartTransactionCommand();
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
/* /* Update the state and make it visible to others. */
* We want to do the table data sync in a single transaction. StartTransactionCommand();
*/ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
StartTransactionCommand(); MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
/* /*
* Use a standard write lock here. It might be better to * We want to do the table data sync in a single transaction.
* disallow access to the table while it's being synchronized. */
* But we don't want to block the main apply process from StartTransactionCommand();
* working and it has to open the relation in RowExclusiveLock
* when remapping remote relation id to local one.
*/
rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
/* /*
* Create a temporary slot for the sync process. We do this * Use a standard write lock here. It might be better to disallow access
* inside the transaction so that we can use the snapshot made * to the table while it's being synchronized. But we don't want to block
* by the slot to get existing data. * the main apply process from working and it has to open the relation in
*/ * RowExclusiveLock when remapping remote relation id to local one.
res = walrcv_exec(wrconn, */
"BEGIN READ ONLY ISOLATION LEVEL " rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
"REPEATABLE READ", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not start transaction on publisher"),
errdetail("The error was: %s", res->err)));
walrcv_clear_result(res);
/* /*
* Create new temporary logical decoding slot. * Start a transaction in the remote node in REPEATABLE READ mode. This
* * ensures that both the replication slot we create (see below) and the
* We'll use slot for data copy so make sure the snapshot is * COPY are consistent with each other.
* used for the transaction; that way the COPY will get data */
* that is consistent with the lsn used by the slot to start res = walrcv_exec(wrconn,
* decoding. "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
*/ 0, NULL);
walrcv_create_slot(wrconn, slotname, true, if (res->status != WALRCV_OK_COMMAND)
CRS_USE_SNAPSHOT, origin_startpos); ereport(ERROR,
(errmsg("table copy could not start transaction on publisher"),
errdetail("The error was: %s", res->err)));
walrcv_clear_result(res);
PushActiveSnapshot(GetTransactionSnapshot()); /*
copy_table(rel); * Create a new temporary logical decoding slot. This slot will be used
PopActiveSnapshot(); * for the catchup phase after COPY is done, so tell it to use the
* snapshot to make the final data consistent.
*/
walrcv_create_slot(wrconn, slotname, true,
CRS_USE_SNAPSHOT, origin_startpos);
res = walrcv_exec(wrconn, "COMMIT", 0, NULL); /* Now do the initial data copy */
if (res->status != WALRCV_OK_COMMAND) PushActiveSnapshot(GetTransactionSnapshot());
ereport(ERROR, copy_table(rel);
(errmsg("table copy could not finish transaction on publisher"), PopActiveSnapshot();
errdetail("The error was: %s", res->err)));
walrcv_clear_result(res);
table_close(rel, NoLock); res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher"),
errdetail("The error was: %s", res->err)));
walrcv_clear_result(res);
/* Make the copy visible. */ table_close(rel, NoLock);
CommandCounterIncrement();
/* /* Make the copy visible. */
* We are done with the initial data synchronization, update CommandCounterIncrement();
* the state.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
MyLogicalRepWorker->relstate_lsn = *origin_startpos;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/* Wait for main apply worker to tell us to catchup. */
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
/*----------
* There are now two possible states here:
* a) Sync is behind the apply. If that's the case we need to
* catch up with it by consuming the logical replication
* stream up to the relstate_lsn. For that, we exit this
* function and continue in ApplyWorkerMain().
* b) Sync is caught up with the apply. So it can just set
* the state to SYNCDONE and finish.
*----------
*/
if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
{
/*
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
SUBREL_STATE_SYNCDONE,
*origin_startpos);
finish_sync_worker();
}
break;
}
case SUBREL_STATE_SYNCDONE:
case SUBREL_STATE_READY:
case SUBREL_STATE_UNKNOWN:
/* /*
* Nothing to do here but finish. (UNKNOWN means the relation was * We are done with the initial data synchronization, update the state.
* removed from pg_subscription_rel before the sync worker could */
* start.) SpinLockAcquire(&MyLogicalRepWorker->relmutex);
*/ MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
finish_sync_worker(); MyLogicalRepWorker->relstate_lsn = *origin_startpos;
break; SpinLockRelease(&MyLogicalRepWorker->relmutex);
default:
elog(ERROR, "unknown relation state \"%c\"",
MyLogicalRepWorker->relstate);
}
/*
* Finally, wait until the main apply worker tells us to catch up and then
* return to let LogicalRepApplyLoop do it.
*/
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname; return slotname;
} }
...@@ -2060,6 +2060,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2060,6 +2060,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{ {
TimestampTz last_recv_timestamp = GetCurrentTimestamp(); TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false; bool ping_sent = false;
TimeLineID tli;
/* /*
* Init the ApplyMessageContext which we clean up after each replication * Init the ApplyMessageContext which we clean up after each replication
...@@ -2201,12 +2202,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2201,12 +2202,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* Check if we need to exit the streaming loop. */ /* Check if we need to exit the streaming loop. */
if (endofstream) if (endofstream)
{
TimeLineID tli;
walrcv_endstreaming(wrconn, &tli);
break; break;
}
/* /*
* Wait for more data or latch. If we have unflushed transactions, * Wait for more data or latch. If we have unflushed transactions,
...@@ -2283,6 +2279,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2283,6 +2279,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply); send_feedback(last_received, requestReply, requestReply);
} }
} }
/* All done */
walrcv_endstreaming(wrconn, &tli);
} }
/* /*
...@@ -3024,10 +3023,8 @@ ApplyWorkerMain(Datum main_arg) ...@@ -3024,10 +3023,8 @@ ApplyWorkerMain(Datum main_arg)
/* This is table synchronization worker, call initial sync. */ /* This is table synchronization worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos); syncslotname = LogicalRepSyncTableStart(&origin_startpos);
/* The slot name needs to be allocated in permanent memory context. */ /* allocate slot name in long-lived context */
oldctx = MemoryContextSwitchTo(ApplyContext); myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);
pfree(syncslotname); pfree(syncslotname);
} }
......
...@@ -80,8 +80,7 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, ...@@ -80,8 +80,7 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn); XLogRecPtr sublsn);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn); XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid, extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern void RemoveSubscriptionRel(Oid subid, Oid relid);
extern List *GetSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment