Commit 7b78474d authored by Tom Lane

Make CLUSTER MVCC-safe. Heikki Linnakangas

parent 2fca2c05
......@@ -4,7 +4,7 @@
# Makefile for access/heap
#
# IDENTIFICATION
# $PostgreSQL: pgsql/src/backend/access/heap/Makefile,v 1.14 2007/01/20 17:16:10 petere Exp $
# $PostgreSQL: pgsql/src/backend/access/heap/Makefile,v 1.15 2007/04/08 01:26:27 tgl Exp $
#
#-------------------------------------------------------------------------
......@@ -12,7 +12,7 @@ subdir = src/backend/access/heap
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = heapam.o hio.o tuptoaster.o
OBJS = heapam.o hio.o rewriteheap.o tuptoaster.o
all: SUBSYS.o
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.231 2007/04/03 04:14:26 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.232 2007/04/08 01:26:27 tgl Exp $
*
*
* INTERFACE ROUTINES
......@@ -3299,6 +3299,51 @@ log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from,
return log_heap_update(reln, oldbuf, from, newbuf, newtup, true);
}
/*
* Perform XLogInsert of a HEAP_NEWPAGE record to WAL. Caller is responsible
* for writing the page to disk after calling this routine.
*
* Note: all current callers build pages in private memory and write them
* directly to smgr, rather than using bufmgr. Therefore there is no need
* to pass a buffer ID to XLogInsert, nor to perform MarkBufferDirty within
* the critical section.
*
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
*/
XLogRecPtr
log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
{
xl_heap_newpage xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
/* NO ELOG(ERROR) from here till newpage op is logged */
START_CRIT_SECTION();
xlrec.node = *rnode;
xlrec.blkno = blkno;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapNewpage;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) page;
rdata[1].len = BLCKSZ;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
return recptr;
}
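
For reference, a minimal sketch (not part of the patch) of the calling pattern the comment above describes: the caller builds the page in private memory, WAL-logs it with log_newpage when WAL is wanted, and writes it straight through smgr. The flush_private_page name and its use_wal parameter are illustrative only; the callers converted below (nbtsort.c and tablecmds.c) each decide use_wal for themselves.

static void
flush_private_page(Relation rel, BlockNumber blkno, Page page, bool use_wal)
{
	/* WAL-log the full page image before it goes to disk */
	if (use_wal)
		log_newpage(&rel->rd_node, blkno, page);

	/*
	 * Write the page straight through smgr, bypassing bufmgr.  Passing
	 * isTemp = true skips smgr's own fsync bookkeeping; the caller
	 * arranges any needed sync itself (cf. copy_relation_data below).
	 */
	RelationOpenSmgr(rel);
	smgrwrite(rel->rd_smgr, blkno, (char *) page, true);
}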
static void
heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
{
......
This diff is collapsed.
......@@ -57,13 +57,14 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsort.c,v 1.110 2007/01/09 02:14:10 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsort.c,v 1.111 2007/04/08 01:26:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "miscadmin.h"
#include "storage/smgr.h"
......@@ -265,32 +266,7 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
if (wstate->btws_use_wal)
{
/* We use the heap NEWPAGE record type for this */
xl_heap_newpage xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
/* NO ELOG(ERROR) from here till newpage op is logged */
START_CRIT_SECTION();
xlrec.node = wstate->index->rd_node;
xlrec.blkno = blkno;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapNewpage;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) page;
rdata[1].len = BLCKSZ;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
log_newpage(&wstate->index->rd_node, blkno, page);
}
else
{
......
......@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.158 2007/03/29 00:15:37 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.159 2007/04/08 01:26:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -19,6 +19,7 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
......@@ -29,13 +30,14 @@
#include "catalog/toasting.h"
#include "commands/cluster.h"
#include "miscadmin.h"
#include "storage/procarray.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
/*
......@@ -76,7 +78,7 @@ static List *get_tables_to_cluster(MemoryContext cluster_context);
*
* The single-relation case does not have any such overhead.
*
* We also allow a relation being specified without index. In that case,
* We also allow a relation to be specified without index. In that case,
* the indisclustered bit will be looked up, and an ERROR will be thrown
* if there is no index with the bit set.
*---------------------------------------------------------------------------
......@@ -650,11 +652,12 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
TupleDesc newTupDesc;
int natts;
Datum *values;
char *nulls;
bool *isnull;
IndexScanDesc scan;
HeapTuple tuple;
CommandId mycid = GetCurrentCommandId();
bool use_wal;
TransactionId OldestXmin;
RewriteState rwstate;
/*
* Open the relations we need.
......@@ -671,84 +674,137 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
newTupDesc = RelationGetDescr(NewHeap);
Assert(newTupDesc->natts == oldTupDesc->natts);
/* Preallocate values/nulls arrays */
/* Preallocate values/isnull arrays */
natts = newTupDesc->natts;
values = (Datum *) palloc0(natts * sizeof(Datum));
nulls = (char *) palloc(natts * sizeof(char));
memset(nulls, 'n', natts * sizeof(char));
values = (Datum *) palloc(natts * sizeof(Datum));
isnull = (bool *) palloc(natts * sizeof(bool));
/*
* We need to log the copied data in WAL iff WAL archiving is enabled AND
* it's not a temp rel. (Since we know the target relation is new and
* can't have any FSM data, we can always tell heap_insert to ignore FSM,
* even when using WAL.)
* it's not a temp rel.
*/
use_wal = XLogArchivingActive() && !NewHeap->rd_istemp;
/* use_wal off requires rd_targblock be initially invalid */
Assert(NewHeap->rd_targblock == InvalidBlockNumber);
/* Get the cutoff xmin we'll use to weed out dead tuples */
OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, true);
/* Initialize the rewrite operation */
rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal);
/*
* Scan through the OldHeap on the OldIndex and copy each tuple into the
* NewHeap.
* Scan through the OldHeap in OldIndex order and copy each tuple into the
* NewHeap. To ensure we see recently-dead tuples that still need to be
* copied, we scan with SnapshotAny and use HeapTupleSatisfiesVacuum
* for the visibility test.
*/
scan = index_beginscan(OldHeap, OldIndex,
SnapshotNow, 0, (ScanKey) NULL);
SnapshotAny, 0, (ScanKey) NULL);
while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
{
HeapTuple copiedTuple;
bool isdead;
int i;
CHECK_FOR_INTERRUPTS();
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
switch (HeapTupleSatisfiesVacuum(tuple->t_data, OldestXmin,
scan->xs_cbuf))
{
case HEAPTUPLE_DEAD:
/* Definitely dead */
isdead = true;
break;
case HEAPTUPLE_LIVE:
case HEAPTUPLE_RECENTLY_DEAD:
/* Live or recently dead, must copy it */
isdead = false;
break;
case HEAPTUPLE_INSERT_IN_PROGRESS:
/*
* We should not see this unless it's been inserted earlier
* in our own transaction.
*/
if (!TransactionIdIsCurrentTransactionId(
HeapTupleHeaderGetXmin(tuple->t_data)))
elog(ERROR, "concurrent insert in progress");
/* treat as live */
isdead = false;
break;
case HEAPTUPLE_DELETE_IN_PROGRESS:
/*
* We should not see this unless it's been deleted earlier
* in our own transaction.
*/
Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
if (!TransactionIdIsCurrentTransactionId(
HeapTupleHeaderGetXmax(tuple->t_data)))
elog(ERROR, "concurrent delete in progress");
/* treat as recently dead */
isdead = false;
break;
default:
elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
isdead = false; /* keep compiler quiet */
break;
}
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
if (isdead)
{
/* heap rewrite module still needs to see it... */
rewrite_heap_dead_tuple(rwstate, tuple);
continue;
}
/*
* We cannot simply pass the tuple to heap_insert(), for several
* reasons:
*
* 1. heap_insert() will overwrite the commit-status fields of the
* tuple it's handed. This would trash the source relation, which is
* bad news if we abort later on. (This was a bug in releases thru
* 7.0)
* We cannot simply copy the tuple as-is, for several reasons:
*
* 2. We'd like to squeeze out the values of any dropped columns, both
* 1. We'd like to squeeze out the values of any dropped columns, both
* to save space and to ensure we have no corner-case failures. (It's
* possible for example that the new table hasn't got a TOAST table
* and so is unable to store any large values of dropped cols.)
*
* 3. The tuple might not even be legal for the new table; this is
* 2. The tuple might not even be legal for the new table; this is
* currently only known to happen as an after-effect of ALTER TABLE
* SET WITHOUT OIDS.
*
* So, we must reconstruct the tuple from component Datums.
*/
HeapTuple copiedTuple;
int i;
heap_deformtuple(tuple, oldTupDesc, values, nulls);
heap_deform_tuple(tuple, oldTupDesc, values, isnull);
/* Be sure to null out any dropped columns */
for (i = 0; i < natts; i++)
{
if (newTupDesc->attrs[i]->attisdropped)
nulls[i] = 'n';
isnull[i] = true;
}
copiedTuple = heap_formtuple(newTupDesc, values, nulls);
copiedTuple = heap_form_tuple(newTupDesc, values, isnull);
/* Preserve OID, if any */
if (NewHeap->rd_rel->relhasoids)
HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
heap_insert(NewHeap, copiedTuple, mycid, use_wal, false);
/* The heap rewrite module does the rest */
rewrite_heap_tuple(rwstate, tuple, copiedTuple);
heap_freetuple(copiedTuple);
CHECK_FOR_INTERRUPTS();
}
index_endscan(scan);
pfree(values);
pfree(nulls);
/* Write out any remaining tuples, and fsync if needed */
end_heap_rewrite(rwstate);
if (!use_wal)
heap_sync(NewHeap);
pfree(values);
pfree(isnull);
index_close(OldIndex, NoLock);
heap_close(OldHeap, NoLock);
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.218 2007/03/19 23:38:29 wieck Exp $
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.219 2007/04/08 01:26:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -5857,34 +5857,7 @@ copy_relation_data(Relation rel, SMgrRelation dst)
/* XLOG stuff */
if (use_wal)
{
xl_heap_newpage xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
/* NO ELOG(ERROR) from here till newpage op is logged */
START_CRIT_SECTION();
xlrec.node = dst->smgr_rnode;
xlrec.blkno = blkno;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapNewpage;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) page;
rdata[1].len = BLCKSZ;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
}
log_newpage(&dst->smgr_rnode, blkno, page);
/*
* Now write the page. We say isTemp = true even if it's not a temp
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.122 2007/04/06 04:21:43 tgl Exp $
* $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.123 2007/04/08 01:26:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -194,6 +194,7 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
TransactionId cutoff_xid,
OffsetNumber *offsets, int offcnt);
extern XLogRecPtr log_newpage(RelFileNode *rnode, BlockNumber blk, Page page);
/* in common/heaptuple.c */
extern Size heap_compute_data_size(TupleDesc tupleDesc,
......
/*-------------------------------------------------------------------------
*
* rewriteheap.h
* Declarations for heap rewrite support functions
*
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994-5, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/rewriteheap.h,v 1.1 2007/04/08 01:26:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef REWRITE_HEAP_H
#define REWRITE_HEAP_H
#include "access/htup.h"
#include "utils/rel.h"
/* struct definition is private to rewriteheap.c */
typedef struct RewriteStateData *RewriteState;
extern RewriteState begin_heap_rewrite(Relation NewHeap,
TransactionId OldestXmin, bool use_wal);
extern void end_heap_rewrite(RewriteState state);
extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
HeapTuple newTuple);
extern void rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
#endif /* REWRITE_HEAP_H */
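
A condensed, hypothetical sketch of how this API is driven, mirroring the copy_heap_data loop in cluster.c above; tuple_is_dead stands in for the HeapTupleSatisfiesVacuum check and copiedTuple for the tuple reconstructed for the new heap:

	rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal);

	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
	{
		if (tuple_is_dead)
		{
			/* the heap rewrite module still needs to see dead tuples */
			rewrite_heap_dead_tuple(rwstate, tuple);
			continue;
		}

		/* hand over the original tuple plus its reconstructed copy */
		rewrite_heap_tuple(rwstate, tuple, copiedTuple);
	}

	/* write out any remaining tuples, and fsync if needed */
	end_heap_rewrite(rwstate);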
......@@ -382,8 +382,60 @@ SELECT * FROM clstr_1;
2
(2 rows)
-- Test MVCC-safety of cluster. There isn't much we can do to verify the
-- results with a single backend...
CREATE TABLE clustertest (key int PRIMARY KEY);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "clustertest_pkey" for table "clustertest"
INSERT INTO clustertest VALUES (10);
INSERT INTO clustertest VALUES (20);
INSERT INTO clustertest VALUES (30);
INSERT INTO clustertest VALUES (40);
INSERT INTO clustertest VALUES (50);
-- Use a transaction so that updates are not committed when CLUSTER sees 'em
BEGIN;
-- Test update where the old row version is found first in the scan
UPDATE clustertest SET key = 100 WHERE key = 10;
-- Test update where the new row version is found first in the scan
UPDATE clustertest SET key = 35 WHERE key = 40;
-- Test longer update chain
UPDATE clustertest SET key = 60 WHERE key = 50;
UPDATE clustertest SET key = 70 WHERE key = 60;
UPDATE clustertest SET key = 80 WHERE key = 70;
SELECT * FROM clustertest;
key
-----
20
30
100
35
80
(5 rows)
CLUSTER clustertest_pkey ON clustertest;
SELECT * FROM clustertest;
key
-----
20
30
35
80
100
(5 rows)
COMMIT;
SELECT * FROM clustertest;
key
-----
20
30
35
80
100
(5 rows)
-- clean up
\c -
DROP TABLE clustertest;
DROP TABLE clstr_1;
DROP TABLE clstr_2;
DROP TABLE clstr_3;
......
......@@ -153,8 +153,42 @@ INSERT INTO clstr_1 VALUES (1);
CLUSTER clstr_1;
SELECT * FROM clstr_1;
-- Test MVCC-safety of cluster. There isn't much we can do to verify the
-- results with a single backend...
CREATE TABLE clustertest (key int PRIMARY KEY);
INSERT INTO clustertest VALUES (10);
INSERT INTO clustertest VALUES (20);
INSERT INTO clustertest VALUES (30);
INSERT INTO clustertest VALUES (40);
INSERT INTO clustertest VALUES (50);
-- Use a transaction so that updates are not committed when CLUSTER sees 'em
BEGIN;
-- Test update where the old row version is found first in the scan
UPDATE clustertest SET key = 100 WHERE key = 10;
-- Test update where the new row version is found first in the scan
UPDATE clustertest SET key = 35 WHERE key = 40;
-- Test longer update chain
UPDATE clustertest SET key = 60 WHERE key = 50;
UPDATE clustertest SET key = 70 WHERE key = 60;
UPDATE clustertest SET key = 80 WHERE key = 70;
SELECT * FROM clustertest;
CLUSTER clustertest_pkey ON clustertest;
SELECT * FROM clustertest;
COMMIT;
SELECT * FROM clustertest;
-- clean up
\c -
DROP TABLE clustertest;
DROP TABLE clstr_1;
DROP TABLE clstr_2;
DROP TABLE clstr_3;
......