Commit 2065dd28 authored by Tom Lane's avatar Tom Lane

Prevent very-low-probability PANIC during PREPARE TRANSACTION.

The code in PostPrepare_Locks supposed that it could reassign locks to
the prepared transaction's dummy PGPROC by deleting the PROCLOCK table
entries and immediately creating new ones.  This was safe when that code
was written, but since we invented partitioning of the shared lock table,
it's not safe --- another process could steal away the PROCLOCK entry in
the short interval when it's on the freelist.  Then, if we were otherwise
out of shared memory, PostPrepare_Locks would have to PANIC, since it's
too late to back out of the PREPARE at that point.

Fix by inventing a dynahash.c function to atomically update a hashtable
entry's key.  (This might possibly have other uses in future.)

This is an ancient bug that in principle we ought to back-patch, but the
odds of someone hitting it in the field seem really tiny, because (a) the
risk window is small, and (b) nobody runs servers with maxed-out lock
tables for long, because they'll be getting non-PANIC out-of-memory errors
anyway.  So fixing it in HEAD seems sufficient, at least until the new
code has gotten some testing.
parent 9d2cd99a
......@@ -3028,7 +3028,6 @@ PostPrepare_Locks(TransactionId xid)
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
bool found;
int partition;
/* This is a critical section: any error means big trouble */
......@@ -3114,10 +3113,8 @@ PostPrepare_Locks(TransactionId xid)
while (proclock)
{
PROCLOCK *nextplock;
LOCKMASK holdMask;
PROCLOCK *newproclock;
/* Get link first, since we may unlink/delete this proclock */
/* Get link first, since we may unlink/relink this proclock */
nextplock = (PROCLOCK *)
SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
......@@ -3145,64 +3142,42 @@ PostPrepare_Locks(TransactionId xid)
if (proclock->releaseMask != proclock->holdMask)
elog(PANIC, "we seem to have dropped a bit somewhere");
holdMask = proclock->holdMask;
/*
* We cannot simply modify proclock->tag.myProc to reassign
* ownership of the lock, because that's part of the hash key and
* the proclock would then be in the wrong hash chain. So, unlink
* and delete the old proclock; create a new one with the right
* contents; and link it into place. We do it in this order to be
* certain we won't run out of shared memory (the way dynahash.c
* works, the deleted object is certain to be available for
* reallocation).
* the proclock would then be in the wrong hash chain. Instead
* use hash_update_hash_key. (We used to create a new hash entry,
* but that risks out-of-memory failure if other processes are
* busy making proclocks too.) We must unlink the proclock from
* our procLink chain and put it into the new proc's chain, too.
*
* Note: the updated proclock hash key will still belong to the
* same hash partition, cf proclock_hash(). So the partition
* lock we already hold is sufficient for this.
*/
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
if (!hash_search(LockMethodProcLockHash,
(void *) &(proclock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "proclock table corrupted");
/*
* Create the hash key for the new proclock table.
* Create the new hash key for the proclock.
*/
proclocktag.myLock = lock;
proclocktag.myProc = newproc;
newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
(void *) &proclocktag,
HASH_ENTER_NULL, &found);
if (!newproclock)
ereport(PANIC, /* should not happen */
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
/*
* If new, initialize the new entry
* Update the proclock. We should not find any existing entry
* for the same hash key, since there can be only one entry for
* any given lock with my own proc.
*/
if (!found)
{
newproclock->holdMask = 0;
newproclock->releaseMask = 0;
/* Add new proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
&newproclock->procLink);
PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
}
else
{
PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
Assert((newproclock->holdMask & ~lock->grantMask) == 0);
}
if (!hash_update_hash_key(LockMethodProcLockHash,
(void *) proclock,
(void *) &proclocktag))
elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
/*
* Pass over the identified lock ownership.
*/
Assert((newproclock->holdMask & holdMask) == 0);
newproclock->holdMask |= holdMask;
/* Re-link into the new proc's proclock list */
SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
&proclock->procLink);
PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
next_item:
proclock = nextplock;
......
......@@ -183,6 +183,12 @@ struct HTAB
*/
#define ELEMENTKEY(helem) (((char *)(helem)) + MAXALIGN(sizeof(HASHELEMENT)))
/*
* Obtain element pointer given pointer to key
*/
#define ELEMENT_FROM_KEY(key) \
((HASHELEMENT *) (((char *) (key)) - MAXALIGN(sizeof(HASHELEMENT))))
/*
* Fast MOD arithmetic, assuming that y is a power of 2 !
*/
......@@ -987,6 +993,144 @@ hash_search_with_hash_value(HTAB *hashp,
return NULL; /* keep compiler quiet */
}
/*
* hash_update_hash_key -- change the hash key of an existing table entry
*
* This is equivalent to removing the entry, making a new entry, and copying
* over its data, except that the entry never goes to the table's freelist.
* Therefore this cannot suffer an out-of-memory failure, even if there are
* other processes operating in other partitions of the hashtable.
*
* Returns TRUE if successful, FALSE if the requested new hash key is already
* present. Throws error if the specified entry pointer isn't actually a
* table member.
*
* NB: currently, there is no special case for old and new hash keys being
* identical, which means we'll report FALSE for that situation. This is
* preferable for existing uses.
*
* NB: for a partitioned hashtable, caller must hold lock on both relevant
* partitions, if the new hash key would belong to a different partition.
*/
bool
hash_update_hash_key(HTAB *hashp,
void *existingEntry,
const void *newKeyPtr)
{
HASHELEMENT *existingElement = ELEMENT_FROM_KEY(existingEntry);
HASHHDR *hctl = hashp->hctl;
uint32 newhashvalue;
Size keysize;
uint32 bucket;
long segment_num;
long segment_ndx;
HASHSEGMENT segp;
HASHBUCKET currBucket;
HASHBUCKET *prevBucketPtr;
HASHBUCKET *oldPrevPtr;
HashCompareFunc match;
#if HASH_STATISTICS
hash_accesses++;
hctl->accesses++;
#endif
/* disallow updates if frozen */
if (hashp->frozen)
elog(ERROR, "cannot update in frozen hashtable \"%s\"",
hashp->tabname);
/*
* Lookup the existing element using its saved hash value. We need to
* do this to be able to unlink it from its hash chain, but as a side
* benefit we can verify the validity of the passed existingEntry pointer.
*/
bucket = calc_bucket(hctl, existingElement->hashvalue);
segment_num = bucket >> hashp->sshift;
segment_ndx = MOD(bucket, hashp->ssize);
segp = hashp->dir[segment_num];
if (segp == NULL)
hash_corrupted(hashp);
prevBucketPtr = &segp[segment_ndx];
currBucket = *prevBucketPtr;
while (currBucket != NULL)
{
if (currBucket == existingElement)
break;
prevBucketPtr = &(currBucket->link);
currBucket = *prevBucketPtr;
}
if (currBucket == NULL)
elog(ERROR, "hash_update_hash_key argument is not in hashtable \"%s\"",
hashp->tabname);
oldPrevPtr = prevBucketPtr;
/*
* Now perform the equivalent of a HASH_ENTER operation to locate the
* hash chain we want to put the entry into.
*/
newhashvalue = hashp->hash(newKeyPtr, hashp->keysize);
bucket = calc_bucket(hctl, newhashvalue);
segment_num = bucket >> hashp->sshift;
segment_ndx = MOD(bucket, hashp->ssize);
segp = hashp->dir[segment_num];
if (segp == NULL)
hash_corrupted(hashp);
prevBucketPtr = &segp[segment_ndx];
currBucket = *prevBucketPtr;
/*
* Follow collision chain looking for matching key
*/
match = hashp->match; /* save one fetch in inner loop */
keysize = hashp->keysize; /* ditto */
while (currBucket != NULL)
{
if (currBucket->hashvalue == newhashvalue &&
match(ELEMENTKEY(currBucket), newKeyPtr, keysize) == 0)
break;
prevBucketPtr = &(currBucket->link);
currBucket = *prevBucketPtr;
#if HASH_STATISTICS
hash_collisions++;
hctl->collisions++;
#endif
}
if (currBucket != NULL)
return false; /* collision with an existing entry */
currBucket = existingElement;
/* OK to remove record from old hash bucket's chain. */
*oldPrevPtr = currBucket->link;
/* link into new hashbucket chain */
*prevBucketPtr = currBucket;
currBucket->link = NULL;
/* copy new key into record */
currBucket->hashvalue = newhashvalue;
hashp->keycopy(ELEMENTKEY(currBucket), newKeyPtr, keysize);
/* rest of record is untouched */
return true;
}
/*
* create a new entry if possible
*/
......
......@@ -128,6 +128,8 @@ extern uint32 get_hash_value(HTAB *hashp, const void *keyPtr);
extern void *hash_search_with_hash_value(HTAB *hashp, const void *keyPtr,
uint32 hashvalue, HASHACTION action,
bool *foundPtr);
extern bool hash_update_hash_key(HTAB *hashp, void *existingEntry,
const void *newKeyPtr);
extern long hash_get_num_entries(HTAB *hashp);
extern void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp);
extern void *hash_seq_search(HASH_SEQ_STATUS *status);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment