Commit a5b9a000 authored by Noah Misch's avatar Noah Misch

Fix CREATE INDEX CONCURRENTLY for the newest prepared transactions.

The purpose of commit 8a54e12a was to
fix this, and it sufficed when the PREPARE TRANSACTION completed before
the CIC looked for lock conflicts.  Otherwise, things still broke.  As
before, in a cluster having used CIC while having enabled prepared
transactions, queries that use the resulting index can silently fail to
find rows.  It may be necessary to reindex to recover from past
occurrences; REINDEX CONCURRENTLY suffices.  Fix this for future index
builds by making CIC wait for arbitrarily-recent prepared transactions
and for ordinary transactions that may yet PREPARE TRANSACTION.  As part
of that, have PREPARE TRANSACTION transfer locks to its dummy PGPROC
before it calls ProcArrayClearTransaction().  Back-patch to 9.6 (all
supported versions).

Andrey Borodin, reviewed (in earlier versions) by Andres Freund.

Discussion: https://postgr.es/m/01824242-AA92-4FE9-9BA7-AEBAFFEA3D0C@yandex-team.ru
parent dde966ef
# Copyright (c) 2021, PostgreSQL Global Development Group
# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications
use strict;
use warnings;
use Config;
use PostgresNode;
use TestLib;
use Test::More tests => 6;
my ($node, $result);
#
# Test set-up
#
$node = get_new_node('CIC_2PC_test');
$node->init;
$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10');
$node->append_conf('postgresql.conf', 'lock_timeout = 180000');
$node->start;
$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
#
# Run 3 overlapping 2PC transactions with CIC
#
# We have two concurrent background psql processes: $main_h for INSERTs and
# $cic_h for CIC. Also, we use non-background psql for some COMMIT PREPARED
# statements.
#
my $main_in = '';
my $main_out = '';
my $main_timer = IPC::Run::timeout(180);
my $main_h =
$node->background_psql('postgres', \$main_in, \$main_out,
$main_timer, on_error_stop => 1);
$main_in .= q(
BEGIN;
INSERT INTO tbl VALUES(0);
\echo syncpoint1
);
pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;
my $cic_in = '';
my $cic_out = '';
my $cic_timer = IPC::Run::timeout(180);
my $cic_h =
$node->background_psql('postgres', \$cic_in, \$cic_out,
$cic_timer, on_error_stop => 1);
$cic_in .= q(
\echo start
CREATE INDEX CONCURRENTLY idx ON tbl(i);
);
pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;
$main_in .= q(
PREPARE TRANSACTION 'a';
);
$main_in .= q(
BEGIN;
INSERT INTO tbl VALUES(0);
\echo syncpoint2
);
pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;
$node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
$main_in .= q(
PREPARE TRANSACTION 'b';
BEGIN;
INSERT INTO tbl VALUES(0);
\echo syncpoint3
);
pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;
$node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
$main_in .= q(
PREPARE TRANSACTION 'c';
COMMIT PREPARED 'c';
);
$main_h->pump_nb;
$main_h->finish;
$cic_h->finish;
$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
is($result, '0', 'bt_index_check after overlapping 2PC');
#
# Server restart shall not change whether prepared xact blocks CIC
#
$node->safe_psql(
'postgres', q(
BEGIN;
INSERT INTO tbl VALUES(0);
PREPARE TRANSACTION 'spans_restart';
BEGIN;
CREATE TABLE unused ();
PREPARE TRANSACTION 'persists_forever';
));
$node->restart;
my $reindex_in = '';
my $reindex_out = '';
my $reindex_timer = IPC::Run::timeout(180);
my $reindex_h =
$node->background_psql('postgres', \$reindex_in, \$reindex_out,
$reindex_timer, on_error_stop => 1);
$reindex_in .= q(
\echo start
DROP INDEX CONCURRENTLY idx;
CREATE INDEX CONCURRENTLY idx ON tbl(i);
);
pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;
$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
$reindex_h->finish;
$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
is($result, '0', 'bt_index_check after 2PC and restart');
#
# Stress CIC+2PC with pgbench
#
# Fix broken index first
$node->safe_psql('postgres', q(REINDEX TABLE tbl;));
# Run background pgbench with CIC. We cannot mix-in this script into single
# pgbench: CIC will deadlock with itself occasionally.
my $pgbench_out = '';
my $pgbench_timer = IPC::Run::timeout(180);
my $pgbench_h = $node->background_pgbench(
'--no-vacuum --client=1 --transactions=100',
{
'002_pgbench_concurrent_cic' => q(
DROP INDEX CONCURRENTLY idx;
CREATE INDEX CONCURRENTLY idx ON tbl(i);
SELECT bt_index_check('idx',true);
)
},
\$pgbench_out,
$pgbench_timer);
# Run pgbench.
$node->pgbench(
'--no-vacuum --client=5 --transactions=100',
0,
[qr{actually processed}],
[qr{^$}],
'concurrent INSERTs w/ 2PC',
{
'002_pgbench_concurrent_2pc' => q(
BEGIN;
INSERT INTO tbl VALUES(0);
PREPARE TRANSACTION 'c:client_id';
COMMIT PREPARED 'c:client_id';
),
'002_pgbench_concurrent_2pc_savepoint' => q(
BEGIN;
SAVEPOINT s1;
INSERT INTO tbl VALUES(0);
PREPARE TRANSACTION 'c:client_id';
COMMIT PREPARED 'c:client_id';
)
});
$pgbench_h->pump_nb;
$pgbench_h->finish();
$result =
($Config{osname} eq "MSWin32")
? ($pgbench_h->full_results)[0]
: $pgbench_h->result(0);
is($result, 0, "pgbench with CIC works");
# done
$node->stop;
done_testing();
......@@ -459,14 +459,24 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
proc->pgprocno = gxact->pgprocno;
SHMQueueElemInit(&(proc->links));
proc->waitStatus = PROC_WAIT_STATUS_OK;
/* We set up the gxact's VXID as InvalidBackendId/XID */
proc->lxid = (LocalTransactionId) xid;
if (LocalTransactionIdIsValid(MyProc->lxid))
{
/* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
proc->lxid = MyProc->lxid;
proc->backendId = MyBackendId;
}
else
{
Assert(AmStartupProcess() || !IsPostmasterEnvironment);
/* GetLockConflicts() uses this to specify a wait on the XID */
proc->lxid = xid;
proc->backendId = InvalidBackendId;
}
proc->xid = xid;
Assert(proc->xmin == InvalidTransactionId);
proc->delayChkpt = false;
proc->statusFlags = 0;
proc->pid = 0;
proc->backendId = InvalidBackendId;
proc->databaseId = databaseid;
proc->roleId = owner;
proc->tempNamespaceId = InvalidOid;
......@@ -846,6 +856,53 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
return result;
}
/*
* TwoPhaseGetXidByVirtualXID
* Lookup VXID among xacts prepared since last startup.
*
* (This won't find recovered xacts.) If more than one matches, return any
* and set "have_more" to true. To witness multiple matches, a single
* BackendId must consume 2^32 LXIDs, with no intervening database restart.
*/
TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
bool *have_more)
{
int i;
TransactionId result = InvalidTransactionId;
Assert(VirtualTransactionIdIsValid(vxid));
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGPROC *proc;
VirtualTransactionId proc_vxid;
if (!gxact->valid)
continue;
proc = &ProcGlobal->allProcs[gxact->pgprocno];
GET_VXID_FROM_PGPROC(proc_vxid, *proc);
if (VirtualTransactionIdEquals(vxid, proc_vxid))
{
/* Startup process sets proc->backendId to InvalidBackendId. */
Assert(!gxact->inredo);
if (result != InvalidTransactionId)
{
*have_more = true;
break;
}
result = gxact->xid;
}
}
LWLockRelease(TwoPhaseStateLock);
return result;
}
/*
* TwoPhaseGetDummyBackendId
* Get the dummy backend ID for prepared transaction specified by XID
......
......@@ -2506,6 +2506,13 @@ PrepareTransaction(void)
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
/*
* Transfer our locks to a dummy PGPROC. This has to be done before
* ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would
* conclude "xact already committed or aborted" for our locks.
*/
PostPrepare_Locks(xid);
/*
* Let others know about no transaction in progress by me. This has to be
* done *after* the prepared transaction has been marked valid, else
......@@ -2545,7 +2552,6 @@ PrepareTransaction(void)
PostPrepare_MultiXact(xid);
PostPrepare_Locks(xid);
PostPrepare_PredicateLocks(xid);
ResourceOwnerRelease(TopTransactionResourceOwner,
......
......@@ -871,9 +871,10 @@ XactLockTableWaitErrorCb(void *arg)
* To do this, obtain the current list of lockers, and wait on their VXIDs
* until they are finished.
*
* Note we don't try to acquire the locks on the given locktags, only the VXIDs
* of its lock holders; if somebody grabs a conflicting lock on the objects
* after we obtained our initial list of lockers, we will not wait for them.
* Note we don't try to acquire the locks on the given locktags, only the
* VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
* on the objects after we obtained our initial list of lockers, we will not
* wait for them.
*/
void
WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
......
......@@ -2899,8 +2899,12 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
* The result array is palloc'd and is terminated with an invalid VXID.
* *countp, if not null, is updated to the number of items set.
*
* Of course, the result could be out of date by the time it's returned,
* so use of this function has to be thought about carefully.
* Of course, the result could be out of date by the time it's returned, so
* use of this function has to be thought about carefully. Similarly, a
* PGPROC with no "lxid" will be considered non-conflicting regardless of any
* lock it holds. Existing callers don't care about a locker after that
* locker's pg_xact updates complete. CommitTransaction() clears "lxid" after
* pg_xact updates and before releasing locks.
*
* Note we never include the current xact's vxid in the result array,
* since an xact never blocks itself.
......@@ -4528,37 +4532,80 @@ VirtualXactLockTableCleanup(void)
}
}
/*
* XactLockForVirtualXact
*
* If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
* NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those
* functions, it assumes "xid" is never a subtransaction and that "xid" is
* prepared, committed, or aborted.
*
* If !TransactionIdIsValid(xid), this locks every prepared XID having been
* known as "vxid" before its PREPARE TRANSACTION.
*/
static bool
XactLockForVirtualXact(VirtualTransactionId vxid,
TransactionId xid, bool wait)
{
bool more = false;
/* There is no point to wait for 2PCs if you have no 2PCs. */
if (max_prepared_xacts == 0)
return true;
do
{
LockAcquireResult lar;
LOCKTAG tag;
/* Clear state from previous iterations. */
if (more)
{
xid = InvalidTransactionId;
more = false;
}
/* If we have no xid, try to find one. */
if (!TransactionIdIsValid(xid))
xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
if (!TransactionIdIsValid(xid))
{
Assert(!more);
return true;
}
/* Check or wait for XID completion. */
SET_LOCKTAG_TRANSACTION(tag, xid);
lar = LockAcquire(&tag, ShareLock, false, !wait);
if (lar == LOCKACQUIRE_NOT_AVAIL)
return false;
LockRelease(&tag, ShareLock, false);
} while (more);
return true;
}
/*
* VirtualXactLock
*
* If wait = true, wait until the given VXID has been released, and then
* return true.
* If wait = true, wait as long as the given VXID or any XID acquired by the
* same transaction is still running. Then, return true.
*
* If wait = false, just check whether the VXID is still running, and return
* true or false.
* If wait = false, just check whether that VXID or one of those XIDs is still
* running, and return true or false.
*/
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
LOCKTAG tag;
PGPROC *proc;
TransactionId xid = InvalidTransactionId;
Assert(VirtualTransactionIdIsValid(vxid));
if (VirtualTransactionIdIsPreparedXact(vxid))
{
LockAcquireResult lar;
/*
* Prepared transactions don't hold vxid locks. The
* LocalTransactionId is always a normal, locked XID.
*/
SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
lar = LockAcquire(&tag, ShareLock, false, !wait);
if (lar != LOCKACQUIRE_NOT_AVAIL)
LockRelease(&tag, ShareLock, false);
return lar != LOCKACQUIRE_NOT_AVAIL;
}
if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
/* no vxid lock; localTransactionId is a normal, locked XID */
return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
......@@ -4572,7 +4619,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
*/
proc = BackendIdGetProc(vxid.backendId);
if (proc == NULL)
return true;
return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
/*
* We must acquire this lock before checking the backendId and lxid
......@@ -4581,12 +4628,12 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
*/
LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE);
/* If the transaction has ended, our work here is done. */
if (proc->backendId != vxid.backendId
|| proc->fpLocalTransactionId != vxid.localTransactionId)
{
/* VXID ended */
LWLockRelease(&proc->fpInfoLock);
return true;
return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
}
/*
......@@ -4633,6 +4680,16 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
proc->fpVXIDLock = false;
}
/*
* If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
* search. The proc might have assigned this XID but not yet locked it,
* in which case the proc will lock this XID before releasing the VXID.
* The fpInfoLock critical section excludes VirtualXactLockTableCleanup(),
* so we won't save an XID of a different VXID. It doesn't matter whether
* we save this before or after setting up the primary lock table entry.
*/
xid = proc->xid;
/* Done with proc->fpLockBits */
LWLockRelease(&proc->fpInfoLock);
......@@ -4640,7 +4697,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
(void) LockAcquire(&tag, ShareLock, false, false);
LockRelease(&tag, ShareLock, false);
return true;
return XactLockForVirtualXact(vxid, xid, wait);
}
/*
......
......@@ -65,6 +65,20 @@
* (XXX is it worth testing likewise for duplicate catcache flush entries?
* Probably not.)
*
* Many subsystems own higher-level caches that depend on relcache and/or
* catcache, and they register callbacks here to invalidate their caches.
* While building a higher-level cache entry, a backend may receive a
* callback for the being-built entry or one of its dependencies. This
* implies the new higher-level entry would be born stale, and it might
* remain stale for the life of the backend. Many caches do not prevent
* that. They rely on DDL for can't-miss catalog changes taking
* AccessExclusiveLock on suitable objects. (For a change made with less
* locking, backends might never read the change.) The relation cache,
* however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later
* than the beginning of the next transaction. Hence, when a relevant
* invalidation callback arrives during a build, relcache.c reattempts that
* build. Caches with similar needs could do likewise.
*
* If a relcache flush is issued for a system relation that we preload
* from the relcache init file, we must also delete the init file so that
* it will be rebuilt during the next backend restart. The actual work of
......
......@@ -34,6 +34,8 @@ extern void TwoPhaseShmemInit(void);
extern void AtAbort_Twophase(void);
extern void PostPrepare_Twophase(void);
extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
bool *have_more);
extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held);
extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held);
......
......@@ -47,10 +47,11 @@ extern bool Debug_deadlocks;
/*
* Top-level transactions are identified by VirtualTransactionIDs comprising
* PGPROC fields backendId and lxid. For prepared transactions, the
* LocalTransactionId is an ordinary XID. These are guaranteed unique over
* the short term, but will be reused after a database restart or XID
* wraparound; hence they should never be stored on disk.
* PGPROC fields backendId and lxid. For recovered prepared transactions, the
* LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never
* refers to that kind. These are guaranteed unique over the short term, but
* will be reused after a database restart or XID wraparound; hence they
* should never be stored on disk.
*
* Note that struct VirtualTransactionId can not be assumed to be atomically
* assignable as a whole. However, type LocalTransactionId is assumed to
......@@ -70,7 +71,7 @@ typedef struct
#define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId)
#define VirtualTransactionIdIsValid(vxid) \
(LocalTransactionIdIsValid((vxid).localTransactionId))
#define VirtualTransactionIdIsPreparedXact(vxid) \
#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \
((vxid).backendId == InvalidBackendId)
#define VirtualTransactionIdEquals(vxid1, vxid2) \
((vxid1).backendId == (vxid2).backendId && \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment