Commit d140f2f3 authored by Thomas Munro

Rationalize GetWalRcv{Write,Flush}RecPtr().

GetWalRcvWriteRecPtr() previously reported the latest *flushed*
location.  Adopt the conventional terminology used elsewhere in the tree
by renaming it to GetWalRcvFlushRecPtr(), and likewise for some related
variables that used the term "received".

Add a new definition of GetWalRcvWriteRecPtr(), which returns the latest
*written* value.  This will allow later patches to use the value for
non-data-integrity purposes, without having to wait for the flush
pointer to advance.
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
parent 83fd4532
...@@ -208,8 +208,8 @@ HotStandbyState standbyState = STANDBY_DISABLED; ...@@ -208,8 +208,8 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec; static XLogRecPtr LastRec;
/* Local copy of WalRcv->receivedUpto */ /* Local copy of WalRcv->flushedUpto */
static XLogRecPtr receivedUpto = 0; static XLogRecPtr flushedUpto = 0;
static TimeLineID receiveTLI = 0; static TimeLineID receiveTLI = 0;
/* /*
...@@ -9363,7 +9363,7 @@ CreateRestartPoint(int flags) ...@@ -9363,7 +9363,7 @@ CreateRestartPoint(int flags)
* Retreat _logSegNo using the current end of xlog replayed or received, * Retreat _logSegNo using the current end of xlog replayed or received,
* whichever is later. * whichever is later.
*/ */
receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
replayPtr = GetXLogReplayRecPtr(&replayTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo); KeepLogSeg(endptr, &_logSegNo);
...@@ -11856,7 +11856,7 @@ retry: ...@@ -11856,7 +11856,7 @@ retry:
/* See if we need to retrieve more data */ /* See if we need to retrieve more data */
if (readFile < 0 || if (readFile < 0 ||
(readSource == XLOG_FROM_STREAM && (readSource == XLOG_FROM_STREAM &&
receivedUpto < targetPagePtr + reqLen)) flushedUpto < targetPagePtr + reqLen))
{ {
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
private->randAccess, private->randAccess,
...@@ -11887,10 +11887,10 @@ retry: ...@@ -11887,10 +11887,10 @@ retry:
*/ */
if (readSource == XLOG_FROM_STREAM) if (readSource == XLOG_FROM_STREAM)
{ {
if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ))
readLen = XLOG_BLCKSZ; readLen = XLOG_BLCKSZ;
else else
readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) - readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) -
targetPageOff; targetPageOff;
} }
else else
...@@ -12305,7 +12305,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -12305,7 +12305,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
RequestXLogStreaming(tli, ptr, PrimaryConnInfo, RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
PrimarySlotName, PrimarySlotName,
wal_receiver_create_temp_slot); wal_receiver_create_temp_slot);
receivedUpto = 0; flushedUpto = 0;
} }
/* /*
...@@ -12329,14 +12329,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -12329,14 +12329,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* XLogReceiptTime will not advance, so the grace time * XLogReceiptTime will not advance, so the grace time
* allotted to conflicting queries will decrease. * allotted to conflicting queries will decrease.
*/ */
if (RecPtr < receivedUpto) if (RecPtr < flushedUpto)
havedata = true; havedata = true;
else else
{ {
XLogRecPtr latestChunkStart; XLogRecPtr latestChunkStart;
receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI);
if (RecPtr < receivedUpto && receiveTLI == curFileTLI) if (RecPtr < flushedUpto && receiveTLI == curFileTLI)
{ {
havedata = true; havedata = true;
if (latestChunkStart <= RecPtr) if (latestChunkStart <= RecPtr)
......
...@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS) ...@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS)
{ {
XLogRecPtr recptr; XLogRecPtr recptr;
recptr = GetWalRcvWriteRecPtr(NULL, NULL); recptr = GetWalRcvFlushRecPtr(NULL, NULL);
if (recptr == 0) if (recptr == 0)
PG_RETURN_NULL(); PG_RETURN_NULL();
......
...@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in ...@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
WalRcvData->receiveStart. WalRcvData->receiveStart.
As walreceiver receives WAL from the master server, and writes and flushes As walreceiver receives WAL from the master server, and writes and flushes
it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
the startup process to know how far WAL replay can advance. the startup process to know how far WAL replay can advance.
Walreceiver sends information about replication progress to the master server Walreceiver sends information about replication progress to the master server
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* in the primary server), and then keeps receiving XLOG records and * in the primary server), and then keeps receiving XLOG records and
* writing them to the disk as long as the connection is alive. As XLOG * writing them to the disk as long as the connection is alive. As XLOG
* records are received and flushed to disk, it updates the * records are received and flushed to disk, it updates the
* WalRcv->receivedUpto variable in shared memory, to inform the startup * WalRcv->flushedUpto variable in shared memory, to inform the startup
* process of how far it can proceed with XLOG replay. * process of how far it can proceed with XLOG replay.
* *
* A WAL receiver cannot directly load GUC parameters used when establishing * A WAL receiver cannot directly load GUC parameters used when establishing
...@@ -261,6 +261,8 @@ WalReceiverMain(void) ...@@ -261,6 +261,8 @@ WalReceiverMain(void)
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
/* Arrange to clean up at walreceiver exit */ /* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0); on_shmem_exit(WalRcvDie, 0);
...@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) ...@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
LogstreamResult.Write = recptr; LogstreamResult.Write = recptr;
} }
/* Update shared-memory status */
pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
} }
/* /*
...@@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying) ...@@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying)
/* Update shared-memory status */ /* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
if (walrcv->receivedUpto < LogstreamResult.Flush) if (walrcv->flushedUpto < LogstreamResult.Flush)
{ {
walrcv->latestChunkStart = walrcv->receivedUpto; walrcv->latestChunkStart = walrcv->flushedUpto;
walrcv->receivedUpto = LogstreamResult.Flush; walrcv->flushedUpto = LogstreamResult.Flush;
walrcv->receivedTLI = ThisTimeLineID; walrcv->receivedTLI = ThisTimeLineID;
} }
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
...@@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) ...@@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
state = WalRcv->walRcvState; state = WalRcv->walRcvState;
receive_start_lsn = WalRcv->receiveStart; receive_start_lsn = WalRcv->receiveStart;
receive_start_tli = WalRcv->receiveStartTLI; receive_start_tli = WalRcv->receiveStartTLI;
received_lsn = WalRcv->receivedUpto; received_lsn = WalRcv->flushedUpto;
received_tli = WalRcv->receivedTLI; received_tli = WalRcv->receivedTLI;
last_send_time = WalRcv->lastMsgSendTime; last_send_time = WalRcv->lastMsgSendTime;
last_receipt_time = WalRcv->lastMsgReceiptTime; last_receipt_time = WalRcv->lastMsgReceiptTime;
......
...@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, ...@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
/* /*
* If this is the first startup of walreceiver (on this timeline), * If this is the first startup of walreceiver (on this timeline),
* initialize receivedUpto and latestChunkStart to the starting point. * initialize flushedUpto and latestChunkStart to the starting point.
*/ */
if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
{ {
walrcv->receivedUpto = recptr; walrcv->flushedUpto = recptr;
walrcv->receivedTLI = tli; walrcv->receivedTLI = tli;
walrcv->latestChunkStart = recptr; walrcv->latestChunkStart = recptr;
} }
...@@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, ...@@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
} }
/* /*
* Returns the last+1 byte position that walreceiver has written. * Returns the last+1 byte position that walreceiver has flushed.
* *
* Optionally, returns the previous chunk start, that is the first byte * Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not * written in the most recent walreceiver flush cycle. Callers not
...@@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, ...@@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
* receiveTLI. * receiveTLI.
*/ */
XLogRecPtr XLogRecPtr
GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
{ {
WalRcvData *walrcv = WalRcv; WalRcvData *walrcv = WalRcv;
XLogRecPtr recptr; XLogRecPtr recptr;
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
recptr = walrcv->receivedUpto; recptr = walrcv->flushedUpto;
if (latestChunkStart) if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart; *latestChunkStart = walrcv->latestChunkStart;
if (receiveTLI) if (receiveTLI)
...@@ -328,6 +328,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) ...@@ -328,6 +328,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
return recptr; return recptr;
} }
/*
* Returns the last+1 byte position that walreceiver has written.
* This returns a recently written value without taking a lock.
*/
XLogRecPtr
GetWalRcvWriteRecPtr(void)
{
WalRcvData *walrcv = WalRcv;
return pg_atomic_read_u64(&walrcv->writtenUpto);
}
/* /*
* Returns the replication apply delay in ms or -1 * Returns the replication apply delay in ms or -1
* if the apply delay info is not available * if the apply delay info is not available
...@@ -345,7 +357,7 @@ GetReplicationApplyDelay(void) ...@@ -345,7 +357,7 @@ GetReplicationApplyDelay(void)
TimestampTz chunkReplayStartTime; TimestampTz chunkReplayStartTime;
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
receivePtr = walrcv->receivedUpto; receivePtr = walrcv->flushedUpto;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
replayPtr = GetXLogReplayRecPtr(NULL); replayPtr = GetXLogReplayRecPtr(NULL);
......
...@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void) ...@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void)
* has streamed, but hasn't been replayed yet. * has streamed, but hasn't been replayed yet.
*/ */
receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI);
ThisTimeLineID = replayTLI; ThisTimeLineID = replayTLI;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#include "getaddrinfo.h" /* for NI_MAXHOST */ #include "getaddrinfo.h" /* for NI_MAXHOST */
#include "pgtime.h" #include "pgtime.h"
#include "port/atomics.h"
#include "replication/logicalproto.h" #include "replication/logicalproto.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/latch.h" #include "storage/latch.h"
...@@ -73,19 +74,19 @@ typedef struct ...@@ -73,19 +74,19 @@ typedef struct
TimeLineID receiveStartTLI; TimeLineID receiveStartTLI;
/* /*
* receivedUpto-1 is the last byte position that has already been * flushedUpto-1 is the last byte position that has already been
* received, and receivedTLI is the timeline it came from. At the first * received, and receivedTLI is the timeline it came from. At the first
* startup of walreceiver, these are set to receiveStart and * startup of walreceiver, these are set to receiveStart and
* receiveStartTLI. After that, walreceiver updates these whenever it * receiveStartTLI. After that, walreceiver updates these whenever it
* flushes the received WAL to disk. * flushes the received WAL to disk.
*/ */
XLogRecPtr receivedUpto; XLogRecPtr flushedUpto;
TimeLineID receivedTLI; TimeLineID receivedTLI;
/* /*
* latestChunkStart is the starting byte position of the current "batch" * latestChunkStart is the starting byte position of the current "batch"
* of received WAL. It's actually the same as the previous value of * of received WAL. It's actually the same as the previous value of
* receivedUpto before the last flush to disk. Startup process can use * flushedUpto before the last flush to disk. Startup process can use
* this to detect whether it's keeping up or not. * this to detect whether it's keeping up or not.
*/ */
XLogRecPtr latestChunkStart; XLogRecPtr latestChunkStart;
...@@ -141,6 +142,14 @@ typedef struct ...@@ -141,6 +142,14 @@ typedef struct
slock_t mutex; /* locks shared variables shown above */ slock_t mutex; /* locks shared variables shown above */
/*
* Like flushedUpto, but advanced after writing and before flushing,
* without the need to acquire the spin lock. Data can be read by another
* process up to this point, but shouldn't be used for data integrity
* purposes.
*/
pg_atomic_uint64 writtenUpto;
/* /*
* force walreceiver reply? This doesn't need to be locked; memory * force walreceiver reply? This doesn't need to be locked; memory
* barriers for ordering are sufficient. But we do need atomic fetch and * barriers for ordering are sufficient. But we do need atomic fetch and
...@@ -322,7 +331,8 @@ extern bool WalRcvRunning(void); ...@@ -322,7 +331,8 @@ extern bool WalRcvRunning(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
const char *conninfo, const char *slotname, const char *conninfo, const char *slotname,
bool create_temp_slot); bool create_temp_slot);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
extern int GetReplicationApplyDelay(void); extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void); extern int GetReplicationTransferLatency(void);
extern void WalRcvForceReply(void); extern void WalRcvForceReply(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment