Commit ea9df812 authored by Robert Haas

Relax the requirement that all lwlocks be stored in a single array.

This makes it possible to store lwlocks as part of some other data
structure in the main shared memory segment, or in a dynamic shared
memory segment.  There is still a main LWLock array and this patch does
not move anything out of it, but it provides necessary infrastructure
for doing that in the future.

This change is likely to increase the size of LWLockPadded on some
platforms, especially 32-bit platforms where it was previously only
16 bytes.
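For illustration, a minimal sketch of the usage pattern this enables for code
that embeds an LWLock in its own shared structure (the struct and the
shmem-startup hook are hypothetical; only the LWLock calls and the
LWLockTranche fields come from this patch):

    /* Hypothetical extension-side setup, sketched against the new API. */
    typedef struct MySharedData
    {
        LWLock      lock;       /* embedded lock, not in the main array */
        int         counter;    /* example payload protected by it */
    } MySharedData;

    static LWLockTranche my_tranche;
    static MySharedData *my_data;

    static void
    my_shmem_startup(void)
    {
        int     tranche_id = LWLockNewTrancheId();

        my_data = ShmemAlloc(sizeof(MySharedData));

        /* Describe where locks of this tranche live, for T_NAME/T_ID. */
        my_tranche.name = "my_extension";
        my_tranche.array_base = &my_data->lock;
        my_tranche.array_stride = sizeof(MySharedData);
        LWLockRegisterTranche(tranche_id, &my_tranche);

        LWLockInitialize(&my_data->lock, tranche_id);
    }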

Patch by me.  Review by Andres Freund and KaiGai Kohei.
parent f62eba20
@@ -116,7 +116,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
 		 * possible deadlocks.
 		 */
 		for (i = 0; i < NUM_BUFFER_PARTITIONS; i++)
-			LWLockAcquire(FirstBufMappingLock + i, LW_SHARED);
+			LWLockAcquire(BufMappingPartitionLockByIndex(i), LW_SHARED);

 		/*
 		 * Scan though all the buffers, saving the relevant fields in the
@@ -157,7 +157,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
 		 * avoids O(N^2) behavior inside LWLockRelease.
 		 */
 		for (i = NUM_BUFFER_PARTITIONS; --i >= 0;)
-			LWLockRelease(FirstBufMappingLock + i);
+			LWLockRelease(BufMappingPartitionLockByIndex(i));
 	}

 	funcctx = SRF_PERCALL_SETUP();
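The BufMappingPartitionLockByIndex() macro itself is not shown in this
excerpt; by analogy with PredicateLockHashPartitionLockByIndex(), which is
shown later in this patch, it presumably expands to something like (offset
name assumed):

    #define BufMappingPartitionLockByIndex(i) \
        (&MainLWLockArray[BUFFER_MAPPING_LWLOCK_OFFSET + (i)].lock)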
@@ -150,7 +150,7 @@ typedef struct pgssEntry
  */
 typedef struct pgssSharedState
 {
-	LWLockId	lock;			/* protects hashtable search/modification */
+	LWLock	   *lock;			/* protects hashtable search/modification */
 	int			query_size;		/* max query length in bytes */
 	double		cur_median_usage;	/* current median usage in hashtable */
 } pgssSharedState;
@@ -2212,49 +2212,55 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
      </row>
      <row>
       <entry>lwlock-acquire</entry>
-      <entry>(LWLockId, LWLockMode)</entry>
+      <entry>(char *, int, LWLockMode)</entry>
       <entry>Probe that fires when an LWLock has been acquired.
-       arg0 is the LWLock's ID.
-       arg1 is the requested lock mode, either exclusive or shared.</entry>
+       arg0 is the LWLock's tranche.
+       arg1 is the LWLock's offset within its tranche.
+       arg2 is the requested lock mode, either exclusive or shared.</entry>
      </row>
      <row>
       <entry>lwlock-release</entry>
-      <entry>(LWLockId)</entry>
+      <entry>(char *, int)</entry>
       <entry>Probe that fires when an LWLock has been released (but note
        that any released waiters have not yet been awakened).
-       arg0 is the LWLock's ID.</entry>
+       arg0 is the LWLock's tranche.
+       arg1 is the LWLock's offset within its tranche.</entry>
      </row>
      <row>
       <entry>lwlock-wait-start</entry>
-      <entry>(LWLockId, LWLockMode)</entry>
+      <entry>(char *, int, LWLockMode)</entry>
       <entry>Probe that fires when an LWLock was not immediately available and
        a server process has begun to wait for the lock to become available.
-       arg0 is the LWLock's ID.
-       arg1 is the requested lock mode, either exclusive or shared.</entry>
+       arg0 is the LWLock's tranche.
+       arg1 is the LWLock's offset within its tranche.
+       arg2 is the requested lock mode, either exclusive or shared.</entry>
      </row>
      <row>
       <entry>lwlock-wait-done</entry>
-      <entry>(LWLockId, LWLockMode)</entry>
+      <entry>(char *, int, LWLockMode)</entry>
       <entry>Probe that fires when a server process has been released from its
        wait for an LWLock (it does not actually have the lock yet).
-       arg0 is the LWLock's ID.
-       arg1 is the requested lock mode, either exclusive or shared.</entry>
+       arg0 is the LWLock's tranche.
+       arg1 is the LWLock's offset within its tranche.
+       arg2 is the requested lock mode, either exclusive or shared.</entry>
      </row>
      <row>
       <entry>lwlock-condacquire</entry>
-      <entry>(LWLockId, LWLockMode)</entry>
+      <entry>(char *, int, LWLockMode)</entry>
       <entry>Probe that fires when an LWLock was successfully acquired when the
        caller specified no waiting.
-       arg0 is the LWLock's ID.
-       arg1 is the requested lock mode, either exclusive or shared.</entry>
+       arg0 is the LWLock's tranche.
+       arg1 is the LWLock's offset within its tranche.
+       arg2 is the requested lock mode, either exclusive or shared.</entry>
      </row>
      <row>
       <entry>lwlock-condacquire-fail</entry>
-      <entry>(LWLockId, LWLockMode)</entry>
+      <entry>(char *, int, LWLockMode)</entry>
       <entry>Probe that fires when an LWLock was not successfully acquired when
        the caller specified no waiting.
-       arg0 is the LWLock's ID.
-       arg1 is the requested lock mode, either exclusive or shared.</entry>
+       arg0 is the LWLock's tranche.
+       arg1 is the LWLock's offset within its tranche.
+       arg2 is the requested lock mode, either exclusive or shared.</entry>
      </row>
      <row>
       <entry>lock-wait-start</entry>
@@ -2299,10 +2305,6 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       <entry>LocalTransactionId</entry>
       <entry>unsigned int</entry>
      </row>
-     <row>
-      <entry>LWLockId</entry>
-      <entry>int</entry>
-     </row>
      <row>
       <entry>LWLockMode</entry>
       <entry>int</entry>
@@ -151,7 +151,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
 	sz += MAXALIGN(nslots * sizeof(bool));		/* page_dirty[] */
 	sz += MAXALIGN(nslots * sizeof(int));		/* page_number[] */
 	sz += MAXALIGN(nslots * sizeof(int));		/* page_lru_count[] */
-	sz += MAXALIGN(nslots * sizeof(LWLockId));	/* buffer_locks[] */
+	sz += MAXALIGN(nslots * sizeof(LWLock *));	/* buffer_locks[] */

 	if (nlsns > 0)
 		sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));	/* group_lsn[] */
@@ -161,7 +161,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
 void
 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
-			  LWLockId ctllock, const char *subdir)
+			  LWLock *ctllock, const char *subdir)
 {
 	SlruShared	shared;
 	bool		found;
@@ -202,8 +202,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 		offset += MAXALIGN(nslots * sizeof(int));
 		shared->page_lru_count = (int *) (ptr + offset);
 		offset += MAXALIGN(nslots * sizeof(int));
-		shared->buffer_locks = (LWLockId *) (ptr + offset);
-		offset += MAXALIGN(nslots * sizeof(LWLockId));
+		shared->buffer_locks = (LWLock **) (ptr + offset);
+		offset += MAXALIGN(nslots * sizeof(LWLock *));

 		if (nlsns > 0)
 		{
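For reference, a typical caller passes one of the fixed locks from the main
array; the CLOG code, for example, makes a call along these lines (quoted
from contemporaneous sources, so treat the exact arguments as approximate):

    SimpleLruInit(ClogCtl, "CLOG Ctl", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
                  CLogControlLock, "pg_clog");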
@@ -448,8 +448,6 @@ typedef struct
 typedef int InheritableSocket;
 #endif

-typedef struct LWLock LWLock;	/* ugly kluge */
-
 /*
  * Structure contains all variables passed to exec:ed backends
  */
@@ -473,7 +471,7 @@ typedef struct
 #ifndef HAVE_SPINLOCKS
 	PGSemaphore SpinlockSemaArray;
 #endif
-	LWLock	   *LWLockArray;
+	LWLockPadded *MainLWLockArray;
 	slock_t    *ProcStructLock;
 	PROC_HDR   *ProcGlobal;
 	PGPROC	   *AuxiliaryProcs;
@@ -5576,7 +5574,6 @@ PostmasterMarkPIDForWorkerNotify(int pid)
  * functions.  They are marked NON_EXEC_STATIC in their home modules.
  */
 extern slock_t *ShmemLock;
-extern LWLock *LWLockArray;
 extern slock_t *ProcStructLock;
 extern PGPROC *AuxiliaryProcs;
 extern PMSignalData *PMSignalState;
@@ -5625,7 +5622,7 @@ save_backend_variables(BackendParameters *param, Port *port,
 #ifndef HAVE_SPINLOCKS
 	param->SpinlockSemaArray = SpinlockSemaArray;
 #endif
-	param->LWLockArray = LWLockArray;
+	param->MainLWLockArray = MainLWLockArray;
 	param->ProcStructLock = ProcStructLock;
 	param->ProcGlobal = ProcGlobal;
 	param->AuxiliaryProcs = AuxiliaryProcs;
@@ -5856,7 +5853,7 @@ restore_backend_variables(BackendParameters *param, Port *port)
 #ifndef HAVE_SPINLOCKS
 	SpinlockSemaArray = param->SpinlockSemaArray;
 #endif
-	LWLockArray = param->LWLockArray;
+	MainLWLockArray = param->MainLWLockArray;
 	ProcStructLock = param->ProcStructLock;
 	ProcGlobal = param->ProcGlobal;
 	AuxiliaryProcs = param->AuxiliaryProcs;
@@ -146,7 +146,7 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
 	{
 		BufferTag	newTag;		/* identity of requested block */
 		uint32		newHash;	/* hash value for newTag */
-		LWLockId	newPartitionLock;	/* buffer partition lock for it */
+		LWLock	   *newPartitionLock;	/* buffer partition lock for it */
 		int			buf_id;

 		/* create a tag so we can lookup the buffer */
@@ -539,10 +539,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 {
 	BufferTag	newTag;			/* identity of requested block */
 	uint32		newHash;		/* hash value for newTag */
-	LWLockId	newPartitionLock;	/* buffer partition lock for it */
+	LWLock	   *newPartitionLock;	/* buffer partition lock for it */
 	BufferTag	oldTag;			/* previous identity of selected buffer */
 	uint32		oldHash;		/* hash value for oldTag */
-	LWLockId	oldPartitionLock;	/* buffer partition lock for it */
+	LWLock	   *oldPartitionLock;	/* buffer partition lock for it */
 	BufFlags	oldFlags;
 	int			buf_id;
 	volatile BufferDesc *buf;
@@ -891,7 +891,7 @@ InvalidateBuffer(volatile BufferDesc *buf)
 {
 	BufferTag	oldTag;
 	uint32		oldHash;		/* hash value for oldTag */
-	LWLockId	oldPartitionLock;	/* buffer partition lock for it */
+	LWLock	   *oldPartitionLock;	/* buffer partition lock for it */
 	BufFlags	oldFlags;

 	/* Save the original buffer tag before dropping the spinlock */
@@ -182,7 +182,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	 * Now initialize LWLocks, which do shared memory allocation and are
 	 * needed for InitShmemIndex.
 	 */
-	if (!IsUnderPostmaster)
-		CreateLWLocks();
+	CreateLWLocks();

 	/*
@@ -565,7 +565,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	LOCALLOCK  *locallock;
 	LOCK	   *lock;
 	PROCLOCK   *proclock;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	bool		hasWaiters = false;

 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
@@ -702,7 +702,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	bool		found;
 	ResourceOwner owner;
 	uint32		hashcode;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	int			status;
 	bool		log_lock = false;
@@ -1744,7 +1744,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	LOCALLOCK  *locallock;
 	LOCK	   *lock;
 	PROCLOCK   *proclock;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	bool		wakeupNeeded;

 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
@@ -2096,10 +2096,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 	 */
 	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
 	{
-		LWLockId	partitionLock = FirstLockMgrLock + partition;
+		LWLock	   *partitionLock;
 		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
 		PROCLOCK   *nextplock;

+		partitionLock = LockHashPartitionLockByIndex(partition);
+
 		/*
 		 * If the proclock list for this partition is empty, we can skip
 		 * acquiring the partition lock.  This optimization is trickier than
@@ -2475,7 +2477,7 @@ static bool
 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
 							  uint32 hashcode)
 {
-	LWLockId	partitionLock = LockHashPartitionLock(hashcode);
+	LWLock	   *partitionLock = LockHashPartitionLock(hashcode);
 	Oid			relid = locktag->locktag_field2;
 	uint32		i;
@@ -2565,7 +2567,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
 	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
 	LOCKTAG    *locktag = &locallock->tag.lock;
 	PROCLOCK   *proclock = NULL;
-	LWLockId	partitionLock = LockHashPartitionLock(locallock->hashcode);
+	LWLock	   *partitionLock = LockHashPartitionLock(locallock->hashcode);
 	Oid			relid = locktag->locktag_field2;
 	uint32		f;
@@ -2671,7 +2673,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
 	SHM_QUEUE  *procLocks;
 	PROCLOCK   *proclock;
 	uint32		hashcode;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	int			count = 0;
 	int			fast_count = 0;
@@ -2883,7 +2885,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
 	PROCLOCKTAG proclocktag;
 	uint32		hashcode;
 	uint32		proclock_hashcode;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	bool		wakeupNeeded;

 	hashcode = LockTagHashCode(locktag);
@@ -3159,10 +3161,12 @@ PostPrepare_Locks(TransactionId xid)
 	 */
 	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
 	{
-		LWLockId	partitionLock = FirstLockMgrLock + partition;
+		LWLock	   *partitionLock;
 		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
 		PROCLOCK   *nextplock;

+		partitionLock = LockHashPartitionLockByIndex(partition);
+
 		/*
 		 * If the proclock list for this partition is empty, we can skip
 		 * acquiring the partition lock.  This optimization is safer than the
@@ -3400,7 +3404,7 @@ GetLockStatusData(void)
 	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
 	 */
 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

 	/* Now we can safely count the number of proclocks */
 	data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
@@ -3442,7 +3446,7 @@ GetLockStatusData(void)
 	 * behavior inside LWLockRelease.
 	 */
 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
-		LWLockRelease(FirstLockMgrLock + i);
+		LWLockRelease(LockHashPartitionLockByIndex(i));

 	Assert(el == data->nelements);
@@ -3477,7 +3481,7 @@ GetRunningTransactionLocks(int *nlocks)
 	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
 	 */
 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

 	/* Now we can safely count the number of proclocks */
 	els = hash_get_num_entries(LockMethodProcLockHash);
@@ -3537,7 +3541,7 @@ GetRunningTransactionLocks(int *nlocks)
 	 * behavior inside LWLockRelease.
 	 */
 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
-		LWLockRelease(FirstLockMgrLock + i);
+		LWLockRelease(LockHashPartitionLockByIndex(i));

 	*nlocks = index;
 	return accessExclusiveLocks;
@@ -3673,7 +3677,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
 	uint32		hashcode;
 	uint32		proclock_hashcode;
 	int			partition;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	LockMethod	lockMethodTable;

 	Assert(len == sizeof(TwoPhaseLockRecord));
@@ -4044,7 +4048,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
 	{
 		PROCLOCK   *proclock;
 		uint32		hashcode;
-		LWLockId	partitionLock;
+		LWLock	   *partitionLock;

 		hashcode = LockTagHashCode(&tag);
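The LockHashPartitionLock() and LockHashPartitionLockByIndex() macros used
above are defined in headers not shown in this excerpt; after this patch they
presumably follow the same pattern as the predicate-lock macros below,
something like (offset name assumed):

    #define LockHashPartition(hashcode) \
        ((hashcode) % NUM_LOCK_PARTITIONS)
    #define LockHashPartitionLock(hashcode) \
        (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + \
            LockHashPartition(hashcode)].lock)
    #define LockHashPartitionLockByIndex(i) \
        (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)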
@@ -31,50 +31,37 @@
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/spin.h"
+#include "utils/memutils.h"
+
+#ifdef LWLOCK_STATS
+#include "utils/hsearch.h"
+#endif

 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;

-typedef struct LWLock
-{
-	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
-	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
-	PGPROC	   *head;			/* head of list of waiting PGPROCs */
-	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
-	/* tail is undefined when head is NULL */
-} LWLock;
-
 /*
- * All the LWLock structs are allocated as an array in shared memory.
- * (LWLockIds are indexes into the array.)  We force the array stride to
- * be a power of 2, which saves a few cycles in indexing, but more
- * importantly also ensures that individual LWLocks don't cross cache line
- * boundaries.  This reduces cache contention problems, especially on AMD
- * Opterons.  (Of course, we have to also ensure that the array start
- * address is suitably aligned.)
- *
- * LWLock is between 16 and 32 bytes on all known platforms, so these two
- * cases are sufficient.
+ * This is indexed by tranche ID and stores metadata for all tranches known
+ * to the current backend.
  */
-#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)
+static LWLockTranche **LWLockTrancheArray = NULL;
+static int	LWLockTranchesAllocated = 0;

-typedef union LWLockPadded
-{
-	LWLock		lock;
-	char		pad[LWLOCK_PADDED_SIZE];
-} LWLockPadded;
+#define T_NAME(lock) \
+	(LWLockTrancheArray[(lock)->tranche]->name)
+#define T_ID(lock) \
+	((int) ((((char *) lock) - \
+		((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
+		LWLockTrancheArray[(lock)->tranche]->array_stride))

 /*
- * This points to the array of LWLocks in shared memory.  Backends inherit
+ * This points to the main array of LWLocks in shared memory.  Backends inherit
  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
  * where we have special measures to pass it down).
  */
-NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
+LWLockPadded *MainLWLockArray = NULL;
+static LWLockTranche MainLWLockTranche;
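T_ID() recovers a lock's index within its tranche purely by pointer
arithmetic: subtract the tranche's array_base and divide by array_stride.
For a lock in the main array, where array_base is MainLWLockArray and
array_stride is sizeof(LWLockPadded), this reduces to the old array index;
a worked example (not part of the patch):

    /*
     * T_ID(&MainLWLockArray[7].lock)
     *   = ((char *) &MainLWLockArray[7] - (char *) MainLWLockArray)
     *         / sizeof(LWLockPadded)
     *   = 7
     */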
 /*
  * We use this structure to keep track of locked LWLocks for release
@@ -85,58 +72,78 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 #define MAX_SIMUL_LWLOCKS	100

 static int	num_held_lwlocks = 0;
-static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS];

 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;

 #ifdef LWLOCK_STATS
+typedef struct lwlock_stats_key
+{
+	int			tranche;
+	int			instance;
+} lwlock_stats_key;
+
+typedef struct lwlock_stats
+{
+	lwlock_stats_key key;
+	int			sh_acquire_count;
+	int			ex_acquire_count;
+	int			block_count;
+	int			spin_delay_count;
+} lwlock_stats;
+
 static int	counts_for_pid = 0;
-static int *sh_acquire_counts;
-static int *ex_acquire_counts;
-static int *block_counts;
-static int *spin_delay_counts;
+static HTAB *lwlock_stats_htab;
 #endif

 #ifdef LOCK_DEBUG
 bool		Trace_lwlocks = false;

 inline static void
-PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
+PRINT_LWDEBUG(const char *where, const volatile LWLock *lock)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
-			 where, (int) lockid,
+		elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+			 where, T_NAME(lock), T_ID(lock),
 			 (int) lock->exclusive, lock->shared, lock->head,
 			 (int) lock->releaseOK);
 }

 inline static void
-LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
+LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
+		elog(LOG, "%s(%s %d): %s", where, name, index, msg);
 }
 #else							/* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b,c)
-#define LOG_LWDEBUG(a,b,c)
+#define PRINT_LWDEBUG(a,b)
+#define LOG_LWDEBUG(a,b,c,d)
 #endif   /* LOCK_DEBUG */

 #ifdef LWLOCK_STATS

 static void init_lwlock_stats(void);
 static void print_lwlock_stats(int code, Datum arg);
+static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);

 static void
 init_lwlock_stats(void)
 {
-	int		   *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
-	int			numLocks = LWLockCounter[1];
+	HASHCTL		ctl;
+
+	if (lwlock_stats_htab != NULL)
+	{
+		hash_destroy(lwlock_stats_htab);
+		lwlock_stats_htab = NULL;
+	}

-	sh_acquire_counts = calloc(numLocks, sizeof(int));
-	ex_acquire_counts = calloc(numLocks, sizeof(int));
-	spin_delay_counts = calloc(numLocks, sizeof(int));
-	block_counts = calloc(numLocks, sizeof(int));
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(lwlock_stats_key);
+	ctl.entrysize = sizeof(lwlock_stats);
+	ctl.hash = tag_hash;
+	lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
+									HASH_ELEM | HASH_FUNCTION);
 	counts_for_pid = MyProcPid;
 	on_shmem_exit(print_lwlock_stats, 0);
 }
@@ -144,30 +151,58 @@ init_lwlock_stats(void)
 static void
 print_lwlock_stats(int code, Datum arg)
 {
-	int			i;
-	int		   *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
-	int			numLocks = LWLockCounter[1];
+	HASH_SEQ_STATUS scan;
+	lwlock_stats *lwstats;
+
+	hash_seq_init(&scan, lwlock_stats_htab);

 	/* Grab an LWLock to keep different backends from mixing reports */
-	LWLockAcquire(0, LW_EXCLUSIVE);
+	LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

-	for (i = 0; i < numLocks; i++)
+	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
 	{
-		if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i])
-			fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n",
-					MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
-					block_counts[i], spin_delay_counts[i]);
+		fprintf(stderr,
+				"PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n",
+				MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
+				lwstats->key.instance, lwstats->sh_acquire_count,
+				lwstats->ex_acquire_count, lwstats->block_count,
+				lwstats->spin_delay_count);
 	}

-	LWLockRelease(0);
+	LWLockRelease(&MainLWLockArray[0].lock);
+}
+
+static lwlock_stats *
+get_lwlock_stats_entry(LWLock *lock)
+{
+	lwlock_stats_key key;
+	lwlock_stats *lwstats;
+	bool		found;
+
+	/* Set up local count state first time through in a given process */
+	if (counts_for_pid != MyProcPid)
+		init_lwlock_stats();
+
+	/* Fetch or create the entry. */
+	key.tranche = lock->tranche;
+	key.instance = T_ID(lock);
+	lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
+	if (!found)
+	{
+		lwstats->sh_acquire_count = 0;
+		lwstats->ex_acquire_count = 0;
+		lwstats->block_count = 0;
+		lwstats->spin_delay_count = 0;
+	}
+	return lwstats;
 }
 #endif   /* LWLOCK_STATS */
 /*
- * Compute number of LWLocks to allocate.
+ * Compute number of LWLocks to allocate in the main array.
  */
-int
+static int
 NumLWLocks(void)
 {
 	int			numLocks;
@@ -180,7 +215,7 @@ NumLWLocks(void)
 	 */

 	/* Predefined LWLocks */
-	numLocks = (int) NumFixedLWLocks;
+	numLocks = NUM_FIXED_LWLOCKS;

 	/* bufmgr.c needs two for each shared buffer */
 	numLocks += 2 * NBuffers;
@@ -248,18 +283,21 @@ LWLockShmemSize(void)
 	size = mul_size(numLocks, sizeof(LWLockPadded));

 	/* Space for dynamic allocation counter, plus room for alignment. */
-	size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
+	size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE);

 	return size;
 }

 /*
- * Allocate shmem space for LWLocks and initialize the locks.
+ * Allocate shmem space for the main LWLock array and initialize it.  We also
+ * register the main tranche here.
  */
 void
 CreateLWLocks(void)
 {
+	if (!IsUnderPostmaster)
+	{
 		int			numLocks = NumLWLocks();
 		Size		spaceLocks = LWLockShmemSize();
 		LWLockPadded *lock;
@@ -270,34 +308,42 @@ CreateLWLocks(void)
 		/* Allocate space */
 		ptr = (char *) ShmemAlloc(spaceLocks);

-		/* Leave room for dynamic allocation counter */
-		ptr += 2 * sizeof(int);
+		/* Leave room for dynamic allocation of locks and tranches */
+		ptr += 3 * sizeof(int);

 		/* Ensure desired alignment of LWLock array */
 		ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

-		LWLockArray = (LWLockPadded *) ptr;
+		MainLWLockArray = (LWLockPadded *) ptr;
+
+		/* Initialize all LWLocks in main array */
+		for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++)
+			LWLockInitialize(&lock->lock, 0);

-		/*
-		 * Initialize all LWLocks to "unlocked" state
-		 */
-		for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
-		{
-			SpinLockInit(&lock->lock.mutex);
-			lock->lock.releaseOK = true;
-			lock->lock.exclusive = 0;
-			lock->lock.shared = 0;
-			lock->lock.head = NULL;
-			lock->lock.tail = NULL;
-		}
-
-		/*
-		 * Initialize the dynamic-allocation counter, which is stored just
-		 * before the first LWLock.
-		 */
-		LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
-		LWLockCounter[0] = (int) NumFixedLWLocks;
-		LWLockCounter[1] = numLocks;
+		/*
+		 * Initialize the dynamic-allocation counters, which are stored just
+		 * before the first LWLock.  LWLockCounter[0] is the allocation
+		 * counter for lwlocks, LWLockCounter[1] is the maximum number that
+		 * can be allocated from the main array, and LWLockCounter[2] is the
+		 * allocation counter for tranches.
+		 */
+		LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
+		LWLockCounter[0] = NUM_FIXED_LWLOCKS;
+		LWLockCounter[1] = numLocks;
+		LWLockCounter[2] = 1;	/* 0 is the main array */
+	}
+
+	if (LWLockTrancheArray == NULL)
+	{
+		LWLockTranchesAllocated = 16;
+		LWLockTrancheArray = MemoryContextAlloc(TopMemoryContext,
+					  LWLockTranchesAllocated * sizeof(LWLockTranche *));
+	}
+
+	MainLWLockTranche.name = "main";
+	MainLWLockTranche.array_base = MainLWLockArray;
+	MainLWLockTranche.array_stride = sizeof(LWLockPadded);
+	LWLockRegisterTranche(0, &MainLWLockTranche);
 }
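The resulting shared-memory layout places the three counters immediately
before the (aligned) first LWLock; a sketch, restating the code above:

    /*
     *   base:              int LWLockCounter[3];
     *                        [0] next main-array slot LWLockAssign() hands out
     *                        [1] total slots in the main array
     *                        [2] next tranche ID (0 is the main array)
     *   ...padding to LWLOCK_PADDED_SIZE alignment...
     *   MainLWLockArray:   LWLockPadded locks[numLocks];
     */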
@@ -309,26 +355,86 @@ CreateLWLocks(void)
  * startup, but it is needed if any user-defined code tries to allocate
  * LWLocks after startup.
  */
-LWLockId
+LWLock *
 LWLockAssign(void)
 {
-	LWLockId	result;
+	LWLock	   *result;

 	/* use volatile pointer to prevent code rearrangement */
 	volatile int *LWLockCounter;

-	LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
+	LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
 	SpinLockAcquire(ShmemLock);
 	if (LWLockCounter[0] >= LWLockCounter[1])
 	{
 		SpinLockRelease(ShmemLock);
-		elog(ERROR, "no more LWLockIds available");
+		elog(ERROR, "no more LWLocks available");
 	}
-	result = (LWLockId) (LWLockCounter[0]++);
+	result = &MainLWLockArray[LWLockCounter[0]++].lock;
 	SpinLockRelease(ShmemLock);
 	return result;
 }

+/*
+ * Allocate a new tranche ID.
+ */
+int
+LWLockNewTrancheId(void)
+{
+	int			result;
+
+	/* use volatile pointer to prevent code rearrangement */
+	volatile int *LWLockCounter;
+
+	LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
+	SpinLockAcquire(ShmemLock);
+	result = LWLockCounter[2]++;
+	SpinLockRelease(ShmemLock);
+
+	return result;
+}
+
+/*
+ * Register a tranche ID in the lookup table for the current process.  This
+ * routine will save a pointer to the tranche object passed as an argument,
+ * so that object should be allocated in a backend-lifetime context
+ * (TopMemoryContext, static variable, or similar).
+ */
+void
+LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
+{
+	Assert(LWLockTrancheArray != NULL);
+
+	if (tranche_id >= LWLockTranchesAllocated)
+	{
+		int			i = LWLockTranchesAllocated;
+
+		while (i < tranche_id)
+			i *= 2;
+
+		LWLockTrancheArray = repalloc(LWLockTrancheArray,
+									  i * sizeof(LWLockTranche *));
+		LWLockTranchesAllocated = i;
+	}
+
+	LWLockTrancheArray[tranche_id] = tranche;
+}
+
+/*
+ * LWLockInitialize - initialize a new lwlock; it's initially unlocked
+ */
+void
+LWLockInitialize(LWLock *lock, int tranche_id)
+{
+	SpinLockInit(&lock->mutex);
+	lock->releaseOK = true;
+	lock->exclusive = 0;
+	lock->shared = 0;
+	lock->tranche = tranche_id;
+	lock->head = NULL;
+	lock->tail = NULL;
+}
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -338,24 +444,26 @@ LWLockAssign(void)
  * Side effect: cancel/die interrupts are held off until lock release.
  */
 void
-LWLockAcquire(LWLockId lockid, LWLockMode mode)
+LWLockAcquire(LWLock *l, LWLockMode mode)
 {
-	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+	volatile LWLock *lock = l;
 	PGPROC	   *proc = MyProc;
 	bool		retry = false;
 	int			extraWaits = 0;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+#endif

-	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockAcquire", lock);

 #ifdef LWLOCK_STATS
-	/* Set up local count state first time through in a given process */
-	if (counts_for_pid != MyProcPid)
-		init_lwlock_stats();
+	lwstats = get_lwlock_stats_entry(l);
+
 	/* Count lock acquisition attempts */
 	if (mode == LW_EXCLUSIVE)
-		ex_acquire_counts[lockid]++;
+		lwstats->ex_acquire_count++;
 	else
-		sh_acquire_counts[lockid]++;
+		lwstats->sh_acquire_count++;
 #endif   /* LWLOCK_STATS */

 	/*
@@ -398,7 +506,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		/* Acquire mutex.  Time spent holding mutex should be short! */
 #ifdef LWLOCK_STATS
-		spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
+		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
 #else
 		SpinLockAcquire(&lock->mutex);
 #endif
@@ -466,13 +574,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
+		LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "waiting");

 #ifdef LWLOCK_STATS
-		block_counts[lockid]++;
+		lwstats->block_count++;
 #endif

-		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);

 		for (;;)
 		{
@@ -483,9 +591,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 			extraWaits++;
 		}

-		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);

-		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
+		LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "awakened");

 		/* Now loop back and try to acquire lock again. */
 		retry = true;
@@ -494,10 +602,10 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);

-	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
+	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode);

 	/* Add lock to list of locks held by this backend */
-	held_lwlocks[num_held_lwlocks++] = lockid;
+	held_lwlocks[num_held_lwlocks++] = l;

 	/*
 	 * Fix the process wait semaphore's count for any absorbed wakeups.
@@ -514,12 +622,12 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
  * If successful, cancel/die interrupts are held off until lock release.
  */
 bool
-LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
+LWLockConditionalAcquire(LWLock *l, LWLockMode mode)
 {
-	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+	volatile LWLock *lock = l;
 	bool		mustwait;

-	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockConditionalAcquire", lock);

 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -564,14 +672,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
-		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
+		LOG_LWDEBUG("LWLockConditionalAcquire", T_NAME(l), T_ID(l), "failed");
+		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(l), T_ID(l), mode);
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
-		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
+		held_lwlocks[num_held_lwlocks++] = l;
+		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(l), T_ID(l), mode);
 	}

 	return !mustwait;
@@ -592,19 +700,20 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
  * wake up, observe that their records have already been flushed, and return.
  */
 bool
-LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
+LWLockAcquireOrWait(LWLock *l, LWLockMode mode)
 {
-	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+	volatile LWLock *lock = l;
 	PGPROC	   *proc = MyProc;
 	bool		mustwait;
 	int			extraWaits = 0;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+#endif

-	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
+	PRINT_LWDEBUG("LWLockAcquireOrWait", lock);

 #ifdef LWLOCK_STATS
-	/* Set up local count state first time through in a given process */
-	if (counts_for_pid != MyProcPid)
-		init_lwlock_stats();
+	lwstats = get_lwlock_stats_entry(l);
 #endif

 	/* Ensure we will have room to remember the lock */
@@ -671,13 +780,13 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
 		 * wakups, because we share the semaphore with ProcWaitForSignal.
 		 */
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");
+		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "waiting");

 #ifdef LWLOCK_STATS
-		block_counts[lockid]++;
+		lwstats->block_count++;
 #endif

-		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);

 		for (;;)
 		{
@@ -688,9 +797,9 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 			extraWaits++;
 		}

-		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);

-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
+		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "awakened");
 	}
 	else
 	{
@@ -708,14 +817,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
-		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "failed");
+		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(T_NAME(l), T_ID(l), mode);
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
-		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+		held_lwlocks[num_held_lwlocks++] = l;
+		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(T_NAME(l), T_ID(l), mode);
 	}

 	return !mustwait;
@@ -725,14 +834,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
  * LWLockRelease - release a previously acquired lock
  */
 void
-LWLockRelease(LWLockId lockid)
+LWLockRelease(LWLock *l)
 {
-	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+	volatile LWLock *lock = l;
 	PGPROC	   *head;
 	PGPROC	   *proc;
 	int			i;

-	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
+	PRINT_LWDEBUG("LWLockRelease", lock);

 	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
@@ -740,11 +849,11 @@ LWLockRelease(LWLockId lockid)
 	 */
 	for (i = num_held_lwlocks; --i >= 0;)
 	{
-		if (lockid == held_lwlocks[i])
+		if (l == held_lwlocks[i])
 			break;
 	}
 	if (i < 0)
-		elog(ERROR, "lock %d is not held", (int) lockid);
+		elog(ERROR, "lock %s %d is not held", T_NAME(l), T_ID(l));
 	num_held_lwlocks--;
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
@@ -824,14 +933,14 @@ LWLockRelease(LWLockId lockid)
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);

-	TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
+	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(l), T_ID(l));

 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
 	while (head != NULL)
 	{
-		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
+		LOG_LWDEBUG("LWLockRelease", T_NAME(l), T_ID(l), "release waiter");
 		proc = head;
 		head = proc->lwWaitLink;
 		proc->lwWaitLink = NULL;
@@ -874,13 +983,13 @@ LWLockReleaseAll(void)
  * lock is held shared or exclusive.
  */
 bool
-LWLockHeldByMe(LWLockId lockid)
+LWLockHeldByMe(LWLock *l)
 {
 	int			i;

 	for (i = 0; i < num_held_lwlocks; i++)
 	{
-		if (held_lwlocks[i] == lockid)
+		if (held_lwlocks[i] == l)
 			return true;
 	}
 	return false;
@@ -241,7 +241,10 @@
 #define PredicateLockHashPartition(hashcode) \
 	((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
 #define PredicateLockHashPartitionLock(hashcode) \
-	((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
+	(&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \
+		PredicateLockHashPartition(hashcode)].lock)
+#define PredicateLockHashPartitionLockByIndex(i) \
+	(&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)

 #define NPREDICATELOCKTARGETENTS() \
 	mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
@@ -383,7 +386,7 @@ static SHM_QUEUE *FinishedSerializableTransactions;
  */
 static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0};
 static uint32 ScratchTargetTagHash;
-static int	ScratchPartitionLock;
+static LWLock *ScratchPartitionLock;

 /*
  * The local hash table used to determine when to combine multiple fine-
@@ -1398,7 +1401,7 @@ GetPredicateLockStatusData(void)
 	 * in ascending order, then SerializableXactHashLock.
 	 */
 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
-		LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);

 	/* Get number of locks and allocate appropriately-sized arrays. */
@@ -1427,7 +1430,7 @@ GetPredicateLockStatusData(void)
 	/* Release locks in reverse order */
 	LWLockRelease(SerializableXactHashLock);
 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
-		LWLockRelease(FirstPredicateLockMgrLock + i);
+		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));

 	return data;
 }
@@ -1856,7 +1859,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno)
 {
 	PREDICATELOCKTARGETTAG targettag;
 	uint32		targettaghash;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	PREDICATELOCKTARGET *target;

 	SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
@@ -2089,7 +2092,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
 		if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
 		{
 			uint32		oldtargettaghash;
-			LWLockId	partitionLock;
+			LWLock	   *partitionLock;
 			PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;

 			oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
@@ -2301,7 +2304,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
 	PREDICATELOCKTARGET *target;
 	PREDICATELOCKTAG locktag;
 	PREDICATELOCK *lock;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	bool		found;

 	partitionLock = PredicateLockHashPartitionLock(targettaghash);
@@ -2599,10 +2602,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
 								  bool removeOld)
 {
 	uint32		oldtargettaghash;
-	LWLockId	oldpartitionLock;
+	LWLock	   *oldpartitionLock;
 	PREDICATELOCKTARGET *oldtarget;
 	uint32		newtargettaghash;
-	LWLockId	newpartitionLock;
+	LWLock	   *newpartitionLock;
 	bool		found;
 	bool		outOfShmem = false;
@@ -2858,7 +2861,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
 	/* Acquire locks on all lock partitions */
 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
-		LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
+		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

 	/*
@@ -2996,7 +2999,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
 	/* Release locks in reverse order */
 	LWLockRelease(SerializableXactHashLock);
 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
-		LWLockRelease(FirstPredicateLockMgrLock + i);
+		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
 	LWLockRelease(SerializablePredicateLockListLock);
 }
@@ -3611,7 +3614,7 @@ ClearOldPredicateLocks(void)
 			PREDICATELOCKTARGET *target;
 			PREDICATELOCKTARGETTAG targettag;
 			uint32		targettaghash;
-			LWLockId	partitionLock;
+			LWLock	   *partitionLock;

 			tag = predlock->tag;
 			target = tag.myTarget;
@@ -3690,7 +3693,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
 		PREDICATELOCKTARGET *target;
 		PREDICATELOCKTARGETTAG targettag;
 		uint32		targettaghash;
-		LWLockId	partitionLock;
+		LWLock	   *partitionLock;

 		nextpredlock = (PREDICATELOCK *)
 			SHMQueueNext(&(sxact->predicateLocks),
@@ -4068,7 +4071,7 @@ static void
 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 {
 	uint32		targettaghash;
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	PREDICATELOCKTARGET *target;
 	PREDICATELOCK *predlock;
 	PREDICATELOCK *mypredlock = NULL;
@@ -4360,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation)
 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
-		LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);

 	/* Scan through target list */
@@ -4407,7 +4410,7 @@ CheckTableForSerializableConflictIn(Relation relation)
 	/* Release locks in reverse order */
 	LWLockRelease(SerializableXactHashLock);
 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
-		LWLockRelease(FirstPredicateLockMgrLock + i);
+		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
 	LWLockRelease(SerializablePredicateLockListLock);
 }
@@ -189,7 +189,8 @@ InitProcGlobal(void)
 	 */
 	procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
 	ProcGlobal->allProcs = procs;
-	ProcGlobal->allProcCount = TotalProcs;
+	/* XXX allProcCount isn't really all of them; it excludes prepared xacts */
+	ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS;
 	if (!procs)
 		ereport(FATAL,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -663,7 +664,7 @@ IsWaitingForLock(void)
 void
 LockErrorCleanup(void)
 {
-	LWLockId	partitionLock;
+	LWLock	   *partitionLock;
 	DisableTimeoutParams timeouts[2];

 	AbortStrongLockAcquire();
@@ -942,7 +943,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 	LOCK	   *lock = locallock->lock;
 	PROCLOCK   *proclock = locallock->proclock;
 	uint32		hashcode = locallock->hashcode;
-	LWLockId	partitionLock = LockHashPartitionLock(hashcode);
+	LWLock	   *partitionLock = LockHashPartitionLock(hashcode);
 	PROC_QUEUE *waitQueue = &(lock->waitProcs);
 	LOCKMASK	myHeldLocks = MyProc->heldLocks;
 	bool		early_deadlock = false;
@@ -1440,7 +1441,7 @@ CheckDeadLock(void)
 	 * interrupts.
 	 */
 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-		LWLockAcquire(FirstLockMgrLock + i, LW_EXCLUSIVE);
+		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE);

 	/*
 	 * Check to see if we've been awoken by anyone in the interim.
@@ -1522,7 +1523,7 @@ CheckDeadLock(void)
 	 */
 check_done:
 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
-		LWLockRelease(FirstLockMgrLock + i);
+		LWLockRelease(LockHashPartitionLockByIndex(i));
 }
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
* in probe definitions, as they cause compilation errors on Mac OS X 10.5. * in probe definitions, as they cause compilation errors on Mac OS X 10.5.
*/ */
#define LocalTransactionId unsigned int #define LocalTransactionId unsigned int
#define LWLockId int
#define LWLockMode int #define LWLockMode int
#define LOCKMODE int #define LOCKMODE int
#define BlockNumber unsigned int #define BlockNumber unsigned int
...@@ -29,14 +28,14 @@ provider postgresql { ...@@ -29,14 +28,14 @@ provider postgresql {
probe transaction__commit(LocalTransactionId); probe transaction__commit(LocalTransactionId);
probe transaction__abort(LocalTransactionId); probe transaction__abort(LocalTransactionId);
probe lwlock__acquire(LWLockId, LWLockMode); probe lwlock__acquire(const char *, int, LWLockMode);
probe lwlock__release(LWLockId); probe lwlock__release(const char *, int);
probe lwlock__wait__start(LWLockId, LWLockMode); probe lwlock__wait__start(const char *, int, LWLockMode);
probe lwlock__wait__done(LWLockId, LWLockMode); probe lwlock__wait__done(const char *, int, LWLockMode);
probe lwlock__condacquire(LWLockId, LWLockMode); probe lwlock__condacquire(const char *, int, LWLockMode);
probe lwlock__condacquire__fail(LWLockId, LWLockMode); probe lwlock__condacquire__fail(const char *, int, LWLockMode);
probe lwlock__wait__until__free(LWLockId, LWLockMode); probe lwlock__wait__until__free(const char *, int, LWLockMode);
probe lwlock__wait__until__free__fail(LWLockId, LWLockMode); probe lwlock__wait__until__free__fail(const char *, int, LWLockMode);
probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE); probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE); probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
......
...@@ -55,7 +55,7 @@ typedef enum ...@@ -55,7 +55,7 @@ typedef enum
*/ */
typedef struct SlruSharedData typedef struct SlruSharedData
{ {
LWLockId ControlLock; LWLock *ControlLock;
/* Number of buffers managed by this SLRU structure */ /* Number of buffers managed by this SLRU structure */
int num_slots; int num_slots;
...@@ -69,7 +69,7 @@ typedef struct SlruSharedData ...@@ -69,7 +69,7 @@ typedef struct SlruSharedData
bool *page_dirty; bool *page_dirty;
int *page_number; int *page_number;
int *page_lru_count; int *page_lru_count;
LWLockId *buffer_locks; LWLock **buffer_locks;
/* /*
* Optional array of WAL flush LSNs associated with entries in the SLRU * Optional array of WAL flush LSNs associated with entries in the SLRU
...@@ -136,7 +136,7 @@ typedef SlruCtlData *SlruCtl; ...@@ -136,7 +136,7 @@ typedef SlruCtlData *SlruCtl;
extern Size SimpleLruShmemSize(int nslots, int nlsns); extern Size SimpleLruShmemSize(int nslots, int nlsns);
extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLockId ctllock, const char *subdir); LWLock *ctllock, const char *subdir);
extern int SimpleLruZeroPage(SlruCtl ctl, int pageno); extern int SimpleLruZeroPage(SlruCtl ctl, int pageno);
extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
TransactionId xid); TransactionId xid);
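With this signature change, callers hand the control lock to SimpleLruInit by pointer. A sketch of the CLOG call site in clog.c under the new interface (based on the existing initialization code there):

    SimpleLruInit(ClogCtl, "CLOG Ctl", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
                  CLogControlLock, "pg_clog");

Since CLogControlLock now expands to &MainLWLockArray[11].lock, the call compiles unchanged.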
......
...@@ -104,7 +104,10 @@ typedef struct buftag ...@@ -104,7 +104,10 @@ typedef struct buftag
#define BufTableHashPartition(hashcode) \ #define BufTableHashPartition(hashcode) \
((hashcode) % NUM_BUFFER_PARTITIONS) ((hashcode) % NUM_BUFFER_PARTITIONS)
#define BufMappingPartitionLock(hashcode) \ #define BufMappingPartitionLock(hashcode) \
((LWLockId) (FirstBufMappingLock + BufTableHashPartition(hashcode))) (&MainLWLockArray[BUFFER_MAPPING_LWLOCK_OFFSET + \
BufTableHashPartition(hashcode)].lock)
#define BufMappingPartitionLockByIndex(i) \
(&MainLWLockArray[BUFFER_MAPPING_LWLOCK_OFFSET + (i)].lock)
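These macros make the two-step mapping explicit: hash the buffer tag to a partition number, then index into the buffer-mapping region of MainLWLockArray. A minimal lookup sketch, assuming a BufferTag built with the existing INIT_BUFFERTAG macro (rnode, forkNum, and blockNum are placeholders):

    BufferTag   tag;            /* identity of the page of interest */
    uint32      hash;
    LWLock     *partitionLock;

    INIT_BUFFERTAG(tag, rnode, forkNum, blockNum);
    hash = BufTableHashCode(&tag);          /* existing hash helper */
    partitionLock = BufMappingPartitionLock(hash);

    LWLockAcquire(partitionLock, LW_SHARED);
    /* ... probe the shared buffer mapping hash table ... */
    LWLockRelease(partitionLock);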
/* /*
* BufferDesc -- shared descriptor/state data for a single shared buffer. * BufferDesc -- shared descriptor/state data for a single shared buffer.
...@@ -144,8 +147,8 @@ typedef struct sbufdesc ...@@ -144,8 +147,8 @@ typedef struct sbufdesc
int buf_id; /* buffer's index number (from 0) */ int buf_id; /* buffer's index number (from 0) */
int freeNext; /* link in freelist chain */ int freeNext; /* link in freelist chain */
LWLockId io_in_progress_lock; /* to wait for I/O to complete */ LWLock *io_in_progress_lock; /* to wait for I/O to complete */
LWLockId content_lock; /* to lock access to buffer contents */ LWLock *content_lock; /* to lock access to buffer contents */
} BufferDesc; } BufferDesc;
#define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1) #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
......
...@@ -483,8 +483,10 @@ typedef enum ...@@ -483,8 +483,10 @@ typedef enum
#define LockHashPartition(hashcode) \ #define LockHashPartition(hashcode) \
((hashcode) % NUM_LOCK_PARTITIONS) ((hashcode) % NUM_LOCK_PARTITIONS)
#define LockHashPartitionLock(hashcode) \ #define LockHashPartitionLock(hashcode) \
((LWLockId) (FirstLockMgrLock + LockHashPartition(hashcode))) (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + \
LockHashPartition(hashcode)].lock)
#define LockHashPartitionLockByIndex(i) \
(&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)
/* /*
* function prototypes * function prototypes
......
...@@ -14,10 +14,123 @@ ...@@ -14,10 +14,123 @@
#ifndef LWLOCK_H #ifndef LWLOCK_H
#define LWLOCK_H #define LWLOCK_H
#include "storage/s_lock.h"
struct PGPROC;
/*
* It's occasionally necessary to identify a particular LWLock "by name"; e.g.
* because we wish to report the lock to dtrace. We could store a name or
* other identifying information in the lock itself, but since it's common
* to have many nearly-identical locks (e.g. one per buffer) this would end
* up wasting significant amounts of memory. Instead, each lwlock stores a
* tranche ID which tells us which array it's part of. Based on that, we can
* figure out where the lwlock lies within the array using the data structure
* shown below; the lock is then identified based on the tranche name and
* computed array index. We need the array stride because the array might not
* be an array of lwlocks, but rather some larger data structure that includes
* one or more lwlocks per element.
*/
typedef struct LWLockTranche
{
const char *name;
void *array_base;
Size array_stride;
} LWLockTranche;
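From this metadata, lwlock.c can derive a reportable identity for any lock: the tranche supplies the name, and pointer arithmetic over array_base and array_stride recovers the index. The reporting macros boil down to something like the following, where LWLockTrancheArray is lwlock.c's private table of registered tranches:

    #define T_NAME(lock) \
        (LWLockTrancheArray[(lock)->tranche]->name)
    #define T_ID(lock) \
        ((int) ((((char *) lock) - \
            ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
            LWLockTrancheArray[(lock)->tranche]->array_stride))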
/*
* Code outside of lwlock.c should not manipulate the contents of this
* structure directly, but we have to declare it here to allow LWLocks to be
* incorporated into other data structures.
*/
typedef struct LWLock
{
slock_t mutex; /* Protects LWLock and queue of PGPROCs */
bool releaseOK; /* T if ok to release waiters */
char exclusive; /* # of exclusive holders (0 or 1) */
int shared; /* # of shared holders (0..MaxBackends) */
int tranche; /* tranche ID */
struct PGPROC *head; /* head of list of waiting PGPROCs */
struct PGPROC *tail; /* tail of list of waiting PGPROCs */
/* tail is undefined when head is NULL */
} LWLock;
/*
* Prior to PostgreSQL 9.4, every lightweight lock in the system was stored
* in a single array. For convenience and for compatibility with past
* releases, we still have a main array, but it's now also permissible to
* store LWLocks elsewhere in the main shared memory segment or in a dynamic
* shared memory segment. In the main array, we force the array stride to
* be a power of 2, which saves a few cycles in indexing, but more importantly
* also ensures that individual LWLocks don't cross cache line boundaries.
* This reduces cache contention problems, especially on AMD Opterons.
* (Of course, we have to also ensure that the array start address is suitably
* aligned.)
*
* Even on a 32-bit platform, an lwlock will be more than 16 bytes, because
* it contains 2 integers and 2 pointers, plus other stuff. It should fit
* into 32 bytes, though, unless slock_t is really big. On a 64-bit platform,
* it should fit into 32 bytes unless slock_t is larger than 4 bytes. We
* allow for that just in case.
*/
#define LWLOCK_PADDED_SIZE (sizeof(LWLock) <= 32 ? 32 : 64)
typedef union LWLockPadded
{
LWLock lock;
char pad[LWLOCK_PADDED_SIZE];
} LWLockPadded;
extern LWLockPadded *MainLWLockArray;
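The cache-line guarantee requires an aligned base address as well as a power-of-2 stride. A sketch of how CreateLWLocks can round the allocation up to the next LWLOCK_PADDED_SIZE boundary, assuming LWLockShmemSize() has already added that much slack to the request:

    char       *ptr;

    ptr = (char *) ShmemAlloc(spaceLocks);  /* spaceLocks from LWLockShmemSize() */

    /* Round up so MainLWLockArray starts on a padded-size boundary */
    ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
    MainLWLockArray = (LWLockPadded *) ptr;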
/*
* Some commonly-used locks have predefined positions within MainLWLockArray;
* defining macros here makes it much easier to keep track of these. If you
* add a lock, add it to the end to avoid renumbering the existing locks;
* if you remove a lock, consider leaving a gap in the numbering sequence for
* the benefit of DTrace and other external debugging scripts.
*/
#define BufFreelistLock (&MainLWLockArray[0].lock)
#define ShmemIndexLock (&MainLWLockArray[1].lock)
#define OidGenLock (&MainLWLockArray[2].lock)
#define XidGenLock (&MainLWLockArray[3].lock)
#define ProcArrayLock (&MainLWLockArray[4].lock)
#define SInvalReadLock (&MainLWLockArray[5].lock)
#define SInvalWriteLock (&MainLWLockArray[6].lock)
#define WALBufMappingLock (&MainLWLockArray[7].lock)
#define WALWriteLock (&MainLWLockArray[8].lock)
#define ControlFileLock (&MainLWLockArray[9].lock)
#define CheckpointLock (&MainLWLockArray[10].lock)
#define CLogControlLock (&MainLWLockArray[11].lock)
#define SubtransControlLock (&MainLWLockArray[12].lock)
#define MultiXactGenLock (&MainLWLockArray[13].lock)
#define MultiXactOffsetControlLock (&MainLWLockArray[14].lock)
#define MultiXactMemberControlLock (&MainLWLockArray[15].lock)
#define RelCacheInitLock (&MainLWLockArray[16].lock)
#define CheckpointerCommLock (&MainLWLockArray[17].lock)
#define TwoPhaseStateLock (&MainLWLockArray[18].lock)
#define TablespaceCreateLock (&MainLWLockArray[19].lock)
#define BtreeVacuumLock (&MainLWLockArray[20].lock)
#define AddinShmemInitLock (&MainLWLockArray[21].lock)
#define AutovacuumLock (&MainLWLockArray[22].lock)
#define AutovacuumScheduleLock (&MainLWLockArray[23].lock)
#define SyncScanLock (&MainLWLockArray[24].lock)
#define RelationMappingLock (&MainLWLockArray[25].lock)
#define AsyncCtlLock (&MainLWLockArray[26].lock)
#define AsyncQueueLock (&MainLWLockArray[27].lock)
#define SerializableXactHashLock (&MainLWLockArray[28].lock)
#define SerializableFinishedListLock (&MainLWLockArray[29].lock)
#define SerializablePredicateLockListLock (&MainLWLockArray[30].lock)
#define OldSerXidLock (&MainLWLockArray[31].lock)
#define SyncRepLock (&MainLWLockArray[32].lock)
#define BackgroundWorkerLock (&MainLWLockArray[33].lock)
#define DynamicSharedMemoryControlLock (&MainLWLockArray[34].lock)
#define AutoFileLock (&MainLWLockArray[35].lock)
#define NUM_INDIVIDUAL_LWLOCKS 36
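Because each of these names now expands to a pointer into MainLWLockArray rather than an enum value, existing call sites compile unchanged:

    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);   /* i.e. &MainLWLockArray[4].lock */
    /* ... examine or update shared state ... */
    LWLockRelease(ProcArrayLock);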
/* /*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
* here, but we need them to set up enum LWLockId correctly, and having * here, but we need them to figure out offsets within MainLWLockArray, and
* this file include lock.h or bufmgr.h would be backwards. * having this file include lock.h or bufmgr.h would be backwards.
*/ */
/* Number of partitions of the shared buffer mapping hashtable */ /* Number of partitions of the shared buffer mapping hashtable */
...@@ -31,68 +144,14 @@ ...@@ -31,68 +144,14 @@
#define LOG2_NUM_PREDICATELOCK_PARTITIONS 4 #define LOG2_NUM_PREDICATELOCK_PARTITIONS 4
#define NUM_PREDICATELOCK_PARTITIONS (1 << LOG2_NUM_PREDICATELOCK_PARTITIONS) #define NUM_PREDICATELOCK_PARTITIONS (1 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
/* /* Offsets for various chunks of preallocated lwlocks. */
* We have a number of predefined LWLocks, plus a bunch of LWLocks that are #define BUFFER_MAPPING_LWLOCK_OFFSET NUM_INDIVIDUAL_LWLOCKS
* dynamically assigned (e.g., for shared buffers). The LWLock structures #define LOCK_MANAGER_LWLOCK_OFFSET \
* live in shared memory (since they contain shared data) and are identified (BUFFER_MAPPING_LWLOCK_OFFSET + NUM_BUFFER_PARTITIONS)
* by values of this enumerated type. We abuse the notion of an enum somewhat #define PREDICATELOCK_MANAGER_LWLOCK_OFFSET \
 * by allowing values not listed in the enum declaration to be assigned. 	(LOCK_MANAGER_LWLOCK_OFFSET + NUM_LOCK_PARTITIONS)
* The extra value MaxDynamicLWLock is there to keep the compiler from #define NUM_FIXED_LWLOCKS \
* deciding that the enum can be represented as char or short ... (PREDICATELOCK_MANAGER_LWLOCK_OFFSET + NUM_PREDICATELOCK_PARTITIONS)
*
* If you remove a lock, please replace it with a placeholder. This retains
* the lock numbering, which is helpful for DTrace and other external
* debugging scripts.
*/
typedef enum LWLockId
{
BufFreelistLock,
ShmemIndexLock,
OidGenLock,
XidGenLock,
ProcArrayLock,
SInvalReadLock,
SInvalWriteLock,
WALBufMappingLock,
WALWriteLock,
ControlFileLock,
CheckpointLock,
CLogControlLock,
SubtransControlLock,
MultiXactGenLock,
MultiXactOffsetControlLock,
MultiXactMemberControlLock,
RelCacheInitLock,
CheckpointerCommLock,
TwoPhaseStateLock,
TablespaceCreateLock,
BtreeVacuumLock,
AddinShmemInitLock,
AutovacuumLock,
AutovacuumScheduleLock,
SyncScanLock,
RelationMappingLock,
AsyncCtlLock,
AsyncQueueLock,
SerializableXactHashLock,
SerializableFinishedListLock,
SerializablePredicateLockListLock,
OldSerXidLock,
SyncRepLock,
BackgroundWorkerLock,
DynamicSharedMemoryControlLock,
AutoFileLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
FirstPredicateLockMgrLock = FirstLockMgrLock + NUM_LOCK_PARTITIONS,
/* must be last except for MaxDynamicLWLock: */
NumFixedLWLocks = FirstPredicateLockMgrLock + NUM_PREDICATELOCK_PARTITIONS,
MaxDynamicLWLock = 1000000000
} LWLockId;
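With the default partition counts (16 buffer-mapping, 16 lock-manager, and 16 predicate-lock partitions) and 36 individual locks, the fixed portion of MainLWLockArray works out as:

    BUFFER_MAPPING_LWLOCK_OFFSET        = 36
    LOCK_MANAGER_LWLOCK_OFFSET          = 36 + 16 = 52
    PREDICATELOCK_MANAGER_LWLOCK_OFFSET = 52 + 16 = 68
    NUM_FIXED_LWLOCKS                   = 68 + 16 = 84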
typedef enum LWLockMode typedef enum LWLockMode
{ {
...@@ -108,18 +167,47 @@ typedef enum LWLockMode ...@@ -108,18 +167,47 @@ typedef enum LWLockMode
extern bool Trace_lwlocks; extern bool Trace_lwlocks;
#endif #endif
extern LWLockId LWLockAssign(void); extern void LWLockAcquire(LWLock *lock, LWLockMode mode);
extern void LWLockAcquire(LWLockId lockid, LWLockMode mode); extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode); extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
extern bool LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode); extern void LWLockRelease(LWLock *lock);
extern void LWLockRelease(LWLockId lockid);
extern void LWLockReleaseAll(void); extern void LWLockReleaseAll(void);
extern bool LWLockHeldByMe(LWLockId lockid); extern bool LWLockHeldByMe(LWLock *lock);
extern int NumLWLocks(void);
extern Size LWLockShmemSize(void); extern Size LWLockShmemSize(void);
extern void CreateLWLocks(void); extern void CreateLWLocks(void);
/*
* The traditional method for obtaining an lwlock for use by an extension is
* to call RequestAddinLWLocks() during postmaster startup; this will reserve
* space for the indicated number of locks in MainLWLockArray. Subsequently,
* a lock can be allocated using LWLockAssign.
*/
extern void RequestAddinLWLocks(int n); extern void RequestAddinLWLocks(int n);
extern LWLock *LWLockAssign(void);
/*
* There is another, more flexible method of obtaining lwlocks. First, call
* LWLockNewTrancheId just once to obtain a tranche ID; this allocates from
* a shared counter. Next, each individual process using the tranche should
* call LWLockRegisterTranche() to associate that tranche ID with appropriate
* metadata. Finally, LWLockInitialize should be called just once per lwlock,
* passing the tranche ID as an argument.
*
* It may seem strange that each process using the tranche must register it
* separately, but dynamic shared memory segments aren't guaranteed to be
* mapped at the same address in all coordinating backends, so storing the
* registration in the main shared memory segment wouldn't work for that case.
*/
extern int LWLockNewTrancheId(void);
extern void LWLockRegisterTranche(int, LWLockTranche *);
extern void LWLockInitialize(LWLock *, int tranche_id);
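A sketch of the tranche method for locks placed in a dynamic shared memory segment (names hypothetical; the tranche ID comes from LWLockNewTrancheId() in the creating process and would typically be handed to other backends through the segment itself):

    static LWLockTranche my_tranche;    /* per-process registration record */

    /* every process that maps the segment registers the tranche */
    void
    register_my_tranche(LWLock *locks, int tranche_id)
    {
        my_tranche.name = "my_extension";
        my_tranche.array_base = locks;
        my_tranche.array_stride = sizeof(LWLock);
        LWLockRegisterTranche(tranche_id, &my_tranche);
    }

    /* the creating process additionally initializes each lock, exactly once */
    void
    init_my_locks(LWLock *locks, int nlocks, int tranche_id)
    {
        int     i;

        for (i = 0; i < nlocks; i++)
            LWLockInitialize(&locks[i], tranche_id);
    }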
/*
* Prior to PostgreSQL 9.4, we used an enum type called LWLockId to refer
* to LWLocks. New code should instead use LWLock *. However, for the
* convenience of third-party code, we include the following typedef.
*/
typedef LWLock *LWLockId;
#endif /* LWLOCK_H */ #endif /* LWLOCK_H */
...@@ -131,7 +131,7 @@ struct PGPROC ...@@ -131,7 +131,7 @@ struct PGPROC
struct XidCache subxids; /* cache for subtransaction XIDs */ struct XidCache subxids; /* cache for subtransaction XIDs */
/* Per-backend LWLock. Protects fields below. */ /* Per-backend LWLock. Protects fields below. */
LWLockId backendLock; /* protects the fields below */ LWLock *backendLock; /* protects the fields below */
/* Lock manager data, recording fast-path locks taken by this backend. */ /* Lock manager data, recording fast-path locks taken by this backend. */
uint64 fpLockBits; /* lock modes held for each fast-path slot */ uint64 fpLockBits; /* lock modes held for each fast-path slot */
......
...@@ -896,7 +896,6 @@ LPWSTR ...@@ -896,7 +896,6 @@ LPWSTR
LSEG LSEG
LVRelStats LVRelStats
LWLock LWLock
LWLockId
LWLockMode LWLockMode
LWLockPadded LWLockPadded
LabelProvider LabelProvider
......