Commit 572d6ee6 authored by Alvaro Herrera's avatar Alvaro Herrera

Fix locking in WAL receiver/sender shmem state structs

In WAL receiver and WAL server, some accesses to their corresponding
shared memory control structs were done without holding any kind of
lock, which could lead to inconsistent and possibly insecure results.

In walsender, fix by clarifying the locking rules and following them
correctly, as documented in the new comment in walsender_private.h;
namely that some members can be read in walsender itself without a lock,
because the only writes occur in the same process.  The rest of the
struct requires spinlock for accesses, as usual.

In walreceiver, fix by always holding spinlock while accessing the
struct.

While there is potentially a problem in all branches, it is minor in
stable ones.  This only became a real problem in pg10 because of quorum
commit in synchronous replication (commit 3901fd70), and a potential
security problem in walreceiver because a superuser() check was removed
by default monitoring roles (commit 25fff407).  Thus, no backpatch.

In passing, clean up some leftover braces which were used to create
unconditional blocks.  Once upon a time these were used for
volatile-izing accesses to those shmem structs, which is no longer
required.  Many other occurrences of this pattern remain.

Author: Michaël Paquier
Reported-by: Michaël Paquier
Reviewed-by: Masahiko Sawada, Kyotaro Horiguchi, Thomas Munro,
	Robert Haas
Discussion: https://postgr.es/m/CAB7nPqTWYqtzD=LN_oDaf9r-hAjUEPAy0B9yRkhcsLdRN8fzrw@mail.gmail.com
parent 898d24ae
......@@ -711,14 +711,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
for (i = 0; i < max_wal_senders; i++)
{
XLogRecPtr flush;
WalSndState state;
int pid;
walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
pid = walsnd->pid;
flush = walsnd->flush;
state = walsnd->state;
SpinLockRelease(&walsnd->mutex);
/* Must be active */
if (walsnd->pid == 0)
if (pid == 0)
continue;
/* Must be streaming */
if (walsnd->state != WALSNDSTATE_STREAMING)
if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
......@@ -726,7 +736,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
continue;
/* Must have a valid flush position */
if (XLogRecPtrIsInvalid(walsnd->flush))
if (XLogRecPtrIsInvalid(flush))
continue;
/*
......@@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
*/
for (i = 0; i < max_wal_senders; i++)
{
XLogRecPtr flush;
WalSndState state;
int pid;
walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
pid = walsnd->pid;
flush = walsnd->flush;
state = walsnd->state;
SpinLockRelease(&walsnd->mutex);
/* Must be active */
if (walsnd->pid == 0)
if (pid == 0)
continue;
/* Must be streaming */
if (walsnd->state != WALSNDSTATE_STREAMING)
if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
......@@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
continue;
/* Must have a valid flush position */
if (XLogRecPtrIsInvalid(walsnd->flush))
if (XLogRecPtrIsInvalid(flush))
continue;
/*
......
......@@ -1379,7 +1379,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
TupleDesc tupdesc;
Datum *values;
bool *nulls;
WalRcvData *walrcv = WalRcv;
int pid;
bool ready_to_display;
WalRcvState state;
XLogRecPtr receive_start_lsn;
TimeLineID receive_start_tli;
......@@ -1392,11 +1393,28 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
char *slotname;
char *conninfo;
/* Take a lock to ensure value consistency */
SpinLockAcquire(&WalRcv->mutex);
pid = (int) WalRcv->pid;
ready_to_display = WalRcv->ready_to_display;
state = WalRcv->walRcvState;
receive_start_lsn = WalRcv->receiveStart;
receive_start_tli = WalRcv->receiveStartTLI;
received_lsn = WalRcv->receivedUpto;
received_tli = WalRcv->receivedTLI;
last_send_time = WalRcv->lastMsgSendTime;
last_receipt_time = WalRcv->lastMsgReceiptTime;
latest_end_lsn = WalRcv->latestWalEnd;
latest_end_time = WalRcv->latestWalEndTime;
slotname = pstrdup(WalRcv->slotname);
conninfo = pstrdup(WalRcv->conninfo);
SpinLockRelease(&WalRcv->mutex);
/*
* No WAL receiver (or not ready yet), just return a tuple with NULL
* values
*/
if (walrcv->pid == 0 || !walrcv->ready_to_display)
if (pid == 0 || !ready_to_display)
PG_RETURN_NULL();
/* determine result type */
......@@ -1406,23 +1424,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
values = palloc0(sizeof(Datum) * tupdesc->natts);
nulls = palloc0(sizeof(bool) * tupdesc->natts);
/* Take a lock to ensure value consistency */
SpinLockAcquire(&walrcv->mutex);
state = walrcv->walRcvState;
receive_start_lsn = walrcv->receiveStart;
receive_start_tli = walrcv->receiveStartTLI;
received_lsn = walrcv->receivedUpto;
received_tli = walrcv->receivedTLI;
last_send_time = walrcv->lastMsgSendTime;
last_receipt_time = walrcv->lastMsgReceiptTime;
latest_end_lsn = walrcv->latestWalEnd;
latest_end_time = walrcv->latestWalEndTime;
slotname = pstrdup(walrcv->slotname);
conninfo = pstrdup(walrcv->conninfo);
SpinLockRelease(&walrcv->mutex);
/* Fetch values */
values[0] = Int32GetDatum(walrcv->pid);
values[0] = Int32GetDatum(pid);
if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
{
......@@ -1473,6 +1476,5 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
}
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(
heap_form_tuple(tupdesc, values, nulls)));
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
......@@ -668,13 +668,9 @@ StartReplication(StartReplicationCmd *cmd)
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
......@@ -1093,13 +1089,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
sentPtr = MyReplicationSlot->data.confirmed_flush;
/* Also update the sent position status in shared memory */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
SpinLockRelease(&walsnd->mutex);
}
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
SpinLockRelease(&MyWalSnd->mutex);
replication_active = true;
......@@ -2892,10 +2884,12 @@ WalSndRqstFileReload(void)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
{
SpinLockRelease(&walsnd->mutex);
continue;
SpinLockAcquire(&walsnd->mutex);
}
walsnd->needreload = true;
SpinLockRelease(&walsnd->mutex);
}
......@@ -3071,7 +3065,6 @@ WalSndWaitStopping(void)
for (i = 0; i < max_wal_senders; i++)
{
WalSndState state;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
......@@ -3082,14 +3075,13 @@ WalSndWaitStopping(void)
continue;
}
state = walsnd->state;
SpinLockRelease(&walsnd->mutex);
if (state != WALSNDSTATE_STOPPING)
if (walsnd->state != WALSNDSTATE_STOPPING)
{
all_stopped = false;
SpinLockRelease(&walsnd->mutex);
break;
}
SpinLockRelease(&walsnd->mutex);
}
/* safe to leave if confirmation is done for all WAL senders */
......@@ -3210,14 +3202,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
TimeOffset flushLag;
TimeOffset applyLag;
int priority;
int pid;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
{
SpinLockRelease(&walsnd->mutex);
continue;
SpinLockAcquire(&walsnd->mutex);
}
pid = walsnd->pid;
sentPtr = walsnd->sentPtr;
state = walsnd->state;
write = walsnd->write;
......@@ -3230,7 +3226,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
values[0] = Int32GetDatum(pid);
if (!superuser())
{
......@@ -3265,7 +3261,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* which always returns an invalid flush location, as an
* asynchronous standby.
*/
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
if (writeLag < 0)
nulls[6] = true;
......
......@@ -114,6 +114,9 @@ typedef struct
*/
char slotname[NAMEDATALEN];
/* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display;
slock_t mutex; /* locks shared variables shown above */
/*
......@@ -122,9 +125,6 @@ typedef struct
*/
bool force_reply;
/* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display;
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
......
......@@ -30,10 +30,17 @@ typedef enum WalSndState
/*
* Each walsender has a WalSnd struct in shared memory.
*
* This struct is protected by 'mutex', with two exceptions: one is
* sync_standby_priority as noted below. The other exception is that some
* members are only written by the walsender process itself, and thus that
* process is free to read those members without holding spinlock. pid and
* needreload always require the spinlock to be held for all accesses.
*/
typedef struct WalSnd
{
pid_t pid; /* this walsender's process id, or 0 */
pid_t pid; /* this walsender's PID, or 0 if not active */
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
bool needreload; /* does currently-open file need to be
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment