Commit 9a20a9b2 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Improve scalability of WAL insertions.

This patch replaces WALInsertLock with a number of WAL insertion slots,
allowing multiple backends to insert WAL records to the WAL buffers
concurrently. This is particularly useful for parallel loading large amounts
of data on a system with many CPUs.

This has one user-visible change: switching to a new WAL segment with
pg_switch_xlog() now fills the remaining unused portion of the segment with
zeros. This potentially adds some overhead, but it has been a very common
practice by DBA's to clear the "tail" of the segment with an external
pg_clearxlogtail utility anyway, to make the WAL files compress better.
With this patch, it's no longer necessary to do that.

This patch adds a new GUC, xloginsert_slots, to tune the number of WAL
insertion slots. Performance testing suggests that the default, 8, works
pretty well for all kinds of worklods, but I left the GUC in place to allow
others with different hardware to test that easily. We might want to remove
that before release.

Reviewed by Andres Freund.
parent 5372275b
......@@ -41,6 +41,7 @@
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/barrier.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
......@@ -83,6 +84,7 @@ int sync_method = DEFAULT_SYNC_METHOD;
int wal_level = WAL_LEVEL_MINIMAL;
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int num_xloginsert_slots = 8;
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
......@@ -279,8 +281,8 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
* hold the Insert lock). See XLogInsert for details. We are also allowed
* to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
* hold an insertion slot). See XLogInsert for details. We are also allowed
* to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
*/
......@@ -321,7 +323,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
* so it's a plain spinlock. The other locks are held longer (potentially
* over I/O operations), so we use LWLocks for them. These locks are:
*
* WALInsertLock: must be held to insert a record into the WAL buffers.
* WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
* It is only held while initializing and changing the mapping. If the
* contents of the buffer being replaced haven't been written yet, the mapping
* lock is released while the write is done, and reacquired afterwards.
*
* WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
* XLogFlush).
......@@ -348,17 +353,83 @@ typedef struct XLogwrtResult
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
/*
* A slot for inserting to the WAL. This is similar to an LWLock, the main
* difference is that there is an extra xlogInsertingAt field that is protected
* by the same mutex. Unlike an LWLock, a slot can only be acquired in
* exclusive mode.
*
* The xlogInsertingAt field is used to advertise to other processes how far
* the slot owner has progressed in inserting the record. When a backend
* acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
* yet know where it's going to insert the record. That's conservative
* but correct; the new insertion is certainly going to go to a byte position
* greater than 1. If another backend needs to flush the WAL, it will have to
* wait for the new insertion. xlogInsertingAt is updated after finishing the
* insert or when crossing a page boundary, which will wake up anyone waiting
* for it, whether the wait was necessary in the first place or not.
*
* A process can wait on a slot in two modes: LW_EXCLUSIVE or
* LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
* released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
* waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
* released, or xlogInsertingAt is updated. In other words, a process in
* LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
* copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
* the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
*
* To join the wait queue, a process must set MyProc->lwWaitMode to the mode
* it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
* or tail of the wait queue. The same mechanism is used to wait on an LWLock,
* see lwlock.c for details.
*/
typedef struct
{
slock_t mutex; /* protects the below fields */
XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */
PGPROC *owner; /* for debugging purposes */
bool releaseOK; /* T if ok to release waiters */
char exclusive; /* # of exclusive holders (0 or 1) */
PGPROC *head; /* head of list of waiting PGPROCs */
PGPROC *tail; /* tail of list of waiting PGPROCs */
/* tail is undefined when head is NULL */
} XLogInsertSlot;
/*
* All the slots are allocated as an array in shared memory. We force the
* array stride to be a power of 2, which saves a few cycles in indexing, but
* more importantly also ensures that individual slots don't cross cache line
* boundaries. (Of course, we have to also ensure that the array start
* address is suitably aligned.)
*/
typedef union XLogInsertSlotPadded
{
XLogInsertSlot slot;
char pad[64];
} XLogInsertSlotPadded;
/*
* Shared state data for XLogInsert.
*/
typedef struct XLogCtlInsert
{
XLogRecPtr PrevRecord; /* start of previously-inserted record */
int curridx; /* current block index in cache */
XLogPageHeader currpage; /* points to header of block in cache */
char *currpos; /* current insertion point in cache */
XLogRecPtr RedoRecPtr; /* current redo point for insertions */
bool forcePageWrites; /* forcing full-page writes for PITR? */
slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
/*
* CurrBytePos is the end of reserved WAL. The next record will be inserted
* at that position. PrevBytePos is the start position of the previously
* inserted (or rather, reserved) record - it is copied to the the prev-
* link of the next record. These are stored as "usable byte positions"
* rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
*/
uint64 CurrBytePos;
uint64 PrevBytePos;
/* insertion slots, see above for details */
XLogInsertSlotPadded *insertSlots;
/*
* fullPageWrites is the master copy used by all backends to determine
......@@ -366,7 +437,12 @@ typedef struct XLogCtlInsert
* This is required because, when full_page_writes is changed by SIGHUP,
* we must WAL-log it before it actually affects WAL-logging by backends.
* Checkpointer sets at startup or after SIGHUP.
*
* To read these fields, you must hold an insertion slot. To modify them,
* you must hold ALL the slots.
*/
XLogRecPtr RedoRecPtr; /* current redo point for insertions */
bool forcePageWrites; /* forcing full-page writes for PITR? */
bool fullPageWrites;
/*
......@@ -395,11 +471,11 @@ typedef struct XLogCtlWrite
*/
typedef struct XLogCtlData
{
/* Protected by WALInsertLock: */
XLogCtlInsert Insert;
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst;
XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
......@@ -419,10 +495,21 @@ typedef struct XLogCtlData
*/
XLogwrtResult LogwrtResult;
/*
* Latest initialized block index in cache.
*
* To change curridx and the identity of a buffer, you need to hold
* WALBufMappingLock. To change the identity of a buffer that's still
* dirty, the old page needs to be written out first, and for that you
* need WALWriteLock, and you need to ensure that there are no in-progress
* insertions to the page by calling WaitXLogInsertionsToFinish().
*/
int curridx;
/*
* These values do not change after startup, although the pointed-to pages
* and xlblocks values certainly do. Permission to read/write the pages
* and xlblocks values depends on WALInsertLock and WALWriteLock.
* and xlblocks values certainly do. xlblock values are protected by
* WALBufMappingLock.
*/
char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
......@@ -518,24 +605,34 @@ static XLogCtlData *XLogCtl = NULL;
static ControlFileData *ControlFile = NULL;
/*
* Macros for managing XLogInsert state. In most cases, the calling routine
* has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
* so these are passed as parameters instead of being fetched via XLogCtl.
* Calculate the amount of space left on the page after 'endptr'. Beware
* multiple evaluation!
*/
#define INSERT_FREESPACE(endptr) \
(((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
/* Free space remaining in the current xlog page buffer */
#define INSERT_FREESPACE(Insert) \
(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
/* Macro to advance to next buffer index. */
#define NextBufIdx(idx) \
(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
/* Construct XLogRecPtr value for current insertion point */
#define INSERT_RECPTR(recptr,Insert,curridx) \
(recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
/*
* XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
* would hold if it was in cache, the page containing 'recptr'.
*
* XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
* page is taken to mean the previous page.
*/
#define XLogRecPtrToBufIdx(recptr) \
(((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
#define PrevBufIdx(idx) \
(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
#define XLogRecEndPtrToBufIdx(recptr) \
((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
#define NextBufIdx(idx) \
(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
/*
* These are the number of bytes in a WAL page and segment usable for WAL data.
*/
#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
/*
* Private, possibly out-of-date copy of shared LogwrtResult.
......@@ -631,6 +728,9 @@ static bool InRedo = false;
/* Have we launched bgwriter during recovery? */
static bool bgwriterLaunched = false;
/* For WALInsertSlotAcquire/Release functions */
static int MySlotNo = 0;
static bool holdingAllSlots = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
......@@ -651,9 +751,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
char *blk, bool get_cleanup_lock, bool keep_buffer);
static bool AdvanceXLInsertBuffer(bool new_segment);
static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
......@@ -693,6 +793,24 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
static void rm_redo_error_callback(void *arg);
static int get_sync_bit(int method);
static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos);
static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr);
static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
static void WakeupWaiters(XLogRecPtr EndPos);
static char *GetXLogBuffer(XLogRecPtr ptr);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
static void WALInsertSlotAcquire(bool exclusive);
static void WALInsertSlotAcquireOne(int slotno);
static void WALInsertSlotRelease(void);
static void WALInsertSlotReleaseOne(int slotno);
/*
* Insert an XLOG record having the specified RMID and info bytes,
......@@ -713,10 +831,6 @@ XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecPtr RecPtr;
XLogRecPtr WriteRqst;
uint32 freespace;
int curridx;
XLogRecData *rdt;
XLogRecData *rdt_lastnormal;
Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
......@@ -731,11 +845,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
uint32 len,
write_len;
unsigned i;
bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
bool inserted;
uint8 info_orig = info;
static XLogRecord *rechdr;
XLogRecPtr StartPos;
XLogRecPtr EndPos;
if (rechdr == NULL)
{
......@@ -761,8 +877,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
*/
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
RecPtr = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return RecPtr;
EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return EndPos;
}
/*
......@@ -770,9 +886,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
* up.
*
* We may have to loop back to here if a race condition is detected below.
* We could prevent the race by doing all this work while holding the
* insert lock, but it seems better to avoid doing CRC calculations while
* holding the lock.
* We could prevent the race by doing all this work while holding an
* insertion slot, but it seems better to avoid doing CRC calculations
* while holding one.
*
* We add entries for backup blocks to the chain, so that they don't need
* any special treatment in the critical section where the chunks are
......@@ -789,8 +905,8 @@ begin:;
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
* don't yet have the insert lock, fullPageWrites and forcePageWrites
* could change under us, but we'll recheck them once we have the lock.
* don't yet have an insertion slot, fullPageWrites and forcePageWrites
* could change under us, but we'll recheck them once we have a slot.
*/
doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
......@@ -930,25 +1046,60 @@ begin:;
COMP_CRC32(rdata_crc, rdt->data, rdt->len);
/*
* Construct record header (prev-link and CRC are filled in later), and
* make that the first chunk in the chain.
* Construct record header (prev-link is filled in later, after reserving
* the space for the record), and make that the first chunk in the chain.
*
* The CRC calculated for the header here doesn't include prev-link,
* because we don't know it yet. It will be added later.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
hdr_rdt.next = rdata;
hdr_rdt.data = (char *) rechdr;
hdr_rdt.len = SizeOfXLogRecord;
write_len += SizeOfXLogRecord;
/*----------
*
* We have now done all the preparatory work we can without holding a
* lock or modifying shared state. From here on, inserting the new WAL
* record to the shared WAL buffer cache is a two-step process:
*
* 1. Reserve the right amount of space from the WAL. The current head of
* reserved space is kept in Insert->CurrBytePos, and is protected by
* insertpos_lck.
*
* 2. Copy the record to the reserved WAL space. This involves finding the
* correct WAL buffer containing the reserved space, and copying the
* record in place. This can be done concurrently in multiple processes.
*
* To keep track of which insertions are still in-progress, each concurrent
* inserter allocates an "insertion slot", which tells others how far the
* inserter has progressed. There is a small fixed number of insertion
* slots, determined by the num_xloginsert_slots GUC. When an inserter
* finishes, it updates the xlogInsertingAt of its slot to the end of the
* record it inserted, to let others know that it's done. xlogInsertingAt
* is also updated when crossing over to a new WAL buffer, to allow the
* the previous buffer to be flushed.
*
* Holding onto a slot also protects RedoRecPtr and fullPageWrites from
* changing until the insertion is finished.
*
* Step 2 can usually be done completely in parallel. If the required WAL
* page is not initialized yet, you have to grab WALBufMappingLock to
* initialize it, but the WAL writer tries to do that ahead of insertions
* to avoid that from happening in the critical path.
*
*----------
*/
START_CRIT_SECTION();
/* Now wait to get insert lock */
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(isLogSwitch);
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to go
......@@ -977,7 +1128,7 @@ begin:;
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
......@@ -996,7 +1147,7 @@ begin:;
if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
{
/* Oops, must redo it with full-page data. */
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
......@@ -1004,238 +1155,1169 @@ begin:;
}
/*
* If the current page is completely full, the record goes to the next
* page, right after the page header.
* Reserve space for the record in the WAL. This also sets the xl_prev
* pointer.
*/
if (isLogSwitch)
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
&rechdr->xl_prev);
inserted = true;
}
if (inserted)
{
/*
* Now that xl_prev has been filled in, finish CRC calculation of the
* record header.
*/
COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
FIN_CRC32(rdata_crc);
rechdr->xl_crc = rdata_crc;
/*
* All the record data, including the header, is now ready to be
* inserted. Copy the record in the space reserved.
*/
CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
}
else
{
/*
* This was an xlog-switch record, but the current insert location was
* already exactly at the beginning of a segment, so there was no need
* to do anything.
*/
}
/*
* Done! Let others know that we're finished.
*/
WALInsertSlotRelease();
END_CRIT_SECTION();
/*
* Update shared LogwrtRqst.Write, if we crossed page boundary.
*/
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
/* advance global request to include new block(s) */
if (xlogctl->LogwrtRqst.Write < EndPos)
xlogctl->LogwrtRqst.Write = EndPos;
/* update local result copy while I have the chance */
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
}
/*
* If this was an XLOG_SWITCH record, flush the record and the empty
* padding space that fills the rest of the segment, and perform
* end-of-segment actions (eg, notifying archiver).
*/
if (isLogSwitch)
{
TRACE_POSTGRESQL_XLOG_SWITCH();
XLogFlush(EndPos);
/*
* Even though we reserved the rest of the segment for us, which is
* reflected in EndPos, we return a pointer to just the end of the
* xlog-switch record.
*/
if (inserted)
{
EndPos = StartPos + SizeOfXLogRecord;
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
EndPos += SizeOfXLogLongPHD;
else
EndPos += SizeOfXLogShortPHD;
}
}
}
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
(uint32) (EndPos >> 32), (uint32) EndPos);
xlog_outrec(&buf, rechdr);
if (rdata->data != NULL)
{
appendStringInfo(&buf, " - ");
RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
}
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
#endif
/*
* Update our global variables
*/
ProcLastRecPtr = StartPos;
XactLastRecEnd = EndPos;
return EndPos;
}
/*
* Reserves the right amount of space for a record of given size from the WAL.
* *StartPos is set to the beginning of the reserved section, *EndPos to
* its end+1. *PrevPtr is set to the beginning of the previous record; it is
* used to set the xl_prev of this record.
*
* This is the performance critical part of XLogInsert that must be serialized
* across backends. The rest can happen mostly in parallel. Try to keep this
* section as short as possible, insertpos_lck can be heavily contended on a
* busy system.
*
* NB: The space calculation here must match the code in CopyXLogRecordToWAL,
* where we actually copy the record to the reserved space.
*/
static void
ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr)
{
volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
size = MAXALIGN(size);
/* All (non xlog-switch) records should contain data. */
Assert(size > SizeOfXLogRecord);
/*
* The duration the spinlock needs to be held is minimized by minimizing
* the calculations that have to be done while holding the lock. The
* current tip of reserved WAL is kept in CurrBytePos, as a byte position
* that only counts "usable" bytes in WAL, that is, it excludes all WAL
* page headers. The mapping between "usable" byte positions and physical
* positions (XLogRecPtrs) can be done outside the locked region, and
* because the usable byte position doesn't include any headers, reserving
* X bytes from WAL is almost as simple as "CurrBytePos += X".
*/
SpinLockAcquire(&Insert->insertpos_lck);
startbytepos = Insert->CurrBytePos;
endbytepos = startbytepos + size;
prevbytepos = Insert->PrevBytePos;
Insert->CurrBytePos = endbytepos;
Insert->PrevBytePos = startbytepos;
SpinLockRelease(&Insert->insertpos_lck);
*StartPos = XLogBytePosToRecPtr(startbytepos);
*EndPos = XLogBytePosToEndRecPtr(endbytepos);
*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
/*
* Check that the conversions between "usable byte positions" and
* XLogRecPtrs work consistently in both directions.
*/
Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
}
/*
* Like ReserveXLogInsertLocation(), but for an xlog-switch record.
*
* A log-switch record is handled slightly differently. The rest of the
* segment will be reserved for this insertion, as indicated by the returned
* *EndPos_p value. However, if we are already at the beginning of the current
* segment, *StartPos_p and *EndPos_p are set to the current location without
* reserving any space, and the function returns false.
*/
static bool
ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
{
volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
uint32 size = SizeOfXLogRecord;
XLogRecPtr ptr;
uint32 segleft;
/*
* These calculations are a bit heavy-weight to be done while holding a
* spinlock, but since we're holding all the WAL insertion slots, there
* are no other inserters competing for it. GetXLogInsertRecPtr() does
* compete for it, but that's not called very frequently.
*/
SpinLockAcquire(&Insert->insertpos_lck);
startbytepos = Insert->CurrBytePos;
ptr = XLogBytePosToEndRecPtr(startbytepos);
if (ptr % XLOG_SEG_SIZE == 0)
{
SpinLockRelease(&Insert->insertpos_lck);
*EndPos = *StartPos = ptr;
return false;
}
endbytepos = startbytepos + size;
prevbytepos = Insert->PrevBytePos;
*StartPos = XLogBytePosToRecPtr(startbytepos);
*EndPos = XLogBytePosToEndRecPtr(endbytepos);
segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
if (segleft != XLOG_SEG_SIZE)
{
/* consume the rest of the segment */
*EndPos += segleft;
endbytepos = XLogRecPtrToBytePos(*EndPos);
}
Insert->CurrBytePos = endbytepos;
Insert->PrevBytePos = startbytepos;
SpinLockRelease(&Insert->insertpos_lck);
*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
Assert((*EndPos) % XLOG_SEG_SIZE == 0);
Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
return true;
}
/*
* Subroutine of XLogInsert. Copies a WAL record to an already-reserved
* area in the WAL.
*/
static void
CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos)
{
char *currpos;
int freespace;
int written;
XLogRecPtr CurrPos;
XLogPageHeader pagehdr;
/* The first chunk is the record header */
Assert(rdata->len == SizeOfXLogRecord);
/*
* Get a pointer to the right place in the right WAL buffer to start
* inserting to.
*/
CurrPos = StartPos;
currpos = GetXLogBuffer(CurrPos);
freespace = INSERT_FREESPACE(CurrPos);
/*
* there should be enough space for at least the first field (xl_tot_len)
* on this page.
*/
Assert(freespace >= sizeof(uint32));
/* Copy record data */
written = 0;
while (rdata != NULL)
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
while (rdata_len > freespace)
{
/*
* Write what fits on this page, and continue on the next page.
*/
Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
memcpy(currpos, rdata_data, freespace);
rdata_data += freespace;
rdata_len -= freespace;
written += freespace;
CurrPos += freespace;
/*
* Get pointer to beginning of next page, and set the xlp_rem_len
* in the page header. Set XLP_FIRST_IS_CONTRECORD.
*
* It's safe to set the contrecord flag and xlp_rem_len without a
* lock on the page. All the other flags were already set when the
* page was initialized, in AdvanceXLInsertBuffer, and we're the
* only backend that needs to set the contrecord flag.
*/
currpos = GetXLogBuffer(CurrPos);
pagehdr = (XLogPageHeader) currpos;
pagehdr->xlp_rem_len = write_len - written;
pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
/* skip over the page header */
if (CurrPos % XLogSegSize == 0)
{
CurrPos += SizeOfXLogLongPHD;
currpos += SizeOfXLogLongPHD;
}
else
{
CurrPos += SizeOfXLogShortPHD;
currpos += SizeOfXLogShortPHD;
}
freespace = INSERT_FREESPACE(CurrPos);
}
Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
memcpy(currpos, rdata_data, rdata_len);
currpos += rdata_len;
CurrPos += rdata_len;
freespace -= rdata_len;
written += rdata_len;
rdata = rdata->next;
}
Assert(written == write_len);
/* Align the end position, so that the next record starts aligned */
CurrPos = MAXALIGN(CurrPos);
/*
* If this was an xlog-switch, it's not enough to write the switch record,
* we also have to consume all the remaining space in the WAL segment.
* We have already reserved it for us, but we still need to make sure it's
* allocated and zeroed in the WAL buffers so that when the caller (or
* someone else) does XLogWrite(), it can really write out all the zeros.
*/
if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
{
/* An xlog-switch record doesn't contain any data besides the header */
Assert(write_len == SizeOfXLogRecord);
/*
* We do this one page at a time, to make sure we don't deadlock
* against ourselves if wal_buffers < XLOG_SEG_SIZE.
*/
Assert(EndPos % XLogSegSize == 0);
/* Use up all the remaining space on the first page */
CurrPos += freespace;
while (CurrPos < EndPos)
{
/* initialize the next page (if not initialized already) */
WakeupWaiters(CurrPos);
AdvanceXLInsertBuffer(CurrPos, false);
CurrPos += XLOG_BLCKSZ;
}
}
if (CurrPos != EndPos)
elog(PANIC, "space reserved for WAL record does not match what was written");
}
/*
* Allocate a slot for insertion.
*
* In exclusive mode, all slots are reserved for the current process. That
* blocks all concurrent insertions.
*/
static void
WALInsertSlotAcquire(bool exclusive)
{
int i;
if (exclusive)
{
for (i = 0; i < num_xloginsert_slots; i++)
WALInsertSlotAcquireOne(i);
holdingAllSlots = true;
}
else
WALInsertSlotAcquireOne(-1);
}
/*
* Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
* one if slotno == -1. The index of the slot that was acquired is stored in
* MySlotNo.
*
* This is more or less equivalent to LWLockAcquire().
*/
static void
WALInsertSlotAcquireOne(int slotno)
{
volatile XLogInsertSlot *slot;
PGPROC *proc = MyProc;
bool retry = false;
int extraWaits = 0;
static int slotToTry = -1;
/*
* Try to use the slot we used last time. If the system isn't particularly
* busy, it's a good bet that it's available, and it's good to have some
* affinity to a particular slot so that you don't unnecessarily bounce
* cache lines between processes when there is no contention.
*
* If this is the first time through in this backend, pick a slot
* (semi-)randomly. This allows the slots to be used evenly if you have a
* lot of very short connections.
*/
if (slotno != -1)
MySlotNo = slotno;
else
{
if (slotToTry == -1)
slotToTry = MyProc->pgprocno % num_xloginsert_slots;
MySlotNo = slotToTry;
}
/*
* We can't wait if we haven't got a PGPROC. This should only occur
* during bootstrap or shared memory initialization. Put an Assert here
* to catch unsafe coding practices.
*/
Assert(MyProc != NULL);
/*
* Lock out cancel/die interrupts until we exit the code section protected
* by the slot. This ensures that interrupts will not interfere with
* manipulations of data structures in shared memory.
*/
START_CRIT_SECTION();
/*
* Loop here to try to acquire slot after each time we are signaled by
* WALInsertSlotRelease.
*/
for (;;)
{
bool mustwait;
slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&slot->mutex);
/* If retrying, allow WALInsertSlotRelease to release waiters again */
if (retry)
slot->releaseOK = true;
/* If I can get the slot, do so quickly. */
if (slot->exclusive == 0)
{
slot->exclusive++;
mustwait = false;
}
else
mustwait = true;
if (!mustwait)
break; /* got the lock */
Assert(slot->owner != MyProc);
/*
* Add myself to wait queue.
*/
proc->lwWaiting = true;
proc->lwWaitMode = LW_EXCLUSIVE;
proc->lwWaitLink = NULL;
if (slot->head == NULL)
slot->head = proc;
else
slot->tail->lwWaitLink = proc;
slot->tail = proc;
/* Can release the mutex now */
SpinLockRelease(&slot->mutex);
/*
* Wait until awakened.
*
* Since we share the process wait semaphore with the regular lock
* manager and ProcWaitForSignal, and we may need to acquire a slot
* while one of those is pending, it is possible that we get awakened
* for a reason other than being signaled by WALInsertSlotRelease. If
* so, loop back and wait again. Once we've gotten the slot,
* re-increment the sema by the number of additional signals received,
* so that the lock manager or signal manager will see the received
* signal when it next waits.
*/
for (;;)
{
/* "false" means cannot accept cancel/die interrupt here. */
PGSemaphoreLock(&proc->sem, false);
if (!proc->lwWaiting)
break;
extraWaits++;
}
/* Now loop back and try to acquire lock again. */
retry = true;
}
slot->owner = proc;
/*
* Normally, we initialize the xlogInsertingAt value of the slot to 1,
* because we don't yet know where in the WAL we're going to insert. It's
* not critical what it points to right now - leaving it to a too small
* value just means that WaitXlogInsertionsToFinish() might wait on us
* unnecessarily, until we update the value (when we finish the insert or
* move to next page).
*
* If we're grabbing all the slots, however, stamp all but the last one
* with InvalidXLogRecPtr, meaning there is no insert in progress. The last
* slot is the one that we will update as we proceed with the insert, the
* rest are held just to keep off other inserters.
*/
if (slotno != -1 && slotno != num_xloginsert_slots - 1)
slot->xlogInsertingAt = InvalidXLogRecPtr;
else
slot->xlogInsertingAt = 1;
/* We are done updating shared state of the slot itself. */
SpinLockRelease(&slot->mutex);
/*
* Fix the process wait semaphore's count for any absorbed wakeups.
*/
while (extraWaits-- > 0)
PGSemaphoreUnlock(&proc->sem);
/*
* If we couldn't get the slot immediately, try another slot next time.
* On a system with more insertion slots than concurrent inserters, this
* causes all the inserters to eventually migrate to a slot that no-one
* else is using. On a system with more inserters than slots, it still
* causes the inserters to be distributed quite evenly across the slots.
*/
if (slotno != -1 && retry)
slotToTry = (slotToTry + 1) % num_xloginsert_slots;
}
/*
* Wait for the given slot to become free, or for its xlogInsertingAt location
* to change to something else than 'waitptr'. In other words, wait for the
* inserter using the given slot to finish its insertion, or to at least make
* some progress.
*/
static void
WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
{
PGPROC *proc = MyProc;
int extraWaits = 0;
/*
* Lock out cancel/die interrupts while we sleep on the slot. There is
* no cleanup mechanism to remove us from the wait queue if we got
* interrupted.
*/
HOLD_INTERRUPTS();
/*
* Loop here to try to acquire lock after each time we are signaled.
*/
for (;;)
{
bool mustwait;
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&slot->mutex);
/* If I can get the lock, do so quickly. */
if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
mustwait = false;
else
mustwait = true;
if (!mustwait)
break; /* the lock was free */
Assert(slot->owner != MyProc);
/*
* Add myself to wait queue.
*/
proc->lwWaiting = true;
proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
proc->lwWaitLink = NULL;
/* waiters are added to the front of the queue */
proc->lwWaitLink = slot->head;
if (slot->head == NULL)
slot->tail = proc;
slot->head = proc;
/* Can release the mutex now */
SpinLockRelease(&slot->mutex);
/*
* Wait until awakened.
*
* Since we share the process wait semaphore with other things, like
* the regular lock manager and ProcWaitForSignal, and we may need to
* acquire an LWLock while one of those is pending, it is possible that
* we get awakened for a reason other than being signaled by
* LWLockRelease. If so, loop back and wait again. Once we've gotten
* the LWLock, re-increment the sema by the number of additional
* signals received, so that the lock manager or signal manager will
* see the received signal when it next waits.
*/
for (;;)
{
/* "false" means cannot accept cancel/die interrupt here. */
PGSemaphoreLock(&proc->sem, false);
if (!proc->lwWaiting)
break;
extraWaits++;
}
/* Now loop back and try to acquire lock again. */
}
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&slot->mutex);
/*
* Fix the process wait semaphore's count for any absorbed wakeups.
*/
while (extraWaits-- > 0)
PGSemaphoreUnlock(&proc->sem);
/*
* Now okay to allow cancel/die interrupts.
*/
RESUME_INTERRUPTS();
}
/*
* Wake up all processes waiting for us with WaitOnSlot(). Sets our
* xlogInsertingAt value to EndPos, without releasing the slot.
*/
static void
WakeupWaiters(XLogRecPtr EndPos)
{
volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
PGPROC *head;
PGPROC *proc;
PGPROC *next;
/*
* If we have already reported progress up to the same point, do nothing.
* No other process can modify xlogInsertingAt, so we can check this before
* grabbing the spinlock.
*/
if (slot->xlogInsertingAt == EndPos)
return;
/* xlogInsertingAt should not go backwards */
Assert(slot->xlogInsertingAt < EndPos);
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&slot->mutex);
/* we should own the slot */
Assert(slot->exclusive == 1 && slot->owner == MyProc);
slot->xlogInsertingAt = EndPos;
/*
* See if there are any waiters that need to be woken up.
*/
head = slot->head;
if (head != NULL)
{
proc = head;
/* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
next = proc->lwWaitLink;
while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
{
proc = next;
next = next->lwWaitLink;
}
/* proc is now the last PGPROC to be released */
slot->head = next;
proc->lwWaitLink = NULL;
}
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&slot->mutex);
/*
* Awaken any waiters I removed from the queue.
*/
while (head != NULL)
{
proc = head;
head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
proc->lwWaiting = false;
PGSemaphoreUnlock(&proc->sem);
}
}
/*
* Release our insertion slot (or slots, if we're holding them all).
*/
static void
WALInsertSlotRelease(void)
{
int i;
if (holdingAllSlots)
{
for (i = 0; i < num_xloginsert_slots; i++)
WALInsertSlotReleaseOne(i);
holdingAllSlots = false;
}
else
WALInsertSlotReleaseOne(MySlotNo);
}
static void
WALInsertSlotReleaseOne(int slotno)
{
volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
PGPROC *head;
PGPROC *proc;
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&slot->mutex);
/* we must be holding it */
Assert(slot->exclusive == 1 && slot->owner == MyProc);
slot->xlogInsertingAt = InvalidXLogRecPtr;
/* Release my hold on the slot */
slot->exclusive = 0;
slot->owner = NULL;
/*
* See if I need to awaken any waiters..
*/
head = slot->head;
if (head != NULL)
{
if (slot->releaseOK)
{
/*
* Remove the to-be-awakened PGPROCs from the queue.
*/
bool releaseOK = true;
proc = head;
/*
* First wake up any backends that want to be woken up without
* acquiring the lock. These are always in the front of the queue.
*/
while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
proc = proc->lwWaitLink;
/*
* Awaken the first exclusive-waiter, if any.
*/
if (proc->lwWaitLink)
{
Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
proc = proc->lwWaitLink;
releaseOK = false;
}
/* proc is now the last PGPROC to be released */
slot->head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
slot->releaseOK = releaseOK;
}
else
head = NULL;
}
/* We are done updating shared state of the slot itself. */
SpinLockRelease(&slot->mutex);
/*
* Awaken any waiters I removed from the queue.
*/
while (head != NULL)
{
proc = head;
head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
proc->lwWaiting = false;
PGSemaphoreUnlock(&proc->sem);
}
/*
* Now okay to allow cancel/die interrupts.
*/
END_CRIT_SECTION();
}
/*
* Wait for any WAL insertions < upto to finish.
*
* Returns the location of the oldest insertion that is still in-progress.
* Any WAL prior to that point has been fully copied into WAL buffers, and
* can be flushed out to disk. Because this waits for any insertions older
* than 'upto' to finish, the return value is always >= 'upto'.
*
* Note: When you are about to write out WAL, you must call this function
* *before* acquiring WALWriteLock, to avoid deadlocks. This function might
* need to wait for an insertion to finish (or at least advance to next
* uninitialized page), and the inserter might need to evict an old WAL buffer
* to make room for a new one, which in turn requires WALWriteLock.
*/
static XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
XLogRecPtr reservedUpto;
XLogRecPtr finishedUpto;
volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
int i;
if (MyProc == NULL)
elog(PANIC, "cannot wait without a PGPROC structure");
/* Read the current insert position */
SpinLockAcquire(&Insert->insertpos_lck);
bytepos = Insert->CurrBytePos;
SpinLockRelease(&Insert->insertpos_lck);
reservedUpto = XLogBytePosToEndRecPtr(bytepos);
/*
* No-one should request to flush a piece of WAL that hasn't even been
* reserved yet. However, it can happen if there is a block with a bogus
* LSN on disk, for example. XLogFlush checks for that situation and
* complains, but only after the flush. Here we just assume that to mean
* that all WAL that has been reserved needs to be finished. In this
* corner-case, the return value can be smaller than 'upto' argument.
*/
updrqst = false;
freespace = INSERT_FREESPACE(Insert);
if (freespace == 0)
if (upto > reservedUpto)
{
updrqst = AdvanceXLInsertBuffer(false);
freespace = INSERT_FREESPACE(Insert);
elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
(uint32) (upto >> 32), (uint32) upto,
(uint32) (reservedUpto >> 32), (uint32) reservedUpto);
upto = reservedUpto;
}
/* Compute record's XLOG location */
curridx = Insert->curridx;
INSERT_RECPTR(RecPtr, Insert, curridx);
/*
* finishedUpto is our return value, indicating the point upto which
* all the WAL insertions have been finished. Initialize it to the head
* of reserved WAL, and as we iterate through the insertion slots, back it
* out for any insertion that's still in progress.
*/
finishedUpto = reservedUpto;
/*
* If the record is an XLOG_SWITCH, and we are exactly at the start of a
* segment, we need not insert it (and don't want to because we'd like
* consecutive switch requests to be no-ops). Instead, make sure
* everything is written and flushed through the end of the prior segment,
* and return the prior segment's end address.
* Loop through all the slots, sleeping on any in-progress insert older
* than 'upto'.
*/
if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
for (i = 0; i < num_xloginsert_slots; i++)
{
/* We can release insert lock immediately */
LWLockRelease(WALInsertLock);
volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
XLogRecPtr insertingat;
RecPtr -= SizeOfXLogLongPHD;
retry:
/*
* We can check if the slot is in use without grabbing the spinlock.
* The spinlock acquisition of insertpos_lck before this loop acts
* as a memory barrier. If someone acquires the slot after that, it
* can't possibly be inserting to anything < reservedUpto. If it was
* acquired before that, an unlocked test will return true.
*/
if (!slot->exclusive)
continue;
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
if (LogwrtResult.Flush < RecPtr)
SpinLockAcquire(&slot->mutex);
/* re-check now that we have the lock */
if (!slot->exclusive)
{
XLogwrtRqst FlushRqst;
FlushRqst.Write = RecPtr;
FlushRqst.Flush = RecPtr;
XLogWrite(FlushRqst, false, false);
}
LWLockRelease(WALWriteLock);
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
return RecPtr;
SpinLockRelease(&slot->mutex);
continue;
}
insertingat = slot->xlogInsertingAt;
SpinLockRelease(&slot->mutex);
/* Finish the record header */
rechdr->xl_prev = Insert->PrevRecord;
/* Now we can finish computing the record's CRC */
COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32(rdata_crc);
rechdr->xl_crc = rdata_crc;
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
(uint32) (RecPtr >> 32), (uint32) RecPtr);
xlog_outrec(&buf, rechdr);
if (rdata->data != NULL)
if (insertingat == InvalidXLogRecPtr)
{
appendStringInfo(&buf, " - ");
RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
}
elog(LOG, "%s", buf.data);
pfree(buf.data);
/*
* slot is reserved just to hold off other inserters, there is no
* actual insert in progress.
*/
continue;
}
#endif
/* Record begin of record in appropriate places */
ProcLastRecPtr = RecPtr;
Insert->PrevRecord = RecPtr;
/*
* Append the data, including backup blocks if any
* This insertion is still in progress. Do we need to wait for it?
*
* When an inserter acquires a slot, it doesn't reset 'insertingat', so
* it will initially point to the old value of some already-finished
* insertion. The inserter will update the value as soon as it finishes
* the insertion, moves to the next page, or has to do I/O to flush an
* old dirty buffer. That means that when we see a slot with
* insertingat value < upto, we don't know if that insertion is still
* truly in progress, or if the slot is reused by a new inserter that
* hasn't updated the insertingat value yet. We have to assume it's the
* latter, and wait.
*/
rdata = &hdr_rdt;
while (write_len)
{
while (rdata->data == NULL)
rdata = rdata->next;
if (freespace > 0)
if (insertingat < upto)
{
if (rdata->len > freespace)
{
memcpy(Insert->currpos, rdata->data, freespace);
rdata->data += freespace;
rdata->len -= freespace;
write_len -= freespace;
WaitOnSlot(slot, insertingat);
goto retry;
}
else
{
memcpy(Insert->currpos, rdata->data, rdata->len);
freespace -= rdata->len;
write_len -= rdata->len;
Insert->currpos += rdata->len;
rdata = rdata->next;
continue;
/*
* We don't need to wait for this insertion, but update the
* return value.
*/
if (insertingat < finishedUpto)
finishedUpto = insertingat;
}
}
return finishedUpto;
}
/* Use next buffer */
updrqst = AdvanceXLInsertBuffer(false);
curridx = Insert->curridx;
/* Mark page header to indicate this record continues on the page */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
Insert->currpage->xlp_rem_len = write_len;
freespace = INSERT_FREESPACE(Insert);
}
/*
* Get a pointer to the right location in the WAL buffer containing the
* given XLogRecPtr.
*
* If the page is not initialized yet, it is initialized. That might require
* evicting an old dirty buffer from the buffer cache, which means I/O.
*
* The caller must ensure that the page containing the requested location
* isn't evicted yet, and won't be evicted. The way to ensure that is to
* hold onto an XLogInsertSlot with the xlogInsertingAt position set to
* something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
* to evict an old page from the buffer. (This means that once you call
* GetXLogBuffer() with a given 'ptr', you must not access anything before
* that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
* later, because older buffers might be recycled already)
*/
static char *
GetXLogBuffer(XLogRecPtr ptr)
{
int idx;
XLogRecPtr endptr;
static uint64 cachedPage = 0;
static char *cachedPos = NULL;
XLogRecPtr expectedEndPtr;
/* Ensure next record will be properly aligned */
Insert->currpos = (char *) Insert->currpage +
MAXALIGN(Insert->currpos - (char *) Insert->currpage);
freespace = INSERT_FREESPACE(Insert);
/*
* Fast path for the common case that we need to access again the same
* page as last time.
*/
if (ptr / XLOG_BLCKSZ == cachedPage)
{
Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
return cachedPos + ptr % XLOG_BLCKSZ;
}
/*
* The recptr I return is the beginning of the *next* record. This will be
* stored as LSN for changed data pages...
* The XLog buffer cache is organized so that a page is always loaded
* to a particular buffer. That way we can easily calculate the buffer
* a given page must be loaded into, from the XLogRecPtr alone.
*/
INSERT_RECPTR(RecPtr, Insert, curridx);
idx = XLogRecPtrToBufIdx(ptr);
/*
* If the record is an XLOG_SWITCH, we must now write and flush all the
* existing data, and then forcibly advance to the start of the next
* segment. It's not good to do this I/O while holding the insert lock,
* but there seems too much risk of confusion if we try to release the
* lock sooner. Fortunately xlog switch needn't be a high-performance
* operation anyway...
* See what page is loaded in the buffer at the moment. It could be the
* page we're looking for, or something older. It can't be anything newer
* - that would imply the page we're looking for has already been written
* out to disk and evicted, and the caller is responsible for making sure
* that doesn't happen.
*
* However, we don't hold a lock while we read the value. If someone has
* just initialized the page, it's possible that we get a "torn read" of
* the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
* that case we will see a bogus value. That's ok, we'll grab the mapping
* lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
* the page we're looking for. But it means that when we do this unlocked
* read, we might see a value that appears to be ahead of the page we're
* looking for. Don't PANIC on that, until we've verified the value while
* holding the lock.
*/
if (isLogSwitch)
expectedEndPtr = ptr;
expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
endptr = XLogCtl->xlblocks[idx];
if (expectedEndPtr != endptr)
{
XLogwrtRqst FlushRqst;
XLogRecPtr OldSegEnd;
/*
* Let others know that we're finished inserting the record up
* to the page boundary.
*/
WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
TRACE_POSTGRESQL_XLOG_SWITCH();
AdvanceXLInsertBuffer(ptr, false);
endptr = XLogCtl->xlblocks[idx];
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
if (expectedEndPtr != endptr)
elog(PANIC, "could not find WAL buffer for %X/%X",
(uint32) (ptr >> 32) , (uint32) ptr);
}
else
{
/*
* Make sure the initialization of the page is visible to us, and
* won't arrive later to overwrite the WAL data we write on the page.
*/
pg_memory_barrier();
}
/*
* Flush through the end of the page containing XLOG_SWITCH, and
* perform end-of-segment actions (eg, notifying archiver).
* Found the buffer holding this page. Return a pointer to the right
* offset within the page.
*/
WriteRqst = XLogCtl->xlblocks[curridx];
FlushRqst.Write = WriteRqst;
FlushRqst.Flush = WriteRqst;
XLogWrite(FlushRqst, false, true);
cachedPage = ptr / XLOG_BLCKSZ;
cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
/* Set up the next buffer as first page of next segment */
/* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
(void) AdvanceXLInsertBuffer(true);
Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
/* There should be no unwritten data */
curridx = Insert->curridx;
Assert(curridx == XLogCtl->Write.curridx);
return cachedPos + ptr % XLOG_BLCKSZ;
}
/* Compute end address of old segment */
OldSegEnd = XLogCtl->xlblocks[curridx];
OldSegEnd -= XLOG_BLCKSZ;
/*
* Converts a "usable byte position" to XLogRecPtr. A usable byte position
* is the position starting from the beginning of WAL, excluding all WAL
* page headers.
*/
static XLogRecPtr
XLogBytePosToRecPtr(uint64 bytepos)
{
uint64 fullsegs;
uint64 fullpages;
uint64 bytesleft;
uint32 seg_offset;
XLogRecPtr result;
/* Make it look like we've written and synced all of old segment */
LogwrtResult.Write = OldSegEnd;
LogwrtResult.Flush = OldSegEnd;
fullsegs = bytepos / UsableBytesInSegment;
bytesleft = bytepos % UsableBytesInSegment;
/*
* Update shared-memory status --- this code should match XLogWrite
*/
if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->LogwrtResult = LogwrtResult;
if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
SpinLockRelease(&xlogctl->info_lck);
/* fits on first page of segment */
seg_offset = bytesleft + SizeOfXLogLongPHD;
}
else
{
/* account for the first page on segment with long header */
seg_offset = XLOG_BLCKSZ;
bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
LWLockRelease(WALWriteLock);
fullpages = bytesleft / UsableBytesInPage;
bytesleft = bytesleft % UsableBytesInPage;
updrqst = false; /* done already */
seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
}
else
{
/* normal case, ie not xlog switch */
/* Need to update shared LogwrtRqst if some block was filled up */
if (freespace == 0)
XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
return result;
}
/*
* Like XLogBytePosToRecPtr, but if the position is at a page boundary,
* returns a pointer to the beginning of the page (ie. before page header),
* not to where the first xlog record on that page would go to. This is used
* when converting a pointer to the end of a record.
*/
static XLogRecPtr
XLogBytePosToEndRecPtr(uint64 bytepos)
{
uint64 fullsegs;
uint64 fullpages;
uint64 bytesleft;
uint32 seg_offset;
XLogRecPtr result;
fullsegs = bytepos / UsableBytesInSegment;
bytesleft = bytepos % UsableBytesInSegment;
if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
{
/* curridx is filled and available for writing out */
updrqst = true;
/* fits on first page of segment */
if (bytesleft == 0)
seg_offset = 0;
else
seg_offset = bytesleft + SizeOfXLogLongPHD;
}
else
{
/* if updrqst already set, write through end of previous buf */
curridx = PrevBufIdx(curridx);
}
WriteRqst = XLogCtl->xlblocks[curridx];
/* account for the first page on segment with long header */
seg_offset = XLOG_BLCKSZ;
bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
fullpages = bytesleft / UsableBytesInPage;
bytesleft = bytesleft % UsableBytesInPage;
if (bytesleft == 0)
seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
else
seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
}
LWLockRelease(WALInsertLock);
XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
if (updrqst)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
return result;
}
SpinLockAcquire(&xlogctl->info_lck);
/* advance global request to include new block(s) */
if (xlogctl->LogwrtRqst.Write < WriteRqst)
xlogctl->LogwrtRqst.Write = WriteRqst;
/* update local result copy while I have the chance */
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
}
/*
* Convert an XLogRecPtr to a "usable byte position".
*/
static uint64
XLogRecPtrToBytePos(XLogRecPtr ptr)
{
uint64 fullsegs;
uint32 fullpages;
uint32 offset;
uint64 result;
XactLastRecEnd = RecPtr;
XLByteToSeg(ptr, fullsegs);
END_CRIT_SECTION();
fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
offset = ptr % XLOG_BLCKSZ;
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
if (fullpages == 0)
{
result = fullsegs * UsableBytesInSegment;
if (offset > 0)
{
Assert(offset >= SizeOfXLogLongPHD);
result += offset - SizeOfXLogLongPHD;
}
}
else
{
result = fullsegs * UsableBytesInSegment +
(XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
(fullpages - 1) * UsableBytesInPage; /* full pages */
if (offset > 0)
{
Assert(offset >= SizeOfXLogShortPHD);
result += offset - SizeOfXLogShortPHD;
}
}
return RecPtr;
return result;
}
/*
......@@ -1303,44 +2385,48 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
}
/*
* Advance the Insert state to the next buffer page, writing out the next
* buffer if it still contains unwritten data.
*
* If new_segment is TRUE then we set up the next buffer page as the first
* page of the next xlog segment file, possibly but not usually the next
* consecutive file page.
*
* The global LogwrtRqst.Write pointer needs to be advanced to include the
* just-filled page. If we can do this for free (without an extra lock),
* we do so here. Otherwise the caller must do it. We return TRUE if the
* request update still needs to be done, FALSE if we did it internally.
*
* Must be called with WALInsertLock held.
* Initialize XLOG buffers, writing out old buffers if they still contain
* unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
* true, initialize as many pages as we can without having to write out
* unwritten data. Any new pages are initialized to zeros, with pages headers
* initialized properly.
*/
static bool
AdvanceXLInsertBuffer(bool new_segment)
static void
AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
int nextidx = NextBufIdx(Insert->curridx);
bool update_needed = true;
int nextidx;
XLogRecPtr OldPageRqstPtr;
XLogwrtRqst WriteRqst;
XLogRecPtr NewPageEndPtr;
XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
XLogRecPtr NewPageBeginPtr;
XLogPageHeader NewPage;
int npages = 0;
LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
/*
* Now that we have the lock, check if someone initialized the page
* already.
*/
while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic)
{
nextidx = NextBufIdx(XLogCtl->curridx);
/*
* Get ending-offset of the buffer page we need to replace (this may be
* zero if the buffer hasn't been used yet). Fall through if it's already
* written out.
* Get ending-offset of the buffer page we need to replace (this may
* be zero if the buffer hasn't been used yet). Fall through if it's
* already written out.
*/
OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
if (LogwrtResult.Write < OldPageRqstPtr)
{
/* nope, got work to do... */
XLogRecPtr FinishedPageRqstPtr;
FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
/*
* Nope, got work to do. If we just want to pre-initialize as much
* as we can without flushing, give up now.
*/
if (opportunistic)
break;
/* Before waiting, get info_lck and update LogwrtResult */
{
......@@ -1348,22 +2434,30 @@ AdvanceXLInsertBuffer(bool new_segment)
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr)
xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
}
update_needed = false; /* Did the shared-request update */
/*
* Now that we have an up-to-date LogwrtResult value, see if we still
* need to write it or if someone else already did.
* Now that we have an up-to-date LogwrtResult value, see if we
* still need to write it or if someone else already did.
*/
if (LogwrtResult.Write < OldPageRqstPtr)
{
/* Must acquire write lock */
/*
* Must acquire write lock. Release WALBufMappingLock first,
* to make sure that all insertions that we need to wait for
* can finish (up to this same position). Otherwise we risk
* deadlock.
*/
LWLockRelease(WALBufMappingLock);
WaitXLogInsertionsToFinish(OldPageRqstPtr);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
if (LogwrtResult.Write >= OldPageRqstPtr)
{
......@@ -1372,17 +2466,17 @@ AdvanceXLInsertBuffer(bool new_segment)
}
else
{
/*
* Have to write buffers while holding insert lock. This is
* not good, so only write as much as we absolutely must.
*/
/* Have to write it ourselves */
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
WriteRqst.Write = OldPageRqstPtr;
WriteRqst.Flush = 0;
XLogWrite(WriteRqst, false, false);
XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
}
/* Re-acquire WALBufMappingLock and retry */
LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
continue;
}
}
......@@ -1390,28 +2484,18 @@ AdvanceXLInsertBuffer(bool new_segment)
* Now the next buffer slot is free and we can set it up to be the next
* output page.
*/
NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx];
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
if (new_segment)
{
/* force it to a segment start point */
if (NewPageBeginPtr % XLogSegSize != 0)
NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize;
}
Assert(NewPageEndPtr % XLOG_BLCKSZ == 0);
Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
NewPageEndPtr = NewPageBeginPtr;
NewPageEndPtr += XLOG_BLCKSZ;
XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
Insert->curridx = nextidx;
Insert->currpage = NewPage;
Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
/*
* Be sure to re-zero the buffer so that bytes beyond what we've written
* will look like zeroes and not valid XLOG records...
* Be sure to re-zero the buffer so that bytes beyond what we've
* written will look like zeroes and not valid XLOG records...
*/
MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
......@@ -1423,18 +2507,20 @@ AdvanceXLInsertBuffer(bool new_segment)
/* NewPage->xlp_info = 0; */ /* done by memset */
NewPage ->xlp_tli = ThisTimeLineID;
NewPage ->xlp_pageaddr = NewPageBeginPtr;
/* NewPage->xlp_rem_len = 0; */ /* done by memset */
/*
* If online backup is not in progress, mark the header to indicate that
* WAL records beginning in this page have removable backup blocks. This
* allows the WAL archiver to know whether it is safe to compress archived
* WAL data by transforming full-block records into the non-full-block
* format. It is sufficient to record this at the page level because we
* force a page switch (in fact a segment switch) when starting a backup,
* so the flag will be off before any records can be written during the
* backup. At the end of a backup, the last page will be marked as all
* unsafe when perhaps only part is unsafe, but at worst the archiver
* would miss the opportunity to compress a few records.
* If online backup is not in progress, mark the header to indicate
* that* WAL records beginning in this page have removable backup
* blocks. This allows the WAL archiver to know whether it is safe to
* compress archived WAL data by transforming full-block records into
* the non-full-block format. It is sufficient to record this at the
* page level because we force a page switch (in fact a segment switch)
* when starting a backup, so the flag will be off before any records
* can be written during the backup. At the end of a backup, the last
* page will be marked as all unsafe when perhaps only part is unsafe,
* but at worst the archiver would miss the opportunity to compress a
* few records.
*/
if (!Insert->forcePageWrites)
NewPage ->xlp_info |= XLP_BKP_REMOVABLE;
......@@ -1450,11 +2536,30 @@ AdvanceXLInsertBuffer(bool new_segment)
NewLongPage->xlp_seg_size = XLogSegSize;
NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
NewPage ->xlp_info |= XLP_LONG_HEADER;
}
/*
* Make sure the initialization of the page becomes visible to others
* before the xlblocks update. GetXLogBuffer() reads xlblocks without
* holding a lock.
*/
pg_write_barrier();
*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
XLogCtl->curridx = nextidx;
npages++;
}
LWLockRelease(WALBufMappingLock);
return update_needed;
#ifdef WAL_DEBUG
if (npages > 0)
{
elog(DEBUG1, "initialized %d pages, upto %X/%X",
npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
}
#endif
}
/*
......@@ -1486,16 +2591,12 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
* This option allows us to avoid uselessly issuing multiple writes when a
* single one would do.
*
* If xlog_switch == TRUE, we are intending an xlog segment switch, so
* perform end-of-segment actions after writing the last page, even if
* it's not physically the end of its segment. (NB: this will work properly
* only if caller specifies WriteRqst == page-end and flexible == false,
* and there is some data to write.)
*
* Must be called with WALWriteLock held.
* Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
* must be called before grabbing the lock, to make sure the data is ready to
* write.
*/
static void
XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage;
......@@ -1544,15 +2645,15 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
* if we're passed a bogus WriteRqst.Write that is past the end of the
* last page that's been initialized by AdvanceXLInsertBuffer.
*/
if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx])
XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
if (LogwrtResult.Write >= EndPtr)
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
(uint32) (LogwrtResult.Write >> 32),
(uint32) LogwrtResult.Write,
(uint32) (XLogCtl->xlblocks[curridx] >> 32),
(uint32) XLogCtl->xlblocks[curridx]);
(uint32) (EndPtr >> 32), (uint32) EndPtr);
/* Advance LogwrtResult.Write to end of current buffer page */
LogwrtResult.Write = XLogCtl->xlblocks[curridx];
LogwrtResult.Write = EndPtr;
ispartialpage = WriteRqst.Write < LogwrtResult.Write;
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
......@@ -1656,16 +2757,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
* We also do this if this is the last page written for an xlog
* switch.
*
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage, and to update the
* timer for archive_timeout, and to signal for a checkpoint if
* too many logfile segments have been used since the last
* checkpoint.
*/
if (finishing_seg || (xlog_switch && last_iteration))
if (finishing_seg)
{
issue_xlog_fsync(openLogFile, openLogSegNo);
......@@ -1949,6 +3047,7 @@ XLogFlush(XLogRecPtr record)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr insertpos;
/* read LogwrtResult and update local state */
SpinLockAcquire(&xlogctl->info_lck);
......@@ -1961,6 +3060,12 @@ XLogFlush(XLogRecPtr record)
if (record <= LogwrtResult.Flush)
break;
/*
* Before actually performing the write, wait for all in-flight
* insertions to the pages we're about to write to finish.
*/
insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
/*
* Try to get the write lock. If we can't get it immediately, wait
* until it's released, and recheck if we still need to do the flush
......@@ -1997,31 +3102,27 @@ XLogFlush(XLogRecPtr record)
*/
if (CommitDelay > 0 && enableFsync &&
MinimumActiveBackends(CommitSiblings))
{
pg_usleep(CommitDelay);
/*
* Re-check how far we can now flush the WAL. It's generally not
* safe to call WaitXLogInsetionsToFinish while holding
* WALWriteLock, because an in-progress insertion might need to
* also grab WALWriteLock to make progress. But we know that all
* the insertions up to insertpos have already finished, because
* that's what the earlier WaitXLogInsertionsToFinish() returned.
* We're only calling it again to allow insertpos to be moved
* further forward, not to actually wait for anyone.
*/
insertpos = WaitXLogInsertionsToFinish(insertpos);
}
/* try to write/flush later additions to XLOG as well */
if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
uint32 freespace = INSERT_FREESPACE(Insert);
WriteRqst.Write = insertpos;
WriteRqst.Flush = insertpos;
if (freespace == 0) /* buffer is full */
WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
else
{
WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
WriteRqstPtr -= freespace;
}
LWLockRelease(WALInsertLock);
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = WriteRqstPtr;
}
else
{
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = record;
}
XLogWrite(WriteRqst, false, false);
XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
/* done */
......@@ -2142,7 +3243,8 @@ XLogBackgroundFlush(void)
START_CRIT_SECTION();
/* now wait for the write lock */
/* now wait for any in-progress insertions to finish and get write lock */
WaitXLogInsertionsToFinish(WriteRqstPtr);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
if (WriteRqstPtr > LogwrtResult.Flush)
......@@ -2151,7 +3253,7 @@ XLogBackgroundFlush(void)
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = WriteRqstPtr;
XLogWrite(WriteRqst, flexible, false);
XLogWrite(WriteRqst, flexible);
wrote_something = true;
}
LWLockRelease(WALWriteLock);
......@@ -2161,6 +3263,12 @@ XLogBackgroundFlush(void)
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
/*
* Great, done. To take some work off the critical path, try to initialize
* as many of the no-longer-needed WAL buffers for future use as we can.
*/
AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
return wrote_something;
}
......@@ -3937,10 +5045,13 @@ XLOGShmemSize(void)
/* XLogCtl */
size = sizeof(XLogCtlData);
/* xlog insertion slots, plus alignment */
size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
/* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */
size = add_size(size, ALIGNOF_XLOG_BUFFER);
size = add_size(size, XLOG_BLCKSZ);
/* and the buffers themselves */
size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
......@@ -3959,11 +5070,11 @@ XLOGShmemInit(void)
bool foundCFile,
foundXLog;
char *allocptr;
int i;
ControlFile = (ControlFileData *)
ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
XLogCtl = (XLogCtlData *)
ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
if (foundCFile || foundXLog)
{
......@@ -3971,7 +5082,7 @@ XLOGShmemInit(void)
Assert(foundCFile && foundXLog);
return;
}
XLogCtl = (XLogCtlData *) allocptr;
memset(XLogCtl, 0, sizeof(XLogCtlData));
/*
......@@ -3979,15 +5090,23 @@ XLOGShmemInit(void)
* multiple of the alignment for same, so no extra alignment padding is
* needed here.
*/
allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
allocptr += sizeof(XLogCtlData);
XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
/* Xlog insertion slots. Ensure they're aligned to the full padded size */
allocptr += sizeof(XLogInsertSlotPadded) -
((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
/*
* Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
* Align the start of the page buffers to a full xlog block size boundary.
* This simplifies some calculations in XLOG insertion. It is also required
* for O_DIRECT.
*/
allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
XLogCtl->pages = allocptr;
memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
......@@ -3999,7 +5118,21 @@ XLOGShmemInit(void)
XLogCtl->SharedRecoveryInProgress = true;
XLogCtl->SharedHotStandbyActive = false;
XLogCtl->WalWriterSleeping = false;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
for (i = 0; i < num_xloginsert_slots; i++)
{
XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
SpinLockInit(&slot->mutex);
slot->xlogInsertingAt = InvalidXLogRecPtr;
slot->owner = NULL;
slot->releaseOK = true;
slot->exclusive = 0;
slot->head = NULL;
slot->tail = NULL;
}
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
......@@ -4050,8 +5183,8 @@ BootStrapXLOG(void)
ThisTimeLineID = 1;
/* page buffer must be aligned suitably for O_DIRECT */
buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
memset(page, 0, XLOG_BLCKSZ);
/*
......@@ -4893,6 +6026,7 @@ StartupXLOG(void)
bool backupEndRequired = false;
bool backupFromStandby = false;
DBState dbstate_at_startup;
int firstIdx;
XLogReaderState *xlogreader;
XLogPageReadPrivate private;
bool fast_promoted = false;
......@@ -5257,7 +6391,7 @@ StartupXLOG(void)
lastFullPageWrites = checkPoint.fullPageWrites;
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
if (RecPtr < checkPoint.redo)
ereport(PANIC,
......@@ -5899,25 +7033,21 @@ StartupXLOG(void)
openLogFile = XLogFileOpen(openLogSegNo);
openLogOff = 0;
Insert = &XLogCtl->Insert;
Insert->PrevRecord = LastRec;
XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
XLogCtl->curridx = firstIdx;
XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
/*
* Tricky point here: readBuf contains the *last* block that the LastRec
* record spans, not the one it starts in. The last block is indeed the
* one we want to use.
*/
if (EndOfLog % XLOG_BLCKSZ == 0)
{
memset(Insert->currpage, 0, XLOG_BLCKSZ);
}
else
{
Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
}
Insert->currpos = (char *) Insert->currpage +
(EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize);
memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ);
Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
......@@ -5926,12 +7056,12 @@ StartupXLOG(void)
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
freespace = INSERT_FREESPACE(Insert);
freespace = INSERT_FREESPACE(EndOfLog);
if (freespace > 0)
{
/* Make sure rest of page is zero */
MemSet(Insert->currpos, 0, freespace);
XLogCtl->Write.curridx = 0;
MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace);
XLogCtl->Write.curridx = firstIdx;
}
else
{
......@@ -5943,7 +7073,7 @@ StartupXLOG(void)
* this is sufficient. The first actual attempt to insert a log
* record will advance the insert state.
*/
XLogCtl->Write.curridx = NextBufIdx(0);
XLogCtl->Write.curridx = NextBufIdx(firstIdx);
}
/* Pre-scan prepared transactions to find out the range of XIDs present */
......@@ -6504,21 +7634,29 @@ InitXLOGAccess(void)
}
/*
* Once spawned, a backend may update its local RedoRecPtr from
* XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
* to do so. This is done in XLogInsert() or GetRedoRecPtr().
* Return the current Redo pointer from shared memory.
*
* As a side-effect, the local RedoRecPtr copy is updated.
*/
XLogRecPtr
GetRedoRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr ptr;
/*
* The possibly not up-to-date copy in XlogCtl is enough. Even if we
* grabbed a WAL insertion slot to read the master copy, someone might
* update it just after we've released the lock.
*/
SpinLockAcquire(&xlogctl->info_lck);
Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr);
RedoRecPtr = xlogctl->Insert.RedoRecPtr;
ptr = xlogctl->RedoRecPtr;
SpinLockRelease(&xlogctl->info_lck);
if (RedoRecPtr < ptr)
RedoRecPtr = ptr;
return RedoRecPtr;
}
......@@ -6527,9 +7665,8 @@ GetRedoRecPtr(void)
*
* NOTE: The value *actually* returned is the position of the last full
* xlog page. It lags behind the real insert position by at most 1 page.
* For that, we don't need to acquire WALInsertLock which can be quite
* heavily contended, and an approximation is enough for the current
* usage of this function.
* For that, we don't need to scan through WAL insertion slots, and an
* approximation is enough for the current usage of this function.
*/
XLogRecPtr
GetInsertRecPtr(void)
......@@ -6806,6 +7943,8 @@ LogCheckpointEnd(bool restartpoint)
void
CreateCheckPoint(int flags)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
bool shutdown;
CheckPoint checkPoint;
XLogRecPtr recptr;
......@@ -6813,6 +7952,7 @@ CreateCheckPoint(int flags)
XLogRecData rdata;
uint32 freespace;
XLogSegNo _logSegNo;
XLogRecPtr curInsert;
VirtualTransactionId *vxids;
int nvxids;
......@@ -6883,10 +8023,11 @@ CreateCheckPoint(int flags)
checkPoint.oldestActiveXid = InvalidTransactionId;
/*
* We must hold WALInsertLock while examining insert state to determine
* the checkpoint REDO pointer.
* We must block concurrent insertions while examining insert state to
* determine the checkpoint REDO pointer.
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
/*
* If this isn't a shutdown or forced checkpoint, and we have not inserted
......@@ -6906,14 +8047,11 @@ CreateCheckPoint(int flags)
if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
CHECKPOINT_FORCE)) == 0)
{
XLogRecPtr curInsert;
INSERT_RECPTR(curInsert, Insert, Insert->curridx);
if (curInsert == ControlFile->checkPoint +
MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
{
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
......@@ -6945,18 +8083,19 @@ CreateCheckPoint(int flags)
* the buffer flush work. Those XLOG records are logically after the
* checkpoint, even though physically before it. Got that?
*/
freespace = INSERT_FREESPACE(Insert);
freespace = INSERT_FREESPACE(curInsert);
if (freespace == 0)
{
(void) AdvanceXLInsertBuffer(false);
/* OK to ignore update return flag, since we will do flush anyway */
freespace = INSERT_FREESPACE(Insert);
if (curInsert % XLogSegSize == 0)
curInsert += SizeOfXLogLongPHD;
else
curInsert += SizeOfXLogShortPHD;
}
INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
checkPoint.redo = curInsert;
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls; this
* must be done while holding the insert lock AND the info_lck.
* must be done while holding the insertion slots.
*
* Note: if we fail to complete the checkpoint, RedoRecPtr will be left
* pointing past where it really needs to point. This is okay; the only
......@@ -6965,20 +8104,18 @@ CreateCheckPoint(int flags)
* XLogInserts that happen while we are dumping buffers must assume that
* their buffer changes are not included in the checkpoint.
*/
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
SpinLockRelease(&xlogctl->info_lck);
}
/*
* Now we can release WAL insert lock, allowing other xacts to proceed
* while we are flushing disk buffers.
* Now we can release the WAL insertion slots, allowing other xacts to
* proceed while we are flushing disk buffers.
*/
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
/* Update the info_lck-protected copy of RedoRecPtr as well */
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->RedoRecPtr = checkPoint.redo;
SpinLockRelease(&xlogctl->info_lck);
/*
* If enabled, log checkpoint start. We postpone this until now so as not
......@@ -7003,10 +8140,11 @@ CreateCheckPoint(int flags)
* we wait till he's out of his commit critical section before proceeding.
* See notes in RecordTransactionCommit().
*
* Because we've already released WALInsertLock, this test is a bit fuzzy:
* it is possible that we will wait for xacts we didn't really need to
* wait for. But the delay should be short and it seems better to make
* checkpoint take a bit longer than to hold locks longer than necessary.
* Because we've already released the insertion slots, this test is a bit
* fuzzy: it is possible that we will wait for xacts we didn't really need
* to wait for. But the delay should be short and it seems better to make
* checkpoint take a bit longer than to hold off insertions longer than
* necessary.
* (In fact, the whole reason we have this issue is that xact.c does
* commit record XLOG insertion and clog update as two separate steps
* protected by different locks, but again that seems best on grounds of
......@@ -7233,10 +8371,10 @@ CreateEndOfRecoveryRecord(void)
xlrec.end_time = time(NULL);
LWLockAcquire(WALInsertLock, LW_SHARED);
WALInsertSlotAcquire(true);
xlrec.ThisTimeLineID = ThisTimeLineID;
xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
LocalSetXLogInsertAllowed();
......@@ -7437,15 +8575,18 @@ CreateRestartPoint(int flags)
* the number of segments replayed since last restartpoint, and request a
* restartpoint if it exceeds checkpoint_segments.
*
* You need to hold WALInsertLock and info_lck to update it, although
* during recovery acquiring WALInsertLock is just pro forma, because
* there is no other processes updating Insert.RedoRecPtr.
* Like in CreateCheckPoint(), hold off insertions to update it, although
* during recovery this is just pro forma, because no WAL insertions are
* happening.
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
SpinLockAcquire(&xlogctl->info_lck);
WALInsertSlotAcquire(true);
xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
WALInsertSlotRelease();
/* Also update the info_lck-protected copy */
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->RedoRecPtr = lastCheckPoint.redo;
SpinLockRelease(&xlogctl->info_lck);
LWLockRelease(WALInsertLock);
/*
* Prepare to accumulate statistics.
......@@ -7863,9 +9004,9 @@ UpdateFullPageWrites(void)
*/
if (fullPageWrites)
{
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
Insert->fullPageWrites = true;
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
}
/*
......@@ -7886,9 +9027,9 @@ UpdateFullPageWrites(void)
if (!fullPageWrites)
{
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
Insert->fullPageWrites = false;
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
}
END_CRIT_SECTION();
}
......@@ -8520,15 +9661,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
* Note that forcePageWrites has no effect during an online backup from
* the standby.
*
* We must hold WALInsertLock to change the value of forcePageWrites, to
* ensure adequate interlocking against XLogInsert().
* We must hold all the insertion slots to change the value of
* forcePageWrites, to ensure adequate interlocking against XLogInsert().
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
if (exclusive)
{
if (XLogCtl->Insert.exclusiveBackup)
{
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
......@@ -8539,7 +9680,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
else
XLogCtl->Insert.nonExclusiveBackups++;
XLogCtl->Insert.forcePageWrites = true;
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
/* Ensure we release forcePageWrites if fail below */
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
......@@ -8654,13 +9795,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
* taking a checkpoint right after another is not that expensive
* either because only few buffers have been dirtied yet.
*/
LWLockAcquire(WALInsertLock, LW_SHARED);
WALInsertSlotAcquire(true);
if (XLogCtl->Insert.lastBackupStart < startpoint)
{
XLogCtl->Insert.lastBackupStart = startpoint;
gotUniqueStartpoint = true;
}
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
} while (!gotUniqueStartpoint);
XLByteToSeg(startpoint, _logSegNo);
......@@ -8750,7 +9891,7 @@ pg_start_backup_callback(int code, Datum arg)
bool exclusive = DatumGetBool(arg);
/* Update backup counters and forcePageWrites on failure */
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
if (exclusive)
{
Assert(XLogCtl->Insert.exclusiveBackup);
......@@ -8767,7 +9908,7 @@ pg_start_backup_callback(int code, Datum arg)
{
XLogCtl->Insert.forcePageWrites = false;
}
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
}
/*
......@@ -8838,7 +9979,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
/*
* OK to update backup counters and forcePageWrites
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
if (exclusive)
XLogCtl->Insert.exclusiveBackup = false;
else
......@@ -8858,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
{
XLogCtl->Insert.forcePageWrites = false;
}
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
if (exclusive)
{
......@@ -9143,7 +10284,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
void
do_pg_abort_backup(void)
{
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
WALInsertSlotAcquire(true);
Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
XLogCtl->Insert.nonExclusiveBackups--;
......@@ -9152,7 +10293,7 @@ do_pg_abort_backup(void)
{
XLogCtl->Insert.forcePageWrites = false;
}
LWLockRelease(WALInsertLock);
WALInsertSlotRelease();
}
/*
......@@ -9184,14 +10325,14 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
XLogRecPtr
GetXLogInsertRecPtr(void)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecPtr current_recptr;
volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 current_bytepos;
LWLockAcquire(WALInsertLock, LW_SHARED);
INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
LWLockRelease(WALInsertLock);
SpinLockAcquire(&Insert->insertpos_lck);
current_bytepos = Insert->CurrBytePos;
SpinLockRelease(&Insert->insertpos_lck);
return current_recptr;
return XLogBytePosToRecPtr(current_bytepos);
}
/*
......
......@@ -22,6 +22,7 @@
*/
#include "postgres.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walsender.h"
#include "storage/lwlock.h"
......@@ -63,6 +64,7 @@ SpinlockSemas(void)
nsemas = NumLWLocks(); /* one for each lwlock */
nsemas += NBuffers; /* one for each buffer header */
nsemas += max_wal_senders; /* one for each wal sender process */
nsemas += num_xloginsert_slots; /* one for each WAL insertion slot */
nsemas += 30; /* plus a bunch for other small-scale use */
return nsemas;
......
......@@ -2037,6 +2037,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
{
{"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS,
gettext_noop("Sets the number of slots for concurrent xlog insertions."),
NULL,
GUC_NOT_IN_SAMPLE
},
&num_xloginsert_slots,
8, 1, 1000,
NULL, NULL, NULL
},
{
/* see max_connections */
{"max_wal_senders", PGC_POSTMASTER, REPLICATION_SENDING,
......
......@@ -190,6 +190,7 @@ extern char *XLogArchiveCommand;
extern bool EnableHotStandby;
extern bool fullPageWrites;
extern bool log_checkpoints;
extern int num_xloginsert_slots;
/* WAL levels */
typedef enum WalLevel
......
......@@ -93,14 +93,4 @@ typedef uint32 TimeLineID;
#define DEFAULT_SYNC_METHOD SYNC_METHOD_FSYNC
#endif
/*
* Limitation of buffer-alignment for direct IO depends on OS and filesystem,
* but XLOG_BLCKSZ is assumed to be enough for it.
*/
#ifdef O_DIRECT
#define ALIGNOF_XLOG_BUFFER XLOG_BLCKSZ
#else
#define ALIGNOF_XLOG_BUFFER ALIGNOF_BUFFER
#endif
#endif /* XLOG_DEFS_H */
......@@ -53,7 +53,7 @@ typedef enum LWLockId
ProcArrayLock,
SInvalReadLock,
SInvalWriteLock,
WALInsertLock,
WALBufMappingLock,
WALWriteLock,
ControlFileLock,
CheckpointLock,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment