Commit e174f699 authored by Michael Paquier's avatar Michael Paquier

Add some assertions in syncrep.c

A couple of routines assume that the LWLock SyncRepLock needs to be
taken, so add a couple of assertions to be sure of that.  Also, when
waiting for a given LSN at transaction commit, the code implied that the
syncrep queue cleanup happens while holding interrupts, but the code
never checked after that.

Author: Michael Paquier
Reviewed-by: Fujii Masao, Kyotaro Horiguchi, Dongming Liu
Discussion: https://postgr.es/m/a0806273-8bbb-43b3-bbe1-c45a58f6ae21.lingce.ldm@alibaba-inc.com
parent 20345197
...@@ -149,6 +149,13 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) ...@@ -149,6 +149,13 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
const char *old_status; const char *old_status;
int mode; int mode;
/*
* This should be called while holding interrupts during a transaction
* commit to prevent the follow-up shared memory queue cleanups to be
* influenced by external interruptions.
*/
Assert(InterruptHoldoffCount > 0);
/* Cap the level for anything other than commit to remote flush only. */ /* Cap the level for anything other than commit to remote flush only. */
if (commit) if (commit)
mode = SyncRepWaitMode; mode = SyncRepWaitMode;
...@@ -516,6 +523,8 @@ SyncRepReleaseWaiters(void) ...@@ -516,6 +523,8 @@ SyncRepReleaseWaiters(void)
/* /*
* Calculate the synced Write, Flush and Apply positions among sync standbys. * Calculate the synced Write, Flush and Apply positions among sync standbys.
* *
* The caller must hold SyncRepLock.
*
* Return false if the number of sync standbys is less than * Return false if the number of sync standbys is less than
* synchronous_standby_names specifies. Otherwise return true and * synchronous_standby_names specifies. Otherwise return true and
* store the positions into *writePtr, *flushPtr and *applyPtr. * store the positions into *writePtr, *flushPtr and *applyPtr.
...@@ -529,6 +538,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, ...@@ -529,6 +538,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
{ {
List *sync_standbys; List *sync_standbys;
Assert(LWLockHeldByMe(SyncRepLock));
*writePtr = InvalidXLogRecPtr; *writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr;
*applyPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr;
...@@ -688,6 +699,8 @@ cmp_lsn(const void *a, const void *b) ...@@ -688,6 +699,8 @@ cmp_lsn(const void *a, const void *b)
List * List *
SyncRepGetSyncStandbys(bool *am_sync) SyncRepGetSyncStandbys(bool *am_sync)
{ {
Assert(LWLockHeldByMe(SyncRepLock));
/* Set default result */ /* Set default result */
if (am_sync != NULL) if (am_sync != NULL)
*am_sync = false; *am_sync = false;
...@@ -992,7 +1005,7 @@ SyncRepGetStandbyPriority(void) ...@@ -992,7 +1005,7 @@ SyncRepGetStandbyPriority(void)
* Pass all = true to wake whole queue; otherwise, just wake up to * Pass all = true to wake whole queue; otherwise, just wake up to
* the walsender's LSN. * the walsender's LSN.
* *
* Must hold SyncRepLock. * The caller must hold SyncRepLock in exclusive mode.
*/ */
static int static int
SyncRepWakeQueue(bool all, int mode) SyncRepWakeQueue(bool all, int mode)
...@@ -1003,6 +1016,7 @@ SyncRepWakeQueue(bool all, int mode) ...@@ -1003,6 +1016,7 @@ SyncRepWakeQueue(bool all, int mode)
int numprocs = 0; int numprocs = 0;
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
Assert(SyncRepQueueIsOrderedByLSN(mode)); Assert(SyncRepQueueIsOrderedByLSN(mode));
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment