Commit f332241a authored by Tom Lane's avatar Tom Lane

Fix race conditions in synchronous standby management.

We have repeatedly seen the buildfarm reach the Assert(false) in
SyncRepGetSyncStandbysPriority.  This apparently is due to failing to
consider the possibility that the sync_standby_priority values in
shared memory might be inconsistent; but they will be whenever only
some of the walsenders have updated their values after a change in
the synchronous_standby_names setting.  That function is vastly too
complex for what it does, anyway, so rewriting it seems better than
trying to apply a band-aid fix.

Furthermore, the API of SyncRepGetSyncStandbys is broken by design:
it returns a list of WalSnd array indexes, but there is nothing
guaranteeing that the contents of the WalSnd array remain stable.
Thus, if some walsender exits and then a new walsender process
takes over that WalSnd array slot, a caller might make use of
WAL position data that it should not, potentially leading to
incorrect decisions about whether to release transactions that
are waiting for synchronous commit.

To fix, replace SyncRepGetSyncStandbys with a new function
SyncRepGetCandidateStandbys that copies all the required data
from shared memory while holding the relevant mutexes.  If the
associated walsender process then exits, this data is still safe to
make release decisions with, since we know that that much WAL *was*
sent to a valid standby server.  This incidentally means that we no
longer need to treat sync_standby_priority as protected by the
SyncRepLock rather than the per-walsender mutex.

SyncRepGetSyncStandbys is no longer used by the core code, so remove
it entirely in HEAD.  However, it seems possible that external code is
relying on that function, so do not remove it from the back branches.
Instead, just remove the known-incorrect Assert.  When the bug occurs,
the function will return a too-short list, which callers should treat
as meaning there are not enough sync standbys, which seems like a
reasonably safe fallback until the inconsistent state is resolved.
Moreover it's bug-compatible with what has been happening in non-assert
builds.  We cannot do anything about the walsender-replacement race
condition without an API/ABI break.

The bogus assertion exists back to 9.6, but 9.6 is sufficiently
different from the later branches that the patch doesn't apply at all.
I chose to just remove the bogus assertion in 9.6, feeling that the
probability of a bad outcome from the walsender-replacement race
condition is too low to justify rewriting the whole patch for 9.6.

Discussion: https://postgr.es/m/21519.1585272409@sss.pgh.pa.us
parent 3cb02e30
...@@ -108,14 +108,16 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, ...@@ -108,14 +108,16 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, XLogRecPtr *applyPtr,
List *sync_standbys); SyncRepStandbyData *sync_standbys,
int num_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, XLogRecPtr *applyPtr,
List *sync_standbys, uint8 nth); SyncRepStandbyData *sync_standbys,
int num_standbys,
uint8 nth);
static int SyncRepGetStandbyPriority(void); static int SyncRepGetStandbyPriority(void);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync); static int standby_priority_comparator(const void *a, const void *b);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
static int cmp_lsn(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b);
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
...@@ -406,9 +408,10 @@ SyncRepInitConfig(void) ...@@ -406,9 +408,10 @@ SyncRepInitConfig(void)
priority = SyncRepGetStandbyPriority(); priority = SyncRepGetStandbyPriority();
if (MyWalSnd->sync_standby_priority != priority) if (MyWalSnd->sync_standby_priority != priority)
{ {
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sync_standby_priority = priority; MyWalSnd->sync_standby_priority = priority;
LWLockRelease(SyncRepLock); SpinLockRelease(&MyWalSnd->mutex);
ereport(DEBUG1, ereport(DEBUG1,
(errmsg("standby \"%s\" now has synchronous standby priority %u", (errmsg("standby \"%s\" now has synchronous standby priority %u",
application_name, priority))); application_name, priority)));
...@@ -459,7 +462,11 @@ SyncRepReleaseWaiters(void) ...@@ -459,7 +462,11 @@ SyncRepReleaseWaiters(void)
/* /*
* Check whether we are a sync standby or not, and calculate the synced * Check whether we are a sync standby or not, and calculate the synced
* positions among all sync standbys. * positions among all sync standbys. (Note: although this step does not
* of itself require holding SyncRepLock, it seems like a good idea to do
* it after acquiring the lock. This ensures that the WAL pointers we use
* to release waiters are newer than any previous execution of this
* routine used.)
*/ */
got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
...@@ -523,8 +530,6 @@ SyncRepReleaseWaiters(void) ...@@ -523,8 +530,6 @@ SyncRepReleaseWaiters(void)
/* /*
* Calculate the synced Write, Flush and Apply positions among sync standbys. * Calculate the synced Write, Flush and Apply positions among sync standbys.
* *
* The caller must hold SyncRepLock.
*
* Return false if the number of sync standbys is less than * Return false if the number of sync standbys is less than
* synchronous_standby_names specifies. Otherwise return true and * synchronous_standby_names specifies. Otherwise return true and
* store the positions into *writePtr, *flushPtr and *applyPtr. * store the positions into *writePtr, *flushPtr and *applyPtr.
...@@ -536,27 +541,41 @@ static bool ...@@ -536,27 +541,41 @@ static bool
SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, bool *am_sync) XLogRecPtr *applyPtr, bool *am_sync)
{ {
List *sync_standbys; SyncRepStandbyData *sync_standbys;
int num_standbys;
Assert(LWLockHeldByMe(SyncRepLock)); int i;
/* Initialize default results */
*writePtr = InvalidXLogRecPtr; *writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr;
*applyPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr;
*am_sync = false; *am_sync = false;
/* Quick out if not even configured to be synchronous */
if (SyncRepConfig == NULL)
return false;
/* Get standbys that are considered as synchronous at this moment */ /* Get standbys that are considered as synchronous at this moment */
sync_standbys = SyncRepGetSyncStandbys(am_sync); num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
/* Am I among the candidate sync standbys? */
for (i = 0; i < num_standbys; i++)
{
if (sync_standbys[i].is_me)
{
*am_sync = true;
break;
}
}
/* /*
* Quick exit if we are not managing a sync standby or there are not * Nothing more to do if we are not managing a sync standby or there are
* enough synchronous standbys. * not enough synchronous standbys.
*/ */
if (!(*am_sync) || if (!(*am_sync) ||
SyncRepConfig == NULL || num_standbys < SyncRepConfig->num_sync)
list_length(sync_standbys) < SyncRepConfig->num_sync)
{ {
list_free(sync_standbys); pfree(sync_standbys);
return false; return false;
} }
...@@ -576,15 +595,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, ...@@ -576,15 +595,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
{ {
SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
sync_standbys); sync_standbys, num_standbys);
} }
else else
{ {
SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
sync_standbys, SyncRepConfig->num_sync); sync_standbys, num_standbys,
SyncRepConfig->num_sync);
} }
list_free(sync_standbys); pfree(sync_standbys);
return true; return true;
} }
...@@ -592,27 +612,24 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, ...@@ -592,27 +612,24 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
* Calculate the oldest Write, Flush and Apply positions among sync standbys. * Calculate the oldest Write, Flush and Apply positions among sync standbys.
*/ */
static void static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *applyPtr, List *sync_standbys) XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
SyncRepStandbyData *sync_standbys,
int num_standbys)
{ {
ListCell *cell; int i;
/* /*
* Scan through all sync standbys and calculate the oldest Write, Flush * Scan through all sync standbys and calculate the oldest Write, Flush
* and Apply positions. * and Apply positions. We assume *writePtr et al were initialized to
* InvalidXLogRecPtr.
*/ */
foreach(cell, sync_standbys) for (i = 0; i < num_standbys; i++)
{ {
WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write = sync_standbys[i].write;
XLogRecPtr write; XLogRecPtr flush = sync_standbys[i].flush;
XLogRecPtr flush; XLogRecPtr apply = sync_standbys[i].apply;
XLogRecPtr apply;
SpinLockAcquire(&walsnd->mutex);
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
*writePtr = write; *writePtr = write;
...@@ -628,38 +645,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, ...@@ -628,38 +645,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
* standbys. * standbys.
*/ */
static void static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
SyncRepStandbyData *sync_standbys,
int num_standbys,
uint8 nth)
{ {
ListCell *cell;
XLogRecPtr *write_array; XLogRecPtr *write_array;
XLogRecPtr *flush_array; XLogRecPtr *flush_array;
XLogRecPtr *apply_array; XLogRecPtr *apply_array;
int len; int i;
int i = 0;
len = list_length(sync_standbys); /* Should have enough candidates, or somebody messed up */
write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); Assert(nth > 0 && nth <= num_standbys);
flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
foreach(cell, sync_standbys) write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
{ flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
SpinLockAcquire(&walsnd->mutex); for (i = 0; i < num_standbys; i++)
write_array[i] = walsnd->write; {
flush_array[i] = walsnd->flush; write_array[i] = sync_standbys[i].write;
apply_array[i] = walsnd->apply; flush_array[i] = sync_standbys[i].flush;
SpinLockRelease(&walsnd->mutex); apply_array[i] = sync_standbys[i].apply;
i++;
} }
/* Sort each array in descending order */ /* Sort each array in descending order */
qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
/* Get Nth latest Write, Flush, Apply positions */ /* Get Nth latest Write, Flush, Apply positions */
*writePtr = write_array[nth - 1]; *writePtr = write_array[nth - 1];
...@@ -689,147 +704,49 @@ cmp_lsn(const void *a, const void *b) ...@@ -689,147 +704,49 @@ cmp_lsn(const void *a, const void *b)
} }
/* /*
* Return the list of sync standbys, or NIL if no sync standby is connected. * Return data about walsenders that are candidates to be sync standbys.
* *
* The caller must hold SyncRepLock. * *standbys is set to a palloc'd array of structs of per-walsender data,
* * and the number of valid entries (candidate sync senders) is returned.
* On return, *am_sync is set to true if this walsender is connecting to * (This might be more or fewer than num_sync; caller must check.)
* sync standby. Otherwise it's set to false.
*/ */
List * int
SyncRepGetSyncStandbys(bool *am_sync) SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
{ {
Assert(LWLockHeldByMe(SyncRepLock)); int i;
int n;
/* Set default result */ /* Create result array */
if (am_sync != NULL) *standbys = (SyncRepStandbyData *)
*am_sync = false; palloc(max_wal_senders * sizeof(SyncRepStandbyData));
/* Quick exit if sync replication is not requested */ /* Quick exit if sync replication is not requested */
if (SyncRepConfig == NULL) if (SyncRepConfig == NULL)
return NIL; return 0;
return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
SyncRepGetSyncStandbysPriority(am_sync) :
SyncRepGetSyncStandbysQuorum(am_sync);
}
/*
* Return the list of all the candidates for quorum sync standbys,
* or NIL if no such standby is connected.
*
* The caller must hold SyncRepLock. This function must be called only in
* a quorum-based sync replication.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static List *
SyncRepGetSyncStandbysQuorum(bool *am_sync)
{
List *result = NIL;
int i;
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
* rearrangement */
Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
/* Collect raw data from shared memory */
n = 0;
for (i = 0; i < max_wal_senders; i++) for (i = 0; i < max_wal_senders; i++)
{ {
XLogRecPtr flush;
WalSndState state;
int pid;
walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
pid = walsnd->pid;
flush = walsnd->flush;
state = walsnd->state;
SpinLockRelease(&walsnd->mutex);
/* Must be active */
if (pid == 0)
continue;
/* Must be streaming or stopping */
if (state != WALSNDSTATE_STREAMING &&
state != WALSNDSTATE_STOPPING)
continue;
/* Must be synchronous */
if (walsnd->sync_standby_priority == 0)
continue;
/* Must have a valid flush position */
if (XLogRecPtrIsInvalid(flush))
continue;
/*
* Consider this standby as a candidate for quorum sync standbys and
* append it to the result.
*/
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == MyWalSnd)
*am_sync = true;
}
return result;
}
/*
* Return the list of sync standbys chosen based on their priorities,
* or NIL if no sync standby is connected.
*
* If there are multiple standbys with the same priority,
* the first one found is selected preferentially.
*
* The caller must hold SyncRepLock. This function must be called only in
* a priority-based sync replication.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static List *
SyncRepGetSyncStandbysPriority(bool *am_sync)
{
List *result = NIL;
List *pending = NIL;
int lowest_priority;
int next_highest_priority;
int this_priority;
int priority;
int i;
bool am_in_pending = false;
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
* rearrangement */ * rearrangement */
SyncRepStandbyData *stby;
Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); WalSndState state; /* not included in SyncRepStandbyData */
lowest_priority = SyncRepConfig->nmembers;
next_highest_priority = lowest_priority + 1;
/*
* Find the sync standbys which have the highest priority (i.e, 1). Also
* store all the other potential sync standbys into the pending list, in
* order to scan it later and find other sync standbys from it quickly.
*/
for (i = 0; i < max_wal_senders; i++)
{
XLogRecPtr flush;
WalSndState state;
int pid;
walsnd = &WalSndCtl->walsnds[i]; walsnd = &WalSndCtl->walsnds[i];
stby = *standbys + n;
SpinLockAcquire(&walsnd->mutex); SpinLockAcquire(&walsnd->mutex);
pid = walsnd->pid; stby->pid = walsnd->pid;
flush = walsnd->flush;
state = walsnd->state; state = walsnd->state;
stby->write = walsnd->write;
stby->flush = walsnd->flush;
stby->apply = walsnd->apply;
stby->sync_standby_priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
/* Must be active */ /* Must be active */
if (pid == 0) if (stby->pid == 0)
continue; continue;
/* Must be streaming or stopping */ /* Must be streaming or stopping */
...@@ -838,120 +755,59 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) ...@@ -838,120 +755,59 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
continue; continue;
/* Must be synchronous */ /* Must be synchronous */
this_priority = walsnd->sync_standby_priority; if (stby->sync_standby_priority == 0)
if (this_priority == 0)
continue; continue;
/* Must have a valid flush position */ /* Must have a valid flush position */
if (XLogRecPtrIsInvalid(flush)) if (XLogRecPtrIsInvalid(stby->flush))
continue; continue;
/* /* OK, it's a candidate */
* If the priority is equal to 1, consider this standby as sync and stby->walsnd_index = i;
* append it to the result. Otherwise append this standby to the stby->is_me = (walsnd == MyWalSnd);
* pending list to check if it's actually sync or not later. n++;
*/
if (this_priority == 1)
{
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == MyWalSnd)
*am_sync = true;
if (list_length(result) == SyncRepConfig->num_sync)
{
list_free(pending);
return result; /* Exit if got enough sync standbys */
}
} }
else
{
pending = lappend_int(pending, i);
if (am_sync != NULL && walsnd == MyWalSnd)
am_in_pending = true;
/* /*
* Track the highest priority among the standbys in the pending * In quorum mode, we return all the candidates. In priority mode, if we
* list, in order to use it as the starting priority for later * have too many candidates then return only the num_sync ones of highest
* scan of the list. This is useful to find quickly the sync * priority.
* standbys from the pending list later because we can skip
* unnecessary scans for the unused priorities.
*/ */
if (this_priority < next_highest_priority) if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
next_highest_priority = this_priority; n > SyncRepConfig->num_sync)
}
}
/*
* Consider all pending standbys as sync if the number of them plus
* already-found sync ones is lower than the configuration requests.
*/
if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
{ {
/* /* Sort by priority ... */
* Set *am_sync to true if this walsender is in the pending list qsort(*standbys, n, sizeof(SyncRepStandbyData),
* because all pending standbys are considered as sync. standby_priority_comparator);
*/ /* ... then report just the first num_sync ones */
if (am_sync != NULL && !(*am_sync)) n = SyncRepConfig->num_sync;
*am_sync = am_in_pending;
result = list_concat(result, pending);
list_free(pending);
return result;
} }
/* return n;
* Find the sync standbys from the pending list. }
*/
priority = next_highest_priority;
while (priority <= lowest_priority)
{
ListCell *cell;
next_highest_priority = lowest_priority + 1;
foreach(cell, pending)
{
i = lfirst_int(cell);
walsnd = &WalSndCtl->walsnds[i];
this_priority = walsnd->sync_standby_priority;
if (this_priority == priority)
{
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == MyWalSnd)
*am_sync = true;
/* /*
* We should always exit here after the scan of pending list * qsort comparator to sort SyncRepStandbyData entries by priority
* starts because we know that the list has enough elements to
* reach SyncRepConfig->num_sync.
*/ */
if (list_length(result) == SyncRepConfig->num_sync) static int
{ standby_priority_comparator(const void *a, const void *b)
list_free(pending); {
return result; /* Exit if got enough sync standbys */ const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
} const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
/* First, sort by increasing priority value */
if (sa->sync_standby_priority != sb->sync_standby_priority)
return sa->sync_standby_priority - sb->sync_standby_priority;
/* /*
* Remove the entry for this sync standby from the list to * We might have equal priority values; arbitrarily break ties by position
* prevent us from looking at the same entry again. * in the WALSnd array. (This is utterly bogus, since that is arrival
* order dependent, but there are regression tests that rely on it.)
*/ */
pending = foreach_delete_current(pending, cell); return sa->walsnd_index - sb->walsnd_index;
continue; /* don't adjust next_highest_priority */
}
if (this_priority < next_highest_priority)
next_highest_priority = this_priority;
}
priority = next_highest_priority;
}
/* never reached, but keep compiler quiet */
Assert(false);
return result;
} }
/* /*
* Check if we are in the list of sync standbys, and if so, determine * Check if we are in the list of sync standbys, and if so, determine
* priority sequence. Return priority if set, or zero to indicate that * priority sequence. Return priority if set, or zero to indicate that
......
...@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) ...@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
* Found a free slot. Reserve it for us. * Found a free slot. Reserve it for us.
*/ */
walsnd->pid = MyProcPid; walsnd->pid = MyProcPid;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->sentPtr = InvalidXLogRecPtr; walsnd->sentPtr = InvalidXLogRecPtr;
walsnd->needreload = false;
walsnd->write = InvalidXLogRecPtr; walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr;
walsnd->writeLag = -1; walsnd->writeLag = -1;
walsnd->flushLag = -1; walsnd->flushLag = -1;
walsnd->applyLag = -1; walsnd->applyLag = -1;
walsnd->state = WALSNDSTATE_STARTUP; walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch; walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0; walsnd->replyTime = 0;
walsnd->spillTxns = 0; walsnd->spillTxns = 0;
...@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore; Tuplestorestate *tupstore;
MemoryContext per_query_ctx; MemoryContext per_query_ctx;
MemoryContext oldcontext; MemoryContext oldcontext;
List *sync_standbys; SyncRepStandbyData *sync_standbys;
int num_standbys;
int i; int i;
/* check to see if caller supports us returning a tuplestore */ /* check to see if caller supports us returning a tuplestore */
...@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
/* /*
* Get the currently active synchronous standbys. * Get the currently active synchronous standbys. This could be out of
* date before we're done, but we'll use the data anyway.
*/ */
LWLockAcquire(SyncRepLock, LW_SHARED); num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
sync_standbys = SyncRepGetSyncStandbys(NULL);
LWLockRelease(SyncRepLock);
for (i = 0; i < max_wal_senders; i++) for (i = 0; i < max_wal_senders; i++)
{ {
...@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int64 spillTxns; int64 spillTxns;
int64 spillCount; int64 spillCount;
int64 spillBytes; int64 spillBytes;
bool is_sync_standby;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
int j;
/* Collect data from shared memory */
SpinLockAcquire(&walsnd->mutex); SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0) if (walsnd->pid == 0)
{ {
...@@ -3311,6 +3316,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3311,6 +3316,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
spillBytes = walsnd->spillBytes; spillBytes = walsnd->spillBytes;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
/*
* Detect whether walsender is/was considered synchronous. We can
* provide some protection against stale data by checking the PID
* along with walsnd_index.
*/
is_sync_standby = false;
for (j = 0; j < num_standbys; j++)
{
if (sync_standbys[j].walsnd_index == i &&
sync_standbys[j].pid == pid)
{
is_sync_standby = true;
break;
}
}
memset(nulls, 0, sizeof(nulls)); memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(pid); values[0] = Int32GetDatum(pid);
...@@ -3380,7 +3401,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3380,7 +3401,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/ */
if (priority == 0) if (priority == 0)
values[10] = CStringGetTextDatum("async"); values[10] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i)) else if (is_sync_standby)
values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else else
......
...@@ -36,6 +36,24 @@ ...@@ -36,6 +36,24 @@
#define SYNC_REP_PRIORITY 0 #define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM 1 #define SYNC_REP_QUORUM 1
/*
* SyncRepGetCandidateStandbys returns an array of these structs,
* one per candidate synchronous walsender.
*/
typedef struct SyncRepStandbyData
{
/* Copies of relevant fields from WalSnd shared-memory struct */
pid_t pid;
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
int sync_standby_priority;
/* Index of this walsender in the WalSnd shared-memory array */
int walsnd_index;
/* This flag indicates whether this struct is about our own process */
bool is_me;
} SyncRepStandbyData;
/* /*
* Struct for the configuration of synchronous replication. * Struct for the configuration of synchronous replication.
* *
...@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void); ...@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void);
extern void SyncRepReleaseWaiters(void); extern void SyncRepReleaseWaiters(void);
/* called by wal sender and user backend */ /* called by wal sender and user backend */
extern List *SyncRepGetSyncStandbys(bool *am_sync); extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */ /* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void); extern void SyncRepUpdateSyncStandbysDefined(void);
......
...@@ -31,8 +31,7 @@ typedef enum WalSndState ...@@ -31,8 +31,7 @@ typedef enum WalSndState
/* /*
* Each walsender has a WalSnd struct in shared memory. * Each walsender has a WalSnd struct in shared memory.
* *
* This struct is protected by 'mutex', with two exceptions: one is * This struct is protected by its 'mutex' spinlock field, except that some
* sync_standby_priority as noted below. The other exception is that some
* members are only written by the walsender process itself, and thus that * members are only written by the walsender process itself, and thus that
* process is free to read those members without holding spinlock. pid and * process is free to read those members without holding spinlock. pid and
* needreload always require the spinlock to be held for all accesses. * needreload always require the spinlock to be held for all accesses.
...@@ -60,6 +59,12 @@ typedef struct WalSnd ...@@ -60,6 +59,12 @@ typedef struct WalSnd
TimeOffset flushLag; TimeOffset flushLag;
TimeOffset applyLag; TimeOffset applyLag;
/*
* The priority order of the standby managed by this WALSender, as listed
* in synchronous_standby_names, or 0 if not-listed.
*/
int sync_standby_priority;
/* Protects shared variables shown above. */ /* Protects shared variables shown above. */
slock_t mutex; slock_t mutex;
...@@ -69,13 +74,6 @@ typedef struct WalSnd ...@@ -69,13 +74,6 @@ typedef struct WalSnd
*/ */
Latch *latch; Latch *latch;
/*
* The priority order of the standby managed by this WALSender, as listed
* in synchronous_standby_names, or 0 if not-listed. Protected by
* SyncRepLock.
*/
int sync_standby_priority;
/* /*
* Timestamp of the last message received from standby. * Timestamp of the last message received from standby.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment