Commit fe702a7b authored by Robert Haas's avatar Robert Haas

Move each SLRU's lwlocks to a separate tranche.

This makes it significantly easier to identify these lwlocks in
LWLOCK_STATS or Trace_lwlocks output.  It's also arguably better
from a modularity standpoint, since lwlock.c no longer needs to
know anything about the LWLock needs of the higher-level SLRU
facility.

Ildus Kurbangaliev, reviewd by Álvaro Herrera and by me.
parent c4059188
...@@ -456,7 +456,7 @@ void ...@@ -456,7 +456,7 @@ void
CLOGShmemInit(void) CLOGShmemInit(void)
{ {
ClogCtl->PagePrecedes = CLOGPagePrecedes; ClogCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(ClogCtl, "CLOG Ctl", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE, SimpleLruInit(ClogCtl, "clog", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
CLogControlLock, "pg_clog"); CLogControlLock, "pg_clog");
} }
......
...@@ -478,7 +478,7 @@ CommitTsShmemInit(void) ...@@ -478,7 +478,7 @@ CommitTsShmemInit(void)
bool found; bool found;
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0, SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
CommitTsControlLock, "pg_commit_ts"); CommitTsControlLock, "pg_commit_ts");
commitTsShared = ShmemInitStruct("CommitTs shared", commitTsShared = ShmemInitStruct("CommitTs shared",
......
...@@ -1838,10 +1838,10 @@ MultiXactShmemInit(void) ...@@ -1838,10 +1838,10 @@ MultiXactShmemInit(void)
MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes; MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
SimpleLruInit(MultiXactOffsetCtl, SimpleLruInit(MultiXactOffsetCtl,
"MultiXactOffset Ctl", NUM_MXACTOFFSET_BUFFERS, 0, "multixact_offset", NUM_MXACTOFFSET_BUFFERS, 0,
MultiXactOffsetControlLock, "pg_multixact/offsets"); MultiXactOffsetControlLock, "pg_multixact/offsets");
SimpleLruInit(MultiXactMemberCtl, SimpleLruInit(MultiXactMemberCtl,
"MultiXactMember Ctl", NUM_MXACTMEMBER_BUFFERS, 0, "multixact_member", NUM_MXACTMEMBER_BUFFERS, 0,
MultiXactMemberControlLock, "pg_multixact/members"); MultiXactMemberControlLock, "pg_multixact/members");
/* Initialize our shared state struct */ /* Initialize our shared state struct */
......
...@@ -152,7 +152,7 @@ SimpleLruShmemSize(int nslots, int nlsns) ...@@ -152,7 +152,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
sz += MAXALIGN(nslots * sizeof(bool)); /* page_dirty[] */ sz += MAXALIGN(nslots * sizeof(bool)); /* page_dirty[] */
sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */
sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
sz += MAXALIGN(nslots * sizeof(LWLock *)); /* buffer_locks[] */ sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */
if (nlsns > 0) if (nlsns > 0)
sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */
...@@ -203,8 +203,6 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, ...@@ -203,8 +203,6 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
offset += MAXALIGN(nslots * sizeof(int)); offset += MAXALIGN(nslots * sizeof(int));
shared->page_lru_count = (int *) (ptr + offset); shared->page_lru_count = (int *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(int)); offset += MAXALIGN(nslots * sizeof(int));
shared->buffer_locks = (LWLock **) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(LWLock *));
if (nlsns > 0) if (nlsns > 0)
{ {
...@@ -212,20 +210,35 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, ...@@ -212,20 +210,35 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
} }
/* Initialize LWLocks */
shared->buffer_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots);
Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH);
shared->lwlock_tranche_id = LWLockNewTrancheId();
shared->lwlock_tranche.name = shared->lwlock_tranche_name;
shared->lwlock_tranche.array_base = shared->buffer_locks;
shared->lwlock_tranche.array_stride = sizeof(LWLockPadded);
ptr += BUFFERALIGN(offset); ptr += BUFFERALIGN(offset);
for (slotno = 0; slotno < nslots; slotno++) for (slotno = 0; slotno < nslots; slotno++)
{ {
LWLockInitialize(&shared->buffer_locks[slotno].lock,
shared->lwlock_tranche_id);
shared->page_buffer[slotno] = ptr; shared->page_buffer[slotno] = ptr;
shared->page_status[slotno] = SLRU_PAGE_EMPTY; shared->page_status[slotno] = SLRU_PAGE_EMPTY;
shared->page_dirty[slotno] = false; shared->page_dirty[slotno] = false;
shared->page_lru_count[slotno] = 0; shared->page_lru_count[slotno] = 0;
shared->buffer_locks[slotno] = LWLockAssign();
ptr += BLCKSZ; ptr += BLCKSZ;
} }
} }
else else
Assert(found); Assert(found);
/* Register SLRU tranche in the main tranches array */
LWLockRegisterTranche(shared->lwlock_tranche_id, &shared->lwlock_tranche);
/* /*
* Initialize the unshared control struct, including directory path. We * Initialize the unshared control struct, including directory path. We
* assume caller set PagePrecedes. * assume caller set PagePrecedes.
...@@ -308,8 +321,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) ...@@ -308,8 +321,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
/* See notes at top of file */ /* See notes at top of file */
LWLockRelease(shared->ControlLock); LWLockRelease(shared->ControlLock);
LWLockAcquire(shared->buffer_locks[slotno], LW_SHARED); LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
LWLockRelease(shared->buffer_locks[slotno]); LWLockRelease(&shared->buffer_locks[slotno].lock);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
/* /*
...@@ -323,7 +336,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) ...@@ -323,7 +336,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS || if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS) shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
{ {
if (LWLockConditionalAcquire(shared->buffer_locks[slotno], LW_SHARED)) if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
{ {
/* indeed, the I/O must have failed */ /* indeed, the I/O must have failed */
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS) if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
...@@ -333,7 +346,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) ...@@ -333,7 +346,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
shared->page_status[slotno] = SLRU_PAGE_VALID; shared->page_status[slotno] = SLRU_PAGE_VALID;
shared->page_dirty[slotno] = true; shared->page_dirty[slotno] = true;
} }
LWLockRelease(shared->buffer_locks[slotno]); LWLockRelease(&shared->buffer_locks[slotno].lock);
} }
} }
} }
...@@ -402,7 +415,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, ...@@ -402,7 +415,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
shared->page_dirty[slotno] = false; shared->page_dirty[slotno] = false;
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */ /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE); LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
/* Release control lock while doing I/O */ /* Release control lock while doing I/O */
LWLockRelease(shared->ControlLock); LWLockRelease(shared->ControlLock);
...@@ -422,7 +435,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, ...@@ -422,7 +435,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY; shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
LWLockRelease(shared->buffer_locks[slotno]); LWLockRelease(&shared->buffer_locks[slotno].lock);
/* Now it's okay to ereport if we failed */ /* Now it's okay to ereport if we failed */
if (!ok) if (!ok)
...@@ -518,7 +531,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata) ...@@ -518,7 +531,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
shared->page_dirty[slotno] = false; shared->page_dirty[slotno] = false;
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */ /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE); LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
/* Release control lock while doing I/O */ /* Release control lock while doing I/O */
LWLockRelease(shared->ControlLock); LWLockRelease(shared->ControlLock);
...@@ -547,7 +560,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata) ...@@ -547,7 +560,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
shared->page_status[slotno] = SLRU_PAGE_VALID; shared->page_status[slotno] = SLRU_PAGE_VALID;
LWLockRelease(shared->buffer_locks[slotno]); LWLockRelease(&shared->buffer_locks[slotno].lock);
/* Now it's okay to ereport if we failed */ /* Now it's okay to ereport if we failed */
if (!ok) if (!ok)
......
...@@ -178,7 +178,7 @@ void ...@@ -178,7 +178,7 @@ void
SUBTRANSShmemInit(void) SUBTRANSShmemInit(void)
{ {
SubTransCtl->PagePrecedes = SubTransPagePrecedes; SubTransCtl->PagePrecedes = SubTransPagePrecedes;
SimpleLruInit(SubTransCtl, "SUBTRANS Ctl", NUM_SUBTRANS_BUFFERS, 0, SimpleLruInit(SubTransCtl, "subtrans", NUM_SUBTRANS_BUFFERS, 0,
SubtransControlLock, "pg_subtrans"); SubtransControlLock, "pg_subtrans");
/* Override default assumption that writes should be fsync'd */ /* Override default assumption that writes should be fsync'd */
SubTransCtl->do_fsync = false; SubTransCtl->do_fsync = false;
......
...@@ -479,7 +479,7 @@ AsyncShmemInit(void) ...@@ -479,7 +479,7 @@ AsyncShmemInit(void)
* Set up SLRU management of the pg_notify data. * Set up SLRU management of the pg_notify data.
*/ */
AsyncCtl->PagePrecedes = asyncQueuePagePrecedes; AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0, SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
AsyncCtlLock, "pg_notify"); AsyncCtlLock, "pg_notify");
/* Override default assumption that writes should be fsync'd */ /* Override default assumption that writes should be fsync'd */
AsyncCtl->do_fsync = false; AsyncCtl->do_fsync = false;
......
...@@ -76,11 +76,6 @@ ...@@ -76,11 +76,6 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "access/clog.h"
#include "access/commit_ts.h"
#include "access/multixact.h"
#include "access/subtrans.h"
#include "commands/async.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pg_trace.h" #include "pg_trace.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
...@@ -364,24 +359,6 @@ NumLWLocks(void) ...@@ -364,24 +359,6 @@ NumLWLocks(void)
/* proc.c needs one for each backend or auxiliary process */ /* proc.c needs one for each backend or auxiliary process */
numLocks += MaxBackends + NUM_AUXILIARY_PROCS; numLocks += MaxBackends + NUM_AUXILIARY_PROCS;
/* clog.c needs one per CLOG buffer */
numLocks += CLOGShmemBuffers();
/* commit_ts.c needs one per CommitTs buffer */
numLocks += CommitTsShmemBuffers();
/* subtrans.c needs one per SubTrans buffer */
numLocks += NUM_SUBTRANS_BUFFERS;
/* multixact.c needs two SLRU areas */
numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
/* async.c needs one per Async buffer */
numLocks += NUM_ASYNC_BUFFERS;
/* predicate.c needs one per old serializable xid buffer */
numLocks += NUM_OLDSERXID_BUFFERS;
/* slot.c needs one for each slot */ /* slot.c needs one for each slot */
numLocks += max_replication_slots; numLocks += max_replication_slots;
......
...@@ -794,7 +794,7 @@ OldSerXidInit(void) ...@@ -794,7 +794,7 @@ OldSerXidInit(void)
* Set up SLRU management of the pg_serial data. * Set up SLRU management of the pg_serial data.
*/ */
OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically; OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl", SimpleLruInit(OldSerXidSlruCtl, "oldserxid",
NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial"); NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial");
/* Override default assumption that writes should be fsync'd */ /* Override default assumption that writes should be fsync'd */
OldSerXidSlruCtl->do_fsync = false; OldSerXidSlruCtl->do_fsync = false;
......
...@@ -36,6 +36,9 @@ ...@@ -36,6 +36,9 @@
*/ */
#define SLRU_PAGES_PER_SEGMENT 32 #define SLRU_PAGES_PER_SEGMENT 32
/* Maximum length of an SLRU name */
#define SLRU_MAX_NAME_LENGTH 32
/* /*
* Page status codes. Note that these do not include the "dirty" bit. * Page status codes. Note that these do not include the "dirty" bit.
* page_dirty can be TRUE only in the VALID or WRITE_IN_PROGRESS states; * page_dirty can be TRUE only in the VALID or WRITE_IN_PROGRESS states;
...@@ -69,7 +72,6 @@ typedef struct SlruSharedData ...@@ -69,7 +72,6 @@ typedef struct SlruSharedData
bool *page_dirty; bool *page_dirty;
int *page_number; int *page_number;
int *page_lru_count; int *page_lru_count;
LWLock **buffer_locks;
/* /*
* Optional array of WAL flush LSNs associated with entries in the SLRU * Optional array of WAL flush LSNs associated with entries in the SLRU
...@@ -99,6 +101,12 @@ typedef struct SlruSharedData ...@@ -99,6 +101,12 @@ typedef struct SlruSharedData
* the latest page. * the latest page.
*/ */
int latest_page_number; int latest_page_number;
/* LWLocks */
int lwlock_tranche_id;
LWLockTranche lwlock_tranche;
char lwlock_tranche_name[SLRU_MAX_NAME_LENGTH];
LWLockPadded *buffer_locks;
} SlruSharedData; } SlruSharedData;
typedef SlruSharedData *SlruShared; typedef SlruSharedData *SlruShared;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment