Commit 3cba8999 authored by Robert Haas's avatar Robert Haas

Create a "fast path" for acquiring weak relation locks.

When an AccessShareLock, RowShareLock, or RowExclusiveLock is requested
on an unshared database relation, and we can verify that no conflicting
locks can possibly be present, record the lock in a per-backend queue,
stored within the PGPROC, rather than in the primary lock table.  This
eliminates a great deal of contention on the lock manager LWLocks.

This patch also refactors the interface between GetLockStatusData() and
pg_lock_status() to be a bit more abstract, so that we don't rely so
heavily on the lock manager's internal representation details.  The new
fast path lock structures don't have a LOCK or PROCLOCK structure to
return, so we mustn't depend on that for purposes of listing outstanding
locks.

Review by Jeff Davis.
parent 7ed8f6c5
......@@ -7040,6 +7040,12 @@
<entry></entry>
<entry>True if lock is held, false if lock is awaited</entry>
</row>
<row>
<entry><structfield>fastpath</structfield></entry>
<entry><type>boolean</type></entry>
<entry></entry>
<entry>True if lock was taken via fast path, false if taken via main lock table</entry>
</row>
</tbody>
</tgroup>
</table>
......@@ -7090,16 +7096,29 @@
<para>
The <structname>pg_locks</structname> view displays data from both the
regular lock manager and the predicate lock manager, which are
separate systems. When this view is accessed, the internal data
structures of each lock manager are momentarily locked, and copies are
made for the view to display. Each lock manager will therefore
produce a consistent set of results, but as we do not lock both lock
managers simultaneously, it is possible for locks to be taken or
released after we interrogate the regular lock manager and before we
interrogate the predicate lock manager. Each lock manager is only
locked for the minimum possible time so as to reduce the performance
impact of querying this view, but there could nevertheless be some
impact on database performance if it is frequently accessed.
separate systems. This data is not guaranteed to be entirely consistent.
Data on fast-path locks (with <structfield>fastpath</> = <literal>true</>)
is gathered from each backend one at a time, without freezing the state of
the entire lock manager, so it is possible for locks to be taken and
released as information is gathered. Note, however, that these locks are
known not to conflict with any other lock currently in place. After
all backends have been queried for fast-path locks, the remainder of the
lock manager is locked as a unit, and a consistent snapshot of all
remaining locks is dumped as an atomic action. Once the lock manager has
been unlocked, the predicate lock manager is similarly locked and all
predicate locks are dumped as an atomic action. Thus, with the exception
of fast-path locks, each lock manager will deliver a consistent set of
results, but as we do not lock both lock managers simultaneously, it is
possible for locks to be taken or released after we interrogate the regular
lock manager and before we interrogate the predicate lock manager.
</para>
<para>
Locking the lock manger and/or predicate lock manager could have some
impact on database performance if this view is very frequently accessed.
The locks are held only for the minimum amount of time necessary to
obtain data from the lock manager, but this does not completely eliminate
the possibility of a performance impact.
</para>
<para>
......
......@@ -4592,7 +4592,6 @@ MaxLivePostmasterChildren(void)
extern slock_t *ShmemLock;
extern LWLock *LWLockArray;
extern slock_t *ProcStructLock;
extern PROC_HDR *ProcGlobal;
extern PGPROC *AuxiliaryProcs;
extern PMSignalData *PMSignalState;
extern pgsocket pgStatSock;
......
......@@ -60,20 +60,29 @@ identical lock mode sets. See src/tools/backend/index.html and
src/include/storage/lock.h for more details. (Lock modes are also called
lock types in some places in the code and documentation.)
There are two fundamental lock structures in shared memory: the
per-lockable-object LOCK struct, and the per-lock-and-requestor PROCLOCK
struct. A LOCK object exists for each lockable object that currently has
locks held or requested on it. A PROCLOCK struct exists for each backend
that is holding or requesting lock(s) on each LOCK object.
In addition to these, each backend maintains an unshared LOCALLOCK structure
for each lockable object and lock mode that it is currently holding or
requesting. The shared lock structures only allow a single lock grant to
be made per lockable object/lock mode/backend. Internally to a backend,
however, the same lock may be requested and perhaps released multiple times
in a transaction, and it can also be held both transactionally and session-
wide. The internal request counts are held in LOCALLOCK so that the shared
data structures need not be accessed to alter them.
There are two main methods for recording locks in shared memory. The primary
mechanism uses two main structures: the per-lockable-object LOCK struct, and
the per-lock-and-requestor PROCLOCK struct. A LOCK object exists for each
lockable object that currently has locks held or requested on it. A PROCLOCK
struct exists for each backend that is holding or requesting lock(s) on each
LOCK object.
There is also a special "fast path" mechanism which backends may use to
record a limited number of locks with very specific characteristics: they must
use the DEFAULT lockmethod; they must represent a lock on a database relation
(not a shared relation), they must be a "weak" lock which is unlikely to
conflict (AccessShareLock, RowShareLock, or RowExclusiveLock); and the system
must be able to quickly verify that no conflicting locks could possibly be
present. See "Fast Path Locking", below, for more details.
Each backend also maintains an unshared LOCALLOCK structure for each lockable
object and lock mode that it is currently holding or requesting. The shared
lock structures only allow a single lock grant to be made per lockable
object/lock mode/backend. Internally to a backend, however, the same lock may
be requested and perhaps released multiple times in a transaction, and it can
also be held both transactionally and session-wide. The internal request
counts are held in LOCALLOCK so that the shared data structures need not be
accessed to alter them.
---------------------------------------------------------------------------
......@@ -250,6 +259,65 @@ tradeoff: we could instead recalculate the partition number from the LOCKTAG
when needed.
Fast Path Locking
-----------------
Fast path locking is a special purpose mechanism designed to reduce the
overhead of taking and releasing weak relation locks. SELECT, INSERT,
UPDATE, and DELETE must acquire a lock on every relation they operate on,
as well as various system catalogs that can be used internally. These locks
are notable not only for the very high frequency with which they are taken
and released, but also for the fact that they virtually never conflict.
Many DML operations can proceed in parallel against the same table at the
same time; only DDL operations such as CLUSTER, ALTER TABLE, or DROP -- or
explicit user action such as LOCK TABLE -- will create lock conflicts with
the "weak" locks (AccessShareLock, RowShareLock, RowExclusiveLock) acquired
by DML operations.
The primary locking mechanism does not cope well with this workload. Even
though the lock manager locks are partitioned, the locktag for any given
relation still falls in one, and only one, partition. Thus, if many short
queries are accessing the same relation, the lock manager partition lock for
that partition becomes a contention bottleneck. This effect is measurable
even on 2-core servers, and becomes very pronounced as core count increases.
To alleviate this bottleneck, beginning in PostgreSQL 9.2, each backend is
permitted to record a limited number of locks on unshared relations in an
array within its PGPROC structure, rather than using the primary lock table.
This is called the "fast path" mechanism, and can only be used when the
locker can verify that no conflicting locks can possibly exist.
A key point of this algorithm is that it must be possible to verify the
absence of possibly conflicting locks without fighting over a shared LWLock or
spinlock. Otherwise, this effort would simply move the contention bottleneck
from one place to another. We accomplish this using an array of 1024 integer
counters, which are in effect a 1024-way partitioning of the lock space. Each
counter records the number of "strong" locks (that is, ShareLock,
ShareRowExclusiveLock, ExclusiveLock, and AccessExclusiveLock) on unshared
relations that fall into that partition. When this counter is non-zero, the
fast path mechanism may not be used for relation locks in that partition. A
strong locker bumps the counter and then scans each per-backend array for
matching fast-path locks; any which are found must be transferred to the
primary lock table before attempting to acquire the lock, to ensure proper
lock conflict and deadlock detection.
On an SMP system, we must guarantee proper memory synchronization. Here we
rely on the fact that LWLock acquisition acts as a memory sequence point: if
A performs a store, A and B both acquire an LWLock in either order, and B
then performs a load on the same memory location, it is guaranteed to see
A's store. In this case, each backend's fast-path lock queue is protected
by an LWLock. A backend wishing to acquire a fast-path lock grabs this
LWLock before examining FastPathStrongLocks to check for the presence of a
conflicting strong lock. And the backend attempting to acquire a strong
lock, because it must transfer any matching weak locks taken via the fast-path
mechanism to the shared lock table, will acquire every LWLock protecting
a backend fast-path queue in turn. Thus, if we examine FastPathStrongLocks
and see a zero, then either the value is truly zero, or if it is a stale value,
the strong locker has yet to acquire the per-backend LWLock we now hold (or,
indeed, even the first per-backend LWLock) and will notice any weak lock we
take when it does.
The Deadlock Detection Algorithm
--------------------------------
......
......@@ -112,6 +112,84 @@ static const char *const lock_mode_names[] =
"AccessExclusiveLock"
};
/*
* Count of the number of fast path lock slots we believe to be used. This
* might be higher than the real number if another backend has transferred
* our locks to the primary lock table, but it can never be lower than the
* real value, since only we can acquire locks on our own behalf.
*/
static int FastPathLocalUseCount = 0;
/* Macros for manipulating proc->fpLockBits */
#define FAST_PATH_BITS_PER_SLOT 3
#define FAST_PATH_LOCKNUMBER_OFFSET 1
#define FAST_PATH_MASK ((1 << FAST_PATH_BITS_PER_SLOT) - 1)
#define FAST_PATH_GET_BITS(proc, n) \
(((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
#define FAST_PATH_BIT_POSITION(n, l) \
(AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
(proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
(proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
/*
* The fast-path lock mechanism is concerned only with relation locks on
* unshared relations by backends bound to a database. The fast-path
* mechanism exists mostly to accelerate acquisition and release of locks
* that rarely conflict. Because ShareUpdateExclusiveLock is
* self-conflicting, it can't use the fast-path mechanism; but it also does
* not conflict with any of the locks that do, so we can ignore it completely.
*/
#define FastPathTag(locktag) \
((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
(locktag)->locktag_type == LOCKTAG_RELATION && \
(locktag)->locktag_field1 == MyDatabaseId && \
MyDatabaseId != InvalidOid)
#define FastPathWeakMode(mode) ((mode) < ShareUpdateExclusiveLock)
#define FastPathStrongMode(mode) ((mode) > ShareUpdateExclusiveLock)
#define FastPathRelevantMode(mode) ((mode) != ShareUpdateExclusiveLock)
static bool FastPathGrantLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferLocks(LockMethod lockMethodTable,
const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetLockEntry(LOCALLOCK *locallock);
/*
* To make the fast-path lock mechanism work, we must have some way of
* preventing the use of the fast-path when a conflicting lock might be
* present. We partition* the locktag space into FAST_PATH_HASH_BUCKETS
* partitions, and maintain an integer count of the number of "strong" lockers
* in each partition. When any "strong" lockers are present (which is
* hopefully not very often), the fast-path mechanism can't be used, and we
* must fall back to the slower method of pushing matching locks directly
* into the main lock tables.
*
* The deadlock detector does not know anything about the fast path mechanism,
* so any locks that might be involved in a deadlock must be transferred from
* the fast-path queues to the main lock table.
*/
#define FAST_PATH_STRONG_LOCK_HASH_BITS 10
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
(1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
#define FastPathStrongLockHashPartition(hashcode) \
((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)
typedef struct
{
slock_t mutex;
uint32 count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongLockData;
FastPathStrongLockData *FastPathStrongLocks;
#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif
......@@ -254,6 +332,8 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner);
......@@ -262,6 +342,9 @@ static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
LockMethod lockMethodTable, uint32 hashcode,
bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
LOCKTAG *locktag, LOCKMODE lockmode,
bool decrement_strong_lock_count);
/*
......@@ -283,6 +366,7 @@ InitLocks(void)
int hash_flags;
long init_table_size,
max_table_size;
bool found;
/*
* Compute init/max size to request for lock hashtables. Note these
......@@ -328,6 +412,14 @@ InitLocks(void)
&info,
hash_flags);
/*
* Allocate fast-path structures.
*/
FastPathStrongLocks = ShmemInitStruct("Fast Path Strong Lock Data",
sizeof(FastPathStrongLockData), &found);
if (!found)
SpinLockInit(&FastPathStrongLocks->mutex);
/*
* Allocate non-shared hash table for LOCALLOCK structs. This stores lock
* counts and resource owner information.
......@@ -492,12 +584,9 @@ LockAcquireExtended(const LOCKTAG *locktag,
LOCALLOCK *locallock;
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
bool found;
ResourceOwner owner;
uint32 hashcode;
uint32 proclock_hashcode;
int partition;
LWLockId partitionLock;
int status;
bool log_lock = false;
......@@ -553,6 +642,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->nLocks = 0;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
locallock->holdsStrongLockCount = FALSE;
locallock->lockOwners = NULL;
locallock->lockOwners = (LOCALLOCKOWNER *)
MemoryContextAlloc(TopMemoryContext,
......@@ -571,6 +661,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->maxLockOwners = newsize;
}
}
hashcode = locallock->hashcode;
/*
* If we already hold the lock, we can just increase the count locally.
......@@ -600,15 +691,244 @@ LockAcquireExtended(const LOCKTAG *locktag,
log_lock = true;
}
/* Locks that participate in the fast path require special handling. */
if (FastPathTag(locktag) && FastPathRelevantMode(lockmode))
{
uint32 fasthashcode;
fasthashcode = FastPathStrongLockHashPartition(hashcode);
/*
* If we remember having filled up the fast path array, we don't
* attempt to make any further use of it until we release some locks.
* It's possible that some other backend has transferred some of those
* locks to the shared hash table, leaving space free, but it's not
* worth acquiring the LWLock just to check. It's also possible that
* we're acquiring a second or third lock type on a relation we have
* already locked using the fast-path, but for now we don't worry about
* that case either.
*/
if (FastPathWeakMode(lockmode)
&& FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
{
bool acquired;
/*
* LWLockAcquire acts as a memory sequencing point, so it's safe
* to assume that any strong locker whose increment to
* FastPathStrongLocks->counts becomes visible after we test it has
* yet to begin to transfer fast-path locks.
*/
LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
if (FastPathStrongLocks->count[fasthashcode] != 0)
acquired = false;
else
acquired = FastPathGrantLock(locktag->locktag_field2, lockmode);
LWLockRelease(MyProc->backendLock);
if (acquired)
{
GrantLockLocal(locallock, owner);
return LOCKACQUIRE_OK;
}
}
else if (FastPathStrongMode(lockmode))
{
/*
* Adding to a memory location is not atomic, so we take a
* spinlock to ensure we don't collide with someone else trying
* to bump the count at the same time.
*
* XXX: It might be worth considering using an atomic fetch-and-add
* instruction here, on architectures where that is supported.
*/
Assert(locallock->holdsStrongLockCount == FALSE);
SpinLockAcquire(&FastPathStrongLocks->mutex);
FastPathStrongLocks->count[fasthashcode]++;
locallock->holdsStrongLockCount = TRUE;
SpinLockRelease(&FastPathStrongLocks->mutex);
if (!FastPathTransferLocks(lockMethodTable, locktag, hashcode))
{
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
else
return LOCKACQUIRE_NOT_AVAIL;
}
}
}
/*
* Otherwise we've got to mess with the shared lock table.
*/
hashcode = locallock->hashcode;
partition = LockHashPartition(hashcode);
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Find or create a proclock entry with this tag
*/
proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
hashcode, lockmode);
if (!proclock)
{
LWLockRelease(partitionLock);
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
else
return LOCKACQUIRE_NOT_AVAIL;
}
locallock->proclock = proclock;
lock = proclock->tag.myLock;
locallock->lock = lock;
/*
* If lock requested conflicts with locks requested by waiters, must join
* wait queue. Otherwise, check for conflict with already-held locks.
* (That's last because most complex check.)
*/
if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
status = STATUS_FOUND;
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock, MyProc);
if (status == STATUS_OK)
{
/* No conflict with held or previously requested locks */
GrantLock(lock, proclock, lockmode);
GrantLockLocal(locallock, owner);
}
else
{
Assert(status == STATUS_FOUND);
/*
* We can't acquire the lock immediately. If caller specified no
* blocking, remove useless table entries and return NOT_AVAIL without
* waiting.
*/
if (dontWait)
{
if (proclock->holdMask == 0)
{
uint32 proclock_hashcode;
proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
if (!hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &(proclock->tag),
proclock_hashcode,
HASH_REMOVE,
NULL))
elog(PANIC, "proclock table corrupted");
}
else
PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
lock->nRequested--;
lock->requested[lockmode]--;
LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
return LOCKACQUIRE_NOT_AVAIL;
}
/*
* In Hot Standby perform early deadlock detection in normal backends.
* If deadlock found we release partition lock but do not return.
*/
if (RecoveryInProgress() && !InRecovery)
CheckRecoveryConflictDeadlock(partitionLock);
/*
* Set bitmask of locks this process already holds on this object.
*/
MyProc->heldLocks = proclock->holdMask;
/*
* Sleep till someone wakes me up.
*/
TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
locktag->locktag_field2,
locktag->locktag_field3,
locktag->locktag_field4,
locktag->locktag_type,
lockmode);
WaitOnLock(locallock, owner);
TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
locktag->locktag_field2,
locktag->locktag_field3,
locktag->locktag_field4,
locktag->locktag_type,
lockmode);
/*
* NOTE: do not do any material change of state between here and
* return. All required changes in locktable state must have been
* done when the lock was granted to us --- see notes in WaitOnLock.
*/
/*
* Check the proclock entry status, in case something in the ipc
* communication doesn't work correctly.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
LWLockRelease(partitionLock);
elog(ERROR, "LockAcquire failed");
}
PROCLOCK_PRINT("LockAcquire: granted", proclock);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
LWLockRelease(partitionLock);
/*
* Emit a WAL record if acquisition of this lock need to be replayed in a
* standby server.
*/
if (log_lock)
{
/*
* Decode the locktag back to the original values, to avoid sending
* lots of empty bytes with every message. See lock.h to check how a
* locktag is defined for LOCKTAG_RELATION
*/
LogAccessExclusiveLock(locktag->locktag_field1,
locktag->locktag_field2);
}
return LOCKACQUIRE_OK;
}
/*
* Find or create LOCK and PROCLOCK objects as needed for a new lock
* request.
*/
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
uint32 proclock_hashcode;
bool found;
/*
* Find or create a lock with this tag.
*
......@@ -623,17 +943,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
HASH_ENTER_NULL,
&found);
if (!lock)
{
LWLockRelease(partitionLock);
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
else
return LOCKACQUIRE_NOT_AVAIL;
}
locallock->lock = lock;
return NULL;
/*
* if it's a new lock object, initialize it
......@@ -662,7 +972,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
* Create the hash key for the proclock table.
*/
proclocktag.myLock = lock;
proclocktag.myProc = MyProc;
proclocktag.myProc = proc;
proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
......@@ -693,27 +1003,21 @@ LockAcquireExtended(const LOCKTAG *locktag,
NULL))
elog(PANIC, "lock table corrupted");
}
LWLockRelease(partitionLock);
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
else
return LOCKACQUIRE_NOT_AVAIL;
return NULL;
}
locallock->proclock = proclock;
/*
* If new, initialize the new entry
*/
if (!found)
{
uint32 partition = LockHashPartition(hashcode);
proclock->holdMask = 0;
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]),
SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
&proclock->procLink);
PROCLOCK_PRINT("LockAcquire: new", proclock);
}
......@@ -779,130 +1083,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
lock->tag.locktag_field1, lock->tag.locktag_field2,
lock->tag.locktag_field3);
/*
* If lock requested conflicts with locks requested by waiters, must join
* wait queue. Otherwise, check for conflict with already-held locks.
* (That's last because most complex check.)
*/
if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
status = STATUS_FOUND;
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock, MyProc);
if (status == STATUS_OK)
{
/* No conflict with held or previously requested locks */
GrantLock(lock, proclock, lockmode);
GrantLockLocal(locallock, owner);
}
else
{
Assert(status == STATUS_FOUND);
/*
* We can't acquire the lock immediately. If caller specified no
* blocking, remove useless table entries and return NOT_AVAIL without
* waiting.
*/
if (dontWait)
{
if (proclock->holdMask == 0)
{
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
if (!hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &(proclock->tag),
proclock_hashcode,
HASH_REMOVE,
NULL))
elog(PANIC, "proclock table corrupted");
}
else
PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
lock->nRequested--;
lock->requested[lockmode]--;
LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
return LOCKACQUIRE_NOT_AVAIL;
}
/*
* In Hot Standby perform early deadlock detection in normal backends.
* If deadlock found we release partition lock but do not return.
*/
if (RecoveryInProgress() && !InRecovery)
CheckRecoveryConflictDeadlock(partitionLock);
/*
* Set bitmask of locks this process already holds on this object.
*/
MyProc->heldLocks = proclock->holdMask;
/*
* Sleep till someone wakes me up.
*/
TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
locktag->locktag_field2,
locktag->locktag_field3,
locktag->locktag_field4,
locktag->locktag_type,
lockmode);
WaitOnLock(locallock, owner);
TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
locktag->locktag_field2,
locktag->locktag_field3,
locktag->locktag_field4,
locktag->locktag_type,
lockmode);
/*
* NOTE: do not do any material change of state between here and
* return. All required changes in locktable state must have been
* done when the lock was granted to us --- see notes in WaitOnLock.
*/
/*
* Check the proclock entry status, in case something in the ipc
* communication doesn't work correctly.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
LWLockRelease(partitionLock);
elog(ERROR, "LockAcquire failed");
}
PROCLOCK_PRINT("LockAcquire: granted", proclock);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
LWLockRelease(partitionLock);
/*
* Emit a WAL record if acquisition of this lock need to be replayed in a
* standby server.
*/
if (log_lock)
{
/*
* Decode the locktag back to the original values, to avoid sending
* lots of empty bytes with every message. See lock.h to check how a
* locktag is defined for LOCKTAG_RELATION
*/
LogAccessExclusiveLock(locktag->locktag_field1,
locktag->locktag_field2);
}
return LOCKACQUIRE_OK;
return proclock;
}
/*
......@@ -913,6 +1094,17 @@ RemoveLocalLock(LOCALLOCK *locallock)
{
pfree(locallock->lockOwners);
locallock->lockOwners = NULL;
if (locallock->holdsStrongLockCount)
{
uint32 fasthashcode;
fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
SpinLockAcquire(&FastPathStrongLocks->mutex);
Assert(FastPathStrongLocks->count[fasthashcode] > 0);
FastPathStrongLocks->count[fasthashcode]--;
locallock->holdsStrongLockCount = FALSE;
SpinLockRelease(&FastPathStrongLocks->mutex);
}
if (!hash_search(LockMethodLocalHash,
(void *) &(locallock->tag),
HASH_REMOVE, NULL))
......@@ -1439,6 +1631,26 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return TRUE;
/* Locks that participate in the fast path require special handling. */
if (FastPathTag(locktag) && FastPathWeakMode(lockmode)
&& FastPathLocalUseCount > 0)
{
bool released;
/*
* We might not find the lock here, even if we originally entered
* it here. Another backend may have moved it to the main table.
*/
LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
released = FastPathUnGrantLock(locktag->locktag_field2, lockmode);
LWLockRelease(MyProc->backendLock);
if (released)
{
RemoveLocalLock(locallock);
return TRUE;
}
}
/*
* Otherwise we've got to mess with the shared lock table.
*/
......@@ -1447,11 +1659,34 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* We don't need to re-find the lock or proclock, since we kept their
* addresses in the locallock table, and they couldn't have been removed
* while we were holding a lock on them.
* Normally, we don't need to re-find the lock or proclock, since we kept
* their addresses in the locallock table, and they couldn't have been
* removed while we were holding a lock on them. But it's possible that
* the locks have been moved to the main hash table by another backend, in
* which case we might need to go look them up after all.
*/
lock = locallock->lock;
if (!lock)
{
PROCLOCKTAG proclocktag;
bool found;
Assert(FastPathTag(locktag) && FastPathWeakMode(lockmode));
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(void *) locktag,
locallock->hashcode,
HASH_FIND,
&found);
Assert(found && lock != NULL);
locallock->lock = lock;
proclocktag.myLock = lock;
proclocktag.myProc = MyProc;
locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
(void *) &proclocktag,
HASH_FIND, &found);
Assert(found);
}
LOCK_PRINT("LockRelease: found", lock, lockmode);
proclock = locallock->proclock;
PROCLOCK_PRINT("LockRelease: found", proclock);
......@@ -1529,6 +1764,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
LOCK *lock;
PROCLOCK *proclock;
int partition;
bool have_fast_path_lwlock = false;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
......@@ -1554,11 +1790,64 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
if (locallock->proclock == NULL || locallock->lock == NULL)
{
LOCKMODE lockmode = locallock->tag.mode;
Oid relid;
/*
* We must've run out of shared memory while trying to set up this
* lock. Just forget the local entry.
* If the LOCALLOCK entry is unused, we must've run out of shared
* memory while trying to set up this lock. Just forget the local
* entry.
*/
Assert(locallock->nLocks == 0);
if (locallock->nLocks == 0)
{
RemoveLocalLock(locallock);
continue;
}
/*
* Otherwise, we should be dealing with a lock acquired via the
* fast-path. If not, we've got trouble.
*/
if (!FastPathTag(&locallock->tag.lock)
|| !FastPathWeakMode(lockmode))
elog(PANIC, "locallock table corrupted");
/*
* If we don't currently hold the LWLock that protects our
* fast-path data structures, we must acquire it before
* attempting to release the lock via the fast-path.
*/
if (!have_fast_path_lwlock)
{
LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
have_fast_path_lwlock = true;
}
/* Attempt fast-path release. */
relid = locallock->tag.lock.locktag_field2;
if (FastPathUnGrantLock(relid, lockmode))
{
RemoveLocalLock(locallock);
continue;
}
/*
* Our lock, originally taken via the fast path, has been
* transferred to the main lock table. That's going to require
* some extra work, so release our fast-path lock before starting.
*/
LWLockRelease(MyProc->backendLock);
have_fast_path_lwlock = false;
/*
* Now dump the lock. We haven't got a pointer to the LOCK or
* PROCLOCK in this case, so we have to handle this a bit
* differently than a normal lock release. Unfortunately, this
* requires an extra LWLock acquire-and-release cycle on the
* partitionLock, but hopefully it shouldn't happen often.
*/
LockRefindAndRelease(lockMethodTable, MyProc,
&locallock->tag.lock, lockmode, false);
RemoveLocalLock(locallock);
continue;
}
......@@ -1606,6 +1895,9 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
RemoveLocalLock(locallock);
}
if (have_fast_path_lwlock)
LWLockRelease(MyProc->backendLock);
/*
* Now, scan each lock partition separately.
*/
......@@ -1807,23 +2099,252 @@ LockReassignCurrentOwner(void)
if (ic < 0)
continue; /* no current locks */
if (ip < 0)
{
/* Parent has no slot, so just give it child's slot */
lockOwners[ic].owner = parent;
}
else
if (ip < 0)
{
/* Parent has no slot, so just give it child's slot */
lockOwners[ic].owner = parent;
}
else
{
/* Merge child's count with parent's */
lockOwners[ip].nLocks += lockOwners[ic].nLocks;
/* compact out unused slot */
locallock->numLockOwners--;
if (ic < locallock->numLockOwners)
lockOwners[ic] = lockOwners[locallock->numLockOwners];
}
}
}
/*
* FastPathGrantLock
* Grant lock using per-backend fast-path array, if there is space.
*/
static bool
FastPathGrantLock(Oid relid, LOCKMODE lockmode)
{
uint32 f;
uint32 unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
/* Scan for existing entry for this relid, remembering empty slot. */
for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
{
if (FAST_PATH_GET_BITS(MyProc, f) == 0)
unused_slot = f;
else if (MyProc->fpRelId[f] == relid)
{
Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
return true;
}
}
/* If no existing entry, use any empty slot. */
if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
{
MyProc->fpRelId[unused_slot] = relid;
FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
++FastPathLocalUseCount;
return true;
}
/* No existing entry, and no empty slot. */
return false;
}
/*
* FastPathUnGrantLock
* Release fast-path lock, if present. Update backend-private local
* use count, while we're at it.
*/
static bool
FastPathUnGrantLock(Oid relid, LOCKMODE lockmode)
{
uint32 f;
bool result = false;
FastPathLocalUseCount = 0;
for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
{
if (MyProc->fpRelId[f] == relid
&& FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
{
Assert(!result);
FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
result = true;
}
if (FAST_PATH_GET_BITS(MyProc, f) != 0)
++FastPathLocalUseCount;
}
return result;
}
/*
* FastPathTransferLocks
* Transfer locks matching the given lock tag from per-backend fast-path
* arrays to the shared hash table.
*/
static bool
FastPathTransferLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
uint32 hashcode)
{
LWLockId partitionLock = LockHashPartitionLock(hashcode);
Oid relid = locktag->locktag_field2;
uint32 i;
/*
* Every PGPROC that can potentially hold a fast-path lock is present
* in ProcGlobal->allProcs. Prepared transactions are not, but
* any outstanding fast-path locks held by prepared transactions are
* transferred to the main lock table.
*/
for (i = 0; i < ProcGlobal->allProcCount; i++)
{
PGPROC *proc = &ProcGlobal->allProcs[i];
uint32 f;
LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);
/*
* If the target backend isn't referencing the same database as we are,
* then we needn't examine the individual relation IDs at all; none of
* them can be relevant.
*
* proc->databaseId is set at backend startup time and never changes
* thereafter, so it might be safe to perform this test before
* acquiring proc->backendLock. In particular, it's certainly safe to
* assume that if the target backend holds any fast-path locks, it must
* have performed a memory-fencing operation (in particular, an LWLock
* acquisition) since setting proc->databaseId. However, it's less
* clear that our backend is certain to have performed a memory fencing
* operation since the other backend set proc->databaseId. So for now,
* we test it after acquiring the LWLock just to be safe.
*/
if (proc->databaseId != MyDatabaseId)
{
LWLockRelease(proc->backendLock);
continue;
}
for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
{
uint32 lockmode;
/* Look for an allocated slot matching the given relid. */
if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
continue;
/* Find or create lock object. */
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
lockmode < FAST_PATH_LOCKNUMBER_OFFSET+FAST_PATH_BITS_PER_SLOT;
++lockmode)
{
PROCLOCK *proclock;
if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
continue;
proclock = SetupLockInTable(lockMethodTable, proc, locktag,
hashcode, lockmode);
if (!proclock)
{
LWLockRelease(partitionLock);
return false;
}
GrantLock(proclock->tag.myLock, proclock, lockmode);
FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
}
LWLockRelease(partitionLock);
}
LWLockRelease(proc->backendLock);
}
return true;
}
/*
* FastPathGetLockEntry
* Return the PROCLOCK for a lock originally taken via the fast-path,
* transferring it to the primary lock table if necessary.
*/
static PROCLOCK *
FastPathGetLockEntry(LOCALLOCK *locallock)
{
LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
LOCKTAG *locktag = &locallock->tag.lock;
PROCLOCK *proclock = NULL;
LWLockId partitionLock = LockHashPartitionLock(locallock->hashcode);
Oid relid = locktag->locktag_field2;
uint32 f;
LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
{
uint32 lockmode;
/* Look for an allocated slot matching the given relid. */
if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
continue;
/* If we don't have a lock of the given mode, forget it! */
lockmode = locallock->tag.mode;
if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
break;
/* Find or create lock object. */
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
locallock->hashcode, lockmode);
if (!proclock)
{
/* Merge child's count with parent's */
lockOwners[ip].nLocks += lockOwners[ic].nLocks;
/* compact out unused slot */
locallock->numLockOwners--;
if (ic < locallock->numLockOwners)
lockOwners[ic] = lockOwners[locallock->numLockOwners];
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
}
GrantLock(proclock->tag.myLock, proclock, lockmode);
FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
LWLockRelease(partitionLock);
}
LWLockRelease(MyProc->backendLock);
/* Lock may have already been transferred by some other backend. */
if (proclock == NULL)
{
LOCK *lock;
PROCLOCKTAG proclocktag;
uint32 proclock_hashcode;
LWLockAcquire(partitionLock, LW_SHARED);
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(void *) locktag,
locallock->hashcode,
HASH_FIND,
NULL);
if (!lock)
elog(ERROR, "failed to re-find shared lock object");
proclocktag.myLock = lock;
proclocktag.myProc = MyProc;
proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
proclock = (PROCLOCK *)
hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &proclocktag,
proclock_hashcode,
HASH_FIND,
NULL);
if (!proclock)
elog(ERROR, "failed to re-find shared proclock object");
LWLockRelease(partitionLock);
}
}
return proclock;
}
/*
* GetLockConflicts
......@@ -1854,6 +2375,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
uint32 hashcode;
LWLockId partitionLock;
int count = 0;
int fast_count = 0;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
......@@ -1877,12 +2399,100 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
vxids = (VirtualTransactionId *)
palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
/*
* Look up the lock object matching the tag.
*/
/* Compute hash code and partiton lock, and look up conflicting modes. */
hashcode = LockTagHashCode(locktag);
partitionLock = LockHashPartitionLock(hashcode);
conflictMask = lockMethodTable->conflictTab[lockmode];
/*
* Fast path locks might not have been entered in the primary lock table.
* But only strong locks can conflict with anything that might have been
* taken via the fast-path mechanism.
*/
if (FastPathTag(locktag) && FastPathStrongMode(lockmode))
{
int i;
Oid relid = locktag->locktag_field2;
VirtualTransactionId vxid;
/*
* Iterate over relevant PGPROCs. Anything held by a prepared
* transaction will have been transferred to the primary lock table,
* so we need not worry about those. This is all a bit fuzzy,
* because new locks could be taken after we've visited a particular
* partition, but the callers had better be prepared to deal with
* that anyway, since the locks could equally well be taken between the
* time we return the value and the time the caller does something
* with it.
*/
for (i = 0; i < ProcGlobal->allProcCount; i++)
{
PGPROC *proc = &ProcGlobal->allProcs[i];
uint32 f;
/* A backend never blocks itself */
if (proc == MyProc)
continue;
LWLockAcquire(proc->backendLock, LW_SHARED);
/*
* If the target backend isn't referencing the same database as we
* are, then we needn't examine the individual relation IDs at all;
* none of them can be relevant.
*
* See FastPathTransferLocks() for discussion of why we do this
* test after acquiring the lock.
*/
if (proc->databaseId != MyDatabaseId)
{
LWLockRelease(proc->backendLock);
continue;
}
for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
{
uint32 lockmask;
/* Look for an allocated slot matching the given relid. */
if (relid != proc->fpRelId[f])
continue;
lockmask = FAST_PATH_GET_BITS(proc, f);
if (!lockmask)
continue;
lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
/*
* There can only be one entry per relation, so if we found
* it and it doesn't conflict, we can skip the rest of the
* slots.
*/
if ((lockmask & conflictMask) == 0)
break;
/* Conflict! */
GET_VXID_FROM_PGPROC(vxid, *proc);
/*
* If we see an invalid VXID, then either the xact has already
* committed (or aborted), or it's a prepared xact. In either
* case we may ignore it.
*/
if (VirtualTransactionIdIsValid(vxid))
vxids[count++] = vxid;
break;
}
LWLockRelease(proc->backendLock);
}
}
/* Remember how many fast-path conflicts we found. */
fast_count = count;
/*
* Look up the lock object matching the tag.
*/
LWLockAcquire(partitionLock, LW_SHARED);
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
......@@ -1903,7 +2513,6 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
/*
* Examine each existing holder (or awaiter) of the lock.
*/
conflictMask = lockMethodTable->conflictTab[lockmode];
procLocks = &(lock->procLocks);
......@@ -1929,7 +2538,16 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
* case we may ignore it.
*/
if (VirtualTransactionIdIsValid(vxid))
vxids[count++] = vxid;
{
int i;
/* Avoid duplicate entries. */
for (i = 0; i < fast_count; ++i)
if (VirtualTransactionIdEquals(vxids[i], vxid))
break;
if (i >= fast_count)
vxids[count++] = vxid;
}
}
}
......@@ -1945,6 +2563,98 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
return vxids;
}
/*
* Find a lock in the shared lock table and release it. It is the caller's
* responsibility to verify that this is a sane thing to do. (For example, it
* would be bad to release a lock here if there might still be a LOCALLOCK
* object with pointers to it.)
*
* We currently use this in two situations: first, to release locks held by
* prepared transactions on commit (see lock_twophase_postcommit); and second,
* to release locks taken via the fast-path, transferred to the main hash
* table, and then released (see LockReleaseAll).
*/
static void
LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
LOCKTAG *locktag, LOCKMODE lockmode,
bool decrement_strong_lock_count)
{
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
uint32 hashcode;
uint32 proclock_hashcode;
LWLockId partitionLock;
bool wakeupNeeded;
hashcode = LockTagHashCode(locktag);
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Re-find the lock object (it had better be there).
*/
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(void *) locktag,
hashcode,
HASH_FIND,
NULL);
if (!lock)
elog(PANIC, "failed to re-find shared lock object");
/*
* Re-find the proclock object (ditto).
*/
proclocktag.myLock = lock;
proclocktag.myProc = proc;
proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &proclocktag,
proclock_hashcode,
HASH_FIND,
NULL);
if (!proclock)
elog(PANIC, "failed to re-find shared proclock object");
/*
* Double-check that we are actually holding a lock of the type we want to
* release.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
LWLockRelease(partitionLock);
elog(WARNING, "you don't own a lock of type %s",
lockMethodTable->lockModeNames[lockmode]);
return;
}
/*
* Do the releasing. CleanUpLock will waken any now-wakable waiters.
*/
wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
CleanUpLock(lock, proclock,
lockMethodTable, hashcode,
wakeupNeeded);
LWLockRelease(partitionLock);
/*
* Decrement strong lock count. This logic is needed only for 2PC.
*/
if (decrement_strong_lock_count
&& FastPathTag(&lock->tag) && FastPathStrongMode(lockmode))
{
uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
SpinLockAcquire(&FastPathStrongLocks->mutex);
FastPathStrongLocks->count[fasthashcode]--;
SpinLockRelease(&FastPathStrongLocks->mutex);
}
}
/*
* AtPrepare_Locks
......@@ -1966,8 +2676,10 @@ AtPrepare_Locks(void)
LOCALLOCK *locallock;
/*
* We don't need to touch shared memory for this --- all the necessary
* state information is in the locallock table.
* For the most part, we don't need to touch shared memory for this ---
* all the necessary state information is in the locallock table.
* Fast-path locks are an exception, however: we move any such locks
* to the main table before allowing PREPARE TRANSACTION to succeed.
*/
hash_seq_init(&status, LockMethodLocalHash);
......@@ -2000,6 +2712,24 @@ AtPrepare_Locks(void)
elog(ERROR, "cannot PREPARE when session locks exist");
}
/*
* If the local lock was taken via the fast-path, we need to move it
* to the primary lock table, or just get a pointer to the existing
* primary lock table if by chance it's already been transferred.
*/
if (locallock->proclock == NULL)
{
locallock->proclock = FastPathGetLockEntry(locallock);
locallock->lock = locallock->proclock->tag.myLock;
}
/*
* Arrange not to release any strong lock count held by this lock
* entry. We must retain the count until the prepared transaction
* is committed or rolled back.
*/
locallock->holdsStrongLockCount = FALSE;
/*
* Create a 2PC record.
*/
......@@ -2257,10 +2987,62 @@ GetLockStatusData(void)
data = (LockData *) palloc(sizeof(LockData));
/* Guess how much space we'll need. */
els = MaxBackends;
el = 0;
data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
/*
* First, we iterate through the per-backend fast-path arrays, locking
* them one at a time. This might produce an inconsistent picture of the
* system state, but taking all of those LWLocks at the same time seems
* impractical (in particular, note MAX_SIMUL_LWLOCKS). It shouldn't
* matter too much, because none of these locks can be involved in lock
* conflicts anyway - anything that might must be present in the main
* lock table.
*/
for (i = 0; i < ProcGlobal->allProcCount; ++i)
{
PGPROC *proc = &ProcGlobal->allProcs[i];
uint32 f;
LWLockAcquire(proc->backendLock, LW_SHARED);
for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
{
LockInstanceData *instance = &data->locks[el];
uint32 lockbits = FAST_PATH_GET_BITS(proc, f);
/* Skip unallocated slots. */
if (!lockbits)
continue;
if (el >= els)
{
els += MaxBackends;
data->locks = (LockInstanceData *)
repalloc(data->locks, sizeof(LockInstanceData) * els);
}
SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
proc->fpRelId[f]);
instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
instance->waitLockMode = NoLock;
instance->backend = proc->backendId;
instance->lxid = proc->lxid;
instance->pid = proc->pid;
instance->fastpath = true;
el++;
}
LWLockRelease(proc->backendLock);
}
/*
* Acquire lock on the entire shared lock data structure. We can't
* operate one partition at a time if we want to deliver a self-consistent
* view of the state.
* Next, acquire lock on the entire shared lock data structure. We do
* this so that, at least for locks in the primary lock table, the state
* will be self-consistent.
*
* Since this is a read-only operation, we take shared instead of
* exclusive lock. There's not a whole lot of point to this, because all
......@@ -2274,25 +3056,33 @@ GetLockStatusData(void)
LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
/* Now we can safely count the number of proclocks */
els = hash_get_num_entries(LockMethodProcLockHash);
data->nelements = els;
data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
data->locks = (LOCK *) palloc(sizeof(LOCK) * els);
data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
if (data->nelements > els)
{
els = data->nelements;
data->locks = (LockInstanceData *)
repalloc(data->locks, sizeof(LockInstanceData) * els);
}
/* Now scan the tables to copy the data */
hash_seq_init(&seqstat, LockMethodProcLockHash);
el = 0;
while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
{
PGPROC *proc = proclock->tag.myProc;
LOCK *lock = proclock->tag.myLock;
LockInstanceData *instance = &data->locks[el];
memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
memcpy(&(data->locks[el]), lock, sizeof(LOCK));
memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
instance->holdMask = proclock->holdMask;
if (proc->waitLock == proclock->tag.myLock)
instance->waitLockMode = proc->waitLockMode;
else
instance->waitLockMode = NoLock;
instance->backend = proc->backendId;
instance->lxid = proc->lxid;
instance->pid = proc->pid;
instance->fastpath = false;
el++;
}
......@@ -2658,6 +3448,18 @@ lock_twophase_recover(TransactionId xid, uint16 info,
*/
GrantLock(lock, proclock, lockmode);
/*
* Bump strong lock count, to make sure any fast-path lock requests won't
* be granted without consulting the primary lock table.
*/
if (FastPathTag(&lock->tag) && FastPathStrongMode(lockmode))
{
uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
SpinLockAcquire(&FastPathStrongLocks->mutex);
FastPathStrongLocks->count[fasthashcode]++;
SpinLockRelease(&FastPathStrongLocks->mutex);
}
LWLockRelease(partitionLock);
}
......@@ -2704,81 +3506,18 @@ lock_twophase_postcommit(TransactionId xid, uint16 info,
TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
PGPROC *proc = TwoPhaseGetDummyProc(xid);
LOCKTAG *locktag;
LOCKMODE lockmode;
LOCKMETHODID lockmethodid;
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
uint32 hashcode;
uint32 proclock_hashcode;
LWLockId partitionLock;
LockMethod lockMethodTable;
bool wakeupNeeded;
Assert(len == sizeof(TwoPhaseLockRecord));
locktag = &rec->locktag;
lockmode = rec->lockmode;
lockmethodid = locktag->locktag_lockmethodid;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
lockMethodTable = LockMethods[lockmethodid];
hashcode = LockTagHashCode(locktag);
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Re-find the lock object (it had better be there).
*/
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(void *) locktag,
hashcode,
HASH_FIND,
NULL);
if (!lock)
elog(PANIC, "failed to re-find shared lock object");
/*
* Re-find the proclock object (ditto).
*/
proclocktag.myLock = lock;
proclocktag.myProc = proc;
proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &proclocktag,
proclock_hashcode,
HASH_FIND,
NULL);
if (!proclock)
elog(PANIC, "failed to re-find shared proclock object");
/*
* Double-check that we are actually holding a lock of the type we want to
* release.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
LWLockRelease(partitionLock);
elog(WARNING, "you don't own a lock of type %s",
lockMethodTable->lockModeNames[lockmode]);
return;
}
/*
* Do the releasing. CleanUpLock will waken any now-wakable waiters.
*/
wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
CleanUpLock(lock, proclock,
lockMethodTable, hashcode,
wakeupNeeded);
LWLockRelease(partitionLock);
LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
}
/*
......
......@@ -167,6 +167,9 @@ NumLWLocks(void)
/* bufmgr.c needs two for each shared buffer */
numLocks += 2 * NBuffers;
/* lock.c needs one per backend */
numLocks += MaxBackends;
/* clog.c needs one per CLOG buffer */
numLocks += NUM_CLOG_BUFFERS;
......
......@@ -67,7 +67,7 @@ PGPROC *MyProc = NULL;
NON_EXEC_STATIC slock_t *ProcStructLock = NULL;
/* Pointers to shared-memory structures */
NON_EXEC_STATIC PROC_HDR *ProcGlobal = NULL;
PROC_HDR *ProcGlobal = NULL;
NON_EXEC_STATIC PGPROC *AuxiliaryProcs = NULL;
/* If we are waiting for a lock, this points to the associated LOCALLOCK */
......@@ -183,6 +183,8 @@ InitProcGlobal(void)
* one of these purposes, and they do not move between groups.
*/
procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
ProcGlobal->allProcs = procs;
ProcGlobal->allProcCount = TotalProcs;
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
......@@ -192,6 +194,7 @@ InitProcGlobal(void)
{
/* Common initialization for all PGPROCs, regardless of type. */
PGSemaphoreCreate(&(procs[i].sem));
procs[i].backendLock = LWLockAssign();
InitSharedLatch(&procs[i].waitLatch);
/*
......
......@@ -49,6 +49,8 @@ typedef struct
int predLockIdx; /* current index for pred lock */
} PG_Lock_Status;
/* Number of columns in pg_locks output */
#define NUM_LOCK_STATUS_COLUMNS 15
/*
* VXIDGetDatum - Construct a text representation of a VXID
......@@ -96,7 +98,7 @@ pg_lock_status(PG_FUNCTION_ARGS)
/* build tupdesc for result tuples */
/* this had better match pg_locks view in system_views.sql */
tupdesc = CreateTemplateTupleDesc(14, false);
tupdesc = CreateTemplateTupleDesc(NUM_LOCK_STATUS_COLUMNS, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "locktype",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "database",
......@@ -125,6 +127,8 @@ pg_lock_status(PG_FUNCTION_ARGS)
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 14, "granted",
BOOLOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 15, "fastpath",
BOOLOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
......@@ -149,21 +153,17 @@ pg_lock_status(PG_FUNCTION_ARGS)
while (mystatus->currIdx < lockData->nelements)
{
PROCLOCK *proclock;
LOCK *lock;
PGPROC *proc;
bool granted;
LOCKMODE mode = 0;
const char *locktypename;
char tnbuf[32];
Datum values[14];
bool nulls[14];
Datum values[NUM_LOCK_STATUS_COLUMNS];
bool nulls[NUM_LOCK_STATUS_COLUMNS];
HeapTuple tuple;
Datum result;
LockInstanceData *instance;
proclock = &(lockData->proclocks[mystatus->currIdx]);
lock = &(lockData->locks[mystatus->currIdx]);
proc = &(lockData->procs[mystatus->currIdx]);
instance = &(lockData->locks[mystatus->currIdx]);
/*
* Look to see if there are any held lock modes in this PROCLOCK. If
......@@ -171,14 +171,14 @@ pg_lock_status(PG_FUNCTION_ARGS)
* again.
*/
granted = false;
if (proclock->holdMask)
if (instance->holdMask)
{
for (mode = 0; mode < MAX_LOCKMODES; mode++)
{
if (proclock->holdMask & LOCKBIT_ON(mode))
if (instance->holdMask & LOCKBIT_ON(mode))
{
granted = true;
proclock->holdMask &= LOCKBIT_OFF(mode);
instance->holdMask &= LOCKBIT_OFF(mode);
break;
}
}
......@@ -190,10 +190,10 @@ pg_lock_status(PG_FUNCTION_ARGS)
*/
if (!granted)
{
if (proc->waitLock == proclock->tag.myLock)
if (instance->waitLockMode != NoLock)
{
/* Yes, so report it with proper mode */
mode = proc->waitLockMode;
mode = instance->waitLockMode;
/*
* We are now done with this PROCLOCK, so advance pointer to
......@@ -218,22 +218,22 @@ pg_lock_status(PG_FUNCTION_ARGS)
MemSet(values, 0, sizeof(values));
MemSet(nulls, false, sizeof(nulls));
if (lock->tag.locktag_type <= LOCKTAG_LAST_TYPE)
locktypename = LockTagTypeNames[lock->tag.locktag_type];
if (instance->locktag.locktag_type <= LOCKTAG_LAST_TYPE)
locktypename = LockTagTypeNames[instance->locktag.locktag_type];
else
{
snprintf(tnbuf, sizeof(tnbuf), "unknown %d",
(int) lock->tag.locktag_type);
(int) instance->locktag.locktag_type);
locktypename = tnbuf;
}
values[0] = CStringGetTextDatum(locktypename);
switch ((LockTagType) lock->tag.locktag_type)
switch ((LockTagType) instance->locktag.locktag_type)
{
case LOCKTAG_RELATION:
case LOCKTAG_RELATION_EXTEND:
values[1] = ObjectIdGetDatum(lock->tag.locktag_field1);
values[2] = ObjectIdGetDatum(lock->tag.locktag_field2);
values[1] = ObjectIdGetDatum(instance->locktag.locktag_field1);
values[2] = ObjectIdGetDatum(instance->locktag.locktag_field2);
nulls[3] = true;
nulls[4] = true;
nulls[5] = true;
......@@ -243,9 +243,9 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[9] = true;
break;
case LOCKTAG_PAGE:
values[1] = ObjectIdGetDatum(lock->tag.locktag_field1);
values[2] = ObjectIdGetDatum(lock->tag.locktag_field2);
values[3] = UInt32GetDatum(lock->tag.locktag_field3);
values[1] = ObjectIdGetDatum(instance->locktag.locktag_field1);
values[2] = ObjectIdGetDatum(instance->locktag.locktag_field2);
values[3] = UInt32GetDatum(instance->locktag.locktag_field3);
nulls[4] = true;
nulls[5] = true;
nulls[6] = true;
......@@ -254,10 +254,10 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[9] = true;
break;
case LOCKTAG_TUPLE:
values[1] = ObjectIdGetDatum(lock->tag.locktag_field1);
values[2] = ObjectIdGetDatum(lock->tag.locktag_field2);
values[3] = UInt32GetDatum(lock->tag.locktag_field3);
values[4] = UInt16GetDatum(lock->tag.locktag_field4);
values[1] = ObjectIdGetDatum(instance->locktag.locktag_field1);
values[2] = ObjectIdGetDatum(instance->locktag.locktag_field2);
values[3] = UInt32GetDatum(instance->locktag.locktag_field3);
values[4] = UInt16GetDatum(instance->locktag.locktag_field4);
nulls[5] = true;
nulls[6] = true;
nulls[7] = true;
......@@ -265,7 +265,8 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[9] = true;
break;
case LOCKTAG_TRANSACTION:
values[6] = TransactionIdGetDatum(lock->tag.locktag_field1);
values[6] =
TransactionIdGetDatum(instance->locktag.locktag_field1);
nulls[1] = true;
nulls[2] = true;
nulls[3] = true;
......@@ -276,8 +277,8 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[9] = true;
break;
case LOCKTAG_VIRTUALTRANSACTION:
values[5] = VXIDGetDatum(lock->tag.locktag_field1,
lock->tag.locktag_field2);
values[5] = VXIDGetDatum(instance->locktag.locktag_field1,
instance->locktag.locktag_field2);
nulls[1] = true;
nulls[2] = true;
nulls[3] = true;
......@@ -291,10 +292,10 @@ pg_lock_status(PG_FUNCTION_ARGS)
case LOCKTAG_USERLOCK:
case LOCKTAG_ADVISORY:
default: /* treat unknown locktags like OBJECT */
values[1] = ObjectIdGetDatum(lock->tag.locktag_field1);
values[7] = ObjectIdGetDatum(lock->tag.locktag_field2);
values[8] = ObjectIdGetDatum(lock->tag.locktag_field3);
values[9] = Int16GetDatum(lock->tag.locktag_field4);
values[1] = ObjectIdGetDatum(instance->locktag.locktag_field1);
values[7] = ObjectIdGetDatum(instance->locktag.locktag_field2);
values[8] = ObjectIdGetDatum(instance->locktag.locktag_field3);
values[9] = Int16GetDatum(instance->locktag.locktag_field4);
nulls[2] = true;
nulls[3] = true;
nulls[4] = true;
......@@ -303,13 +304,14 @@ pg_lock_status(PG_FUNCTION_ARGS)
break;
}
values[10] = VXIDGetDatum(proc->backendId, proc->lxid);
if (proc->pid != 0)
values[11] = Int32GetDatum(proc->pid);
values[10] = VXIDGetDatum(instance->backend, instance->lxid);
if (instance->pid != 0)
values[11] = Int32GetDatum(instance->pid);
else
nulls[11] = true;
values[12] = CStringGetTextDatum(GetLockmodeName(LOCK_LOCKMETHOD(*lock), mode));
values[12] = CStringGetTextDatum(GetLockmodeName(instance->locktag.locktag_lockmethodid, mode));
values[13] = BoolGetDatum(granted);
values[14] = BoolGetDatum(instance->fastpath);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
......@@ -327,8 +329,8 @@ pg_lock_status(PG_FUNCTION_ARGS)
PREDICATELOCKTARGETTAG *predTag = &(predLockData->locktags[mystatus->predLockIdx]);
SERIALIZABLEXACT *xact = &(predLockData->xacts[mystatus->predLockIdx]);
Datum values[14];
bool nulls[14];
Datum values[NUM_LOCK_STATUS_COLUMNS];
bool nulls[NUM_LOCK_STATUS_COLUMNS];
HeapTuple tuple;
Datum result;
......@@ -374,11 +376,12 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[11] = true;
/*
* Lock mode. Currently all predicate locks are SIReadLocks, which are
* always held (never waiting)
* Lock mode. Currently all predicate locks are SIReadLocks, which
* are always held (never waiting) and have no fast path
*/
values[12] = CStringGetTextDatum("SIReadLock");
values[13] = BoolGetDatum(true);
values[14] = BoolGetDatum(false);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
......
......@@ -2811,7 +2811,7 @@ DATA(insert OID = 2078 ( set_config PGNSP PGUID 12 1 0 0 0 f f f f f v 3 0 25
DESCR("SET X as a function");
DATA(insert OID = 2084 ( pg_show_all_settings PGNSP PGUID 12 1 1000 0 0 f f f t t s 0 0 2249 "" "{25,25,25,25,25,25,25,25,25,25,25,1009,25,25,25,23}" "{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{name,setting,unit,category,short_desc,extra_desc,context,vartype,source,min_val,max_val,enumvals,boot_val,reset_val,sourcefile,sourceline}" _null_ show_all_settings _null_ _null_ _null_ ));
DESCR("SHOW ALL as a function");
DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 1 1000 0 0 f f f t t v 0 0 2249 "" "{25,26,26,23,21,25,28,26,26,21,25,23,25,16}" "{o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{locktype,database,relation,page,tuple,virtualxid,transactionid,classid,objid,objsubid,virtualtransaction,pid,mode,granted}" _null_ pg_lock_status _null_ _null_ _null_ ));
DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 1 1000 0 0 f f f t t v 0 0 2249 "" "{25,26,26,23,21,25,28,26,26,21,25,23,25,16,16}" "{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{locktype,database,relation,page,tuple,virtualxid,transactionid,classid,objid,objsubid,virtualtransaction,pid,mode,granted,fastpath}" _null_ pg_lock_status _null_ _null_ _null_ ));
DESCR("view system lock information");
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f t t v 0 0 2249 "" "{28,25,1184,26,26}" "{o,o,o,o,o}" "{transaction,gid,prepared,ownerid,dbid}" _null_ pg_prepared_xact _null_ _null_ _null_ ));
DESCR("view two-phase transactions");
......
......@@ -412,6 +412,7 @@ typedef struct LOCALLOCK
int64 nLocks; /* total number of times lock is held */
int numLockOwners; /* # of relevant ResourceOwners */
int maxLockOwners; /* allocated size of array */
bool holdsStrongLockCount; /* did we bump FastPathStrongLocks? */
LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
} LOCALLOCK;
......@@ -419,19 +420,25 @@ typedef struct LOCALLOCK
/*
* This struct holds information passed from lmgr internals to the lock
* listing user-level functions (in lockfuncs.c). For each PROCLOCK in
* the system, copies of the PROCLOCK object and associated PGPROC and
* LOCK objects are stored. Note there will often be multiple copies
* of the same PGPROC or LOCK --- to detect whether two are the same,
* compare the PROCLOCK tag fields.
* These structures hold information passed from lmgr internals to the lock
* listing user-level functions (in lockfuncs.c).
*/
typedef struct LockInstanceData
{
LOCKTAG locktag; /* locked object */
LOCKMASK holdMask; /* locks held by this PGPROC */
LOCKMODE waitLockMode; /* lock awaited by this PGPROC, if any */
BackendId backend; /* backend ID of this PGPROC */
LocalTransactionId lxid; /* local transaction ID of this PGPROC */
int pid; /* pid of this PGPROC */
bool fastpath; /* taken via fastpath? */
} LockInstanceData;
typedef struct LockData
{
int nelements; /* The length of each of the arrays */
PROCLOCK *proclocks;
PGPROC *procs;
LOCK *locks;
int nelements; /* The length of the array */
LockInstanceData *locks;
} LockData;
......
......@@ -50,6 +50,14 @@ struct XidCache
/* flags reset at EOXact */
#define PROC_VACUUM_STATE_MASK (0x0E)
/*
* We allow a small number of "weak" relation locks (AccesShareLock,
* RowShareLock, RowExclusiveLock) to be recorded in the PGPROC structure
* rather than the main lock table. This eases contention on the lock
* manager LWLocks. See storage/lmgr/README for additional details.
*/
#define FP_LOCK_SLOTS_PER_BACKEND 16
/*
* Each backend has a PGPROC struct in shared memory. There is also a list of
* currently-unused PGPROC structs that will be reallocated to new backends.
......@@ -137,6 +145,13 @@ struct PGPROC
SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS];
struct XidCache subxids; /* cache for subtransaction XIDs */
/* Per-backend LWLock. Protects fields below. */
LWLockId backendLock; /* protects the fields below */
/* Lock manager data, recording fast-path locks taken by this backend. */
uint64 fpLockBits; /* lock modes held for each fast-path slot */
Oid fpRelId[FP_LOCK_SLOTS_PER_BACKEND]; /* slots for rel oids */
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
......@@ -150,6 +165,10 @@ extern PGDLLIMPORT PGPROC *MyProc;
*/
typedef struct PROC_HDR
{
/* Array of PGPROC structures (not including dummies for prepared txns) */
PGPROC *allProcs;
/* Length of allProcs array */
uint32 allProcCount;
/* Head of list of free PGPROC structures */
PGPROC *freeProcs;
/* Head of list of autovacuum's free PGPROC structures */
......@@ -163,6 +182,8 @@ typedef struct PROC_HDR
int startupBufferPinWaitBufId;
} PROC_HDR;
extern PROC_HDR *ProcGlobal;
/*
* We set aside some extra PGPROC structures for auxiliary processes,
* ie things that aren't full-fledged backends but need shmem access.
......
......@@ -1284,7 +1284,7 @@ SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
pg_cursors | SELECT c.name, c.statement, c.is_holdable, c.is_binary, c.is_scrollable, c.creation_time FROM pg_cursor() c(name, statement, is_holdable, is_binary, is_scrollable, creation_time);
pg_group | SELECT pg_authid.rolname AS groname, pg_authid.oid AS grosysid, ARRAY(SELECT pg_auth_members.member FROM pg_auth_members WHERE (pg_auth_members.roleid = pg_authid.oid)) AS grolist FROM pg_authid WHERE (NOT pg_authid.rolcanlogin);
pg_indexes | SELECT n.nspname AS schemaname, c.relname AS tablename, i.relname AS indexname, t.spcname AS tablespace, pg_get_indexdef(i.oid) AS indexdef FROM ((((pg_index x JOIN pg_class c ON ((c.oid = x.indrelid))) JOIN pg_class i ON ((i.oid = x.indexrelid))) LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) LEFT JOIN pg_tablespace t ON ((t.oid = i.reltablespace))) WHERE ((c.relkind = 'r'::"char") AND (i.relkind = 'i'::"char"));
pg_locks | SELECT l.locktype, l.database, l.relation, l.page, l.tuple, l.virtualxid, l.transactionid, l.classid, l.objid, l.objsubid, l.virtualtransaction, l.pid, l.mode, l.granted FROM pg_lock_status() l(locktype, database, relation, page, tuple, virtualxid, transactionid, classid, objid, objsubid, virtualtransaction, pid, mode, granted);
pg_locks | SELECT l.locktype, l.database, l.relation, l.page, l.tuple, l.virtualxid, l.transactionid, l.classid, l.objid, l.objsubid, l.virtualtransaction, l.pid, l.mode, l.granted, l.fastpath FROM pg_lock_status() l(locktype, database, relation, page, tuple, virtualxid, transactionid, classid, objid, objsubid, virtualtransaction, pid, mode, granted, fastpath);
pg_prepared_statements | SELECT p.name, p.statement, p.prepare_time, p.parameter_types, p.from_sql FROM pg_prepared_statement() p(name, statement, prepare_time, parameter_types, from_sql);
pg_prepared_xacts | SELECT p.transaction, p.gid, p.prepared, u.rolname AS owner, d.datname AS database FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
pg_roles | SELECT pg_authid.rolname, pg_authid.rolsuper, pg_authid.rolinherit, pg_authid.rolcreaterole, pg_authid.rolcreatedb, pg_authid.rolcatupdate, pg_authid.rolcanlogin, pg_authid.rolreplication, pg_authid.rolconnlimit, '********'::text AS rolpassword, pg_authid.rolvaliduntil, s.setconfig AS rolconfig, pg_authid.oid FROM (pg_authid LEFT JOIN pg_db_role_setting s ON (((pg_authid.oid = s.setrole) AND (s.setdatabase = (0)::oid))));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment