Commit 62401db4 authored by Heikki Linnakangas

Support unlogged GiST index.

The reason this wasn't supported before was that GiST indexes need an
increasing sequence to detect concurrent page-splits. In a regular WAL-
logged GiST index, the LSN of the page-split record is used for that
purpose, and in a temporary index, we can get away with a backend-local
counter. Neither of those methods works for an unlogged relation.

To provide such an increasing sequence of numbers, create a "fake LSN"
counter that is saved and restored across shutdowns. On recovery, unlogged
relations are blown away, so the counter doesn't need to survive that
either.

Jeevan Chalke, based on discussions with Robert Haas, Tom Lane and me.
parent b669f416
...@@ -182,8 +182,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI ...@@ -182,8 +182,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
automatically truncated after a crash or unclean shutdown. The contents automatically truncated after a crash or unclean shutdown. The contents
of an unlogged table are also not replicated to standby servers. of an unlogged table are also not replicated to standby servers.
Any indexes created on an unlogged table are automatically unlogged as Any indexes created on an unlogged table are automatically unlogged as
well; however, unlogged <link linkend="GiST">GiST indexes</link> are well.
currently not supported and cannot be created on an unlogged table.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "access/genam.h" #include "access/genam.h"
#include "access/gist_private.h" #include "access/gist_private.h"
#include "access/heapam_xlog.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/pg_collation.h" #include "catalog/pg_collation.h"
#include "miscadmin.h" #include "miscadmin.h"
...@@ -71,9 +72,22 @@ createTempGistContext(void) ...@@ -71,9 +72,22 @@ createTempGistContext(void)
Datum Datum
gistbuildempty(PG_FUNCTION_ARGS) gistbuildempty(PG_FUNCTION_ARGS)
{ {
ereport(ERROR, Relation index = (Relation) PG_GETARG_POINTER(0);
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), Buffer buffer;
errmsg("unlogged GiST indexes are not supported")));
/* Initialize the root page */
buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/* Initialize and xlog buffer */
START_CRIT_SECTION();
GISTInitBuffer(buffer, F_LEAF);
MarkBufferDirty(buffer);
log_newpage_buffer(buffer);
END_CRIT_SECTION();
/* Unlock and release the buffer */
UnlockReleaseBuffer(buffer);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
...@@ -391,7 +405,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, ...@@ -391,7 +405,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
dist, oldrlink, oldnsn, leftchildbuf, dist, oldrlink, oldnsn, leftchildbuf,
markfollowright); markfollowright);
else else
recptr = GetXLogRecPtrForTemp(); recptr = gistGetFakeLSN(rel);
for (ptr = dist; ptr; ptr = ptr->next) for (ptr = dist; ptr; ptr = ptr->next)
{ {
...@@ -448,7 +462,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, ...@@ -448,7 +462,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
} }
else else
{ {
recptr = GetXLogRecPtrForTemp(); recptr = gistGetFakeLSN(rel);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
} }
......
...@@ -158,16 +158,6 @@ gistbuild(PG_FUNCTION_ARGS) ...@@ -158,16 +158,6 @@ gistbuild(PG_FUNCTION_ARGS)
elog(ERROR, "index \"%s\" already contains data", elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index)); RelationGetRelationName(index));
/*
* We can't yet handle unlogged GiST indexes, because we depend on LSNs.
* This is duplicative of an error in gistbuildempty, but we want to check
* here so as to throw error before doing all the index-build work.
*/
if (heap->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unlogged GiST indexes are not supported")));
/* no locking is needed */ /* no locking is needed */
buildstate.giststate = initGISTstate(index); buildstate.giststate = initGISTstate(index);
...@@ -204,7 +194,7 @@ gistbuild(PG_FUNCTION_ARGS) ...@@ -204,7 +194,7 @@ gistbuild(PG_FUNCTION_ARGS)
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
} }
else else
PageSetLSN(page, GetXLogRecPtrForTemp()); PageSetLSN(page, gistGetFakeLSN(heap));
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
......
...@@ -798,16 +798,30 @@ gistoptions(PG_FUNCTION_ARGS) ...@@ -798,16 +798,30 @@ gistoptions(PG_FUNCTION_ARGS)
} }
/* /*
* Temporary GiST indexes are not WAL-logged, but we need LSNs to detect * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
* concurrent page splits anyway. GetXLogRecPtrForTemp() provides a fake * to detect concurrent page splits anyway. This function provides a fake
* sequence of LSNs for that purpose. Each call generates an LSN that is * sequence of LSNs for that purpose.
* greater than any previous value returned by this function in the same
* session.
*/ */
XLogRecPtr XLogRecPtr
GetXLogRecPtrForTemp(void) gistGetFakeLSN(Relation rel)
{ {
static XLogRecPtr counter = 1; static XLogRecPtr counter = 1;
counter++;
return counter; if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
{
/*
* Temporary relations are only accessible in our session, so a
* simple backend-local counter will do.
*/
return counter++;
}
else
{
/*
* Unlogged relations are accessible from other backends, and survive
* (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
*/
Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
return GetFakeLSNForUnloggedRel();
}
} }
...@@ -238,7 +238,7 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -238,7 +238,7 @@ gistbulkdelete(PG_FUNCTION_ARGS)
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
} }
else else
PageSetLSN(page, GetXLogRecPtrForTemp()); PageSetLSN(page, gistGetFakeLSN(rel));
END_CRIT_SECTION(); END_CRIT_SECTION();
} }
......
...@@ -391,6 +391,10 @@ typedef struct XLogCtlData ...@@ -391,6 +391,10 @@ typedef struct XLogCtlData
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */ XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */
/* Fake LSN counter, for unlogged relations. Protected by ulsn_lck */
XLogRecPtr unloggedLSN;
slock_t ulsn_lck;
/* Protected by WALWriteLock: */ /* Protected by WALWriteLock: */
XLogCtlWrite Write; XLogCtlWrite Write;
...@@ -3696,6 +3700,31 @@ GetSystemIdentifier(void) ...@@ -3696,6 +3700,31 @@ GetSystemIdentifier(void)
return ControlFile->system_identifier; return ControlFile->system_identifier;
} }
/*
* Returns a fake LSN for unlogged relations.
*
* Each call generates an LSN that is greater than any previous value
* returned. The current counter value is saved and restored across clean
* shutdowns, but like unlogged relations, does not survive a crash. This can
* be used in lieu of real LSN values returned by XLogInsert, if you need an
* LSN-like increasing sequence of numbers without writing any WAL.
*/
XLogRecPtr
GetFakeLSNForUnloggedRel(void)
{
XLogRecPtr nextUnloggedLSN;
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
/* increment the unloggedLSN counter, need SpinLock */
SpinLockAcquire(&xlogctl->ulsn_lck);
nextUnloggedLSN = xlogctl->unloggedLSN++;
SpinLockRelease(&xlogctl->ulsn_lck);
return nextUnloggedLSN;
}
/* /*
* Auto-tune the number of XLOG buffers. * Auto-tune the number of XLOG buffers.
* *
...@@ -3844,6 +3873,7 @@ XLOGShmemInit(void) ...@@ -3844,6 +3873,7 @@ XLOGShmemInit(void)
XLogCtl->WalWriterSleeping = false; XLogCtl->WalWriterSleeping = false;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch); InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
/* /*
...@@ -3989,6 +4019,7 @@ BootStrapXLOG(void) ...@@ -3989,6 +4019,7 @@ BootStrapXLOG(void)
ControlFile->time = checkPoint.time; ControlFile->time = checkPoint.time;
ControlFile->checkPoint = checkPoint.redo; ControlFile->checkPoint = checkPoint.redo;
ControlFile->checkPointCopy = checkPoint; ControlFile->checkPointCopy = checkPoint;
ControlFile->unloggedLSN = 1;
/* Set important parameter values for use when replaying WAL */ /* Set important parameter values for use when replaying WAL */
ControlFile->MaxConnections = MaxConnections; ControlFile->MaxConnections = MaxConnections;
...@@ -5032,6 +5063,16 @@ StartupXLOG(void) ...@@ -5032,6 +5063,16 @@ StartupXLOG(void)
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid; XLogCtl->ckptXid = checkPoint.nextXid;
/*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so
* the unlogged LSN counter can be reset too.
*/
if (ControlFile->state == DB_SHUTDOWNED)
XLogCtl->unloggedLSN = ControlFile->unloggedLSN;
else
XLogCtl->unloggedLSN = 1;
/* /*
* We must replay WAL entries using the same TimeLineID they were created * We must replay WAL entries using the same TimeLineID they were created
* under, so temporarily adopt the TLI indicated by the checkpoint (see * under, so temporarily adopt the TLI indicated by the checkpoint (see
...@@ -6916,6 +6957,16 @@ CreateCheckPoint(int flags) ...@@ -6916,6 +6957,16 @@ CreateCheckPoint(int flags)
/* crash recovery should always recover to the end of WAL */ /* crash recovery should always recover to the end of WAL */
ControlFile->minRecoveryPoint = InvalidXLogRecPtr; ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
ControlFile->minRecoveryPointTLI = 0; ControlFile->minRecoveryPointTLI = 0;
/*
* Persist unloggedLSN value. It's reset on crash recovery, so this goes
* unused on non-shutdown checkpoints, but seems useful to store it always
* for debugging purposes.
*/
SpinLockAcquire(&XLogCtl->ulsn_lck);
ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
SpinLockRelease(&XLogCtl->ulsn_lck);
UpdateControlFile(); UpdateControlFile();
LWLockRelease(ControlFileLock); LWLockRelease(ControlFileLock);
......
...@@ -1922,9 +1922,24 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) ...@@ -1922,9 +1922,24 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
* Force XLOG flush up to buffer's LSN. This implements the basic WAL * Force XLOG flush up to buffer's LSN. This implements the basic WAL
* rule that log updates must hit disk before any of the data-file changes * rule that log updates must hit disk before any of the data-file changes
* they describe do. * they describe do.
*
* However, this rule does not apply to unlogged relations, which will be
* lost after a crash anyway. Most unlogged relation pages do not bear
* LSNs since we never emit WAL records for them, and therefore flushing
* up through the buffer LSN would be useless, but harmless. However, GiST
* indexes use LSNs internally to track page-splits, and therefore unlogged
* GiST pages bear "fake" LSNs generated by GetFakeLSNForUnloggedRel. It
* is unlikely but possible that the fake LSN counter could advance past
* the WAL insertion point; and if it did happen, attempting to flush WAL
* through that location would fail, with disastrous system-wide
* consequences. To make sure that can't happen, skip the flush if the
* buffer isn't permanent.
*/ */
recptr = BufferGetLSN(buf); if (buf->flags & BM_PERMANENT)
XLogFlush(recptr); {
recptr = BufferGetLSN(buf);
XLogFlush(recptr);
}
/* /*
* Now it's safe to write buffer to disk. Note that no one else should * Now it's safe to write buffer to disk. Note that no one else should
......
...@@ -240,6 +240,9 @@ main(int argc, char *argv[]) ...@@ -240,6 +240,9 @@ main(int argc, char *argv[])
ControlFile.checkPointCopy.oldestMultiDB); ControlFile.checkPointCopy.oldestMultiDB);
printf(_("Time of latest checkpoint: %s\n"), printf(_("Time of latest checkpoint: %s\n"),
ckpttime_str); ckpttime_str);
printf(_("Fake LSN counter for unlogged rels: %X/%X\n"),
(uint32) (ControlFile.unloggedLSN >> 32),
(uint32) ControlFile.unloggedLSN);
printf(_("Min recovery ending location: %X/%X\n"), printf(_("Min recovery ending location: %X/%X\n"),
(uint32) (ControlFile.minRecoveryPoint >> 32), (uint32) (ControlFile.minRecoveryPoint >> 32),
(uint32) ControlFile.minRecoveryPoint); (uint32) ControlFile.minRecoveryPoint);
......
...@@ -510,6 +510,7 @@ GuessControlValues(void) ...@@ -510,6 +510,7 @@ GuessControlValues(void)
ControlFile.state = DB_SHUTDOWNED; ControlFile.state = DB_SHUTDOWNED;
ControlFile.time = (pg_time_t) time(NULL); ControlFile.time = (pg_time_t) time(NULL);
ControlFile.checkPoint = ControlFile.checkPointCopy.redo; ControlFile.checkPoint = ControlFile.checkPointCopy.redo;
ControlFile.unloggedLSN = 1;
/* minRecoveryPoint, backupStartPoint and backupEndPoint can be left zero */ /* minRecoveryPoint, backupStartPoint and backupEndPoint can be left zero */
......
...@@ -512,7 +512,7 @@ extern void gistMakeUnionKey(GISTSTATE *giststate, int attno, ...@@ -512,7 +512,7 @@ extern void gistMakeUnionKey(GISTSTATE *giststate, int attno,
GISTENTRY *entry2, bool isnull2, GISTENTRY *entry2, bool isnull2,
Datum *dst, bool *dstisnull); Datum *dst, bool *dstisnull);
extern XLogRecPtr GetXLogRecPtrForTemp(void); extern XLogRecPtr gistGetFakeLSN(Relation rel);
/* gistvacuum.c */ /* gistvacuum.c */
extern Datum gistbulkdelete(PG_FUNCTION_ARGS); extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
......
...@@ -294,6 +294,7 @@ extern char *XLogFileNameP(TimeLineID tli, XLogSegNo segno); ...@@ -294,6 +294,7 @@ extern char *XLogFileNameP(TimeLineID tli, XLogSegNo segno);
extern void UpdateControlFile(void); extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void); extern uint64 GetSystemIdentifier(void);
extern XLogRecPtr GetFakeLSNForUnloggedRel(void);
extern Size XLOGShmemSize(void); extern Size XLOGShmemSize(void);
extern void XLOGShmemInit(void); extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void); extern void BootStrapXLOG(void);
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
/* Version identifier for this pg_control format */ /* Version identifier for this pg_control format */
#define PG_CONTROL_VERSION 934 #define PG_CONTROL_VERSION 935
/* /*
* Body of CheckPoint XLOG records. This is declared here because we keep * Body of CheckPoint XLOG records. This is declared here because we keep
...@@ -126,6 +126,8 @@ typedef struct ControlFileData ...@@ -126,6 +126,8 @@ typedef struct ControlFileData
CheckPoint checkPointCopy; /* copy of last check point record */ CheckPoint checkPointCopy; /* copy of last check point record */
XLogRecPtr unloggedLSN; /* current fake LSN value, for unlogged rels */
/* /*
* These two values determine the minimum point we must recover up to * These two values determine the minimum point we must recover up to
* before starting up: * before starting up:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment