Commit 558a9165 authored by Andres Freund's avatar Andres Freund

Compute XID horizon for page level index vacuum on primary.

Previously the xid horizon was only computed during WAL replay. That
had two major problems:
1) It relied on knowing what the table pointed to looks like. That was
   easy enough before the introducing of tableam (we knew it had to be
   heap, although some trickery around logging the heap relfilenodes
   was required). But to properly handle table AMs we need
   per-database catalog access to look up the AM handler, which
   recovery doesn't allow.
2) Not knowing the xid horizon also makes it hard to support logical
   decoding on standbys. When on a catalog table, we need to be able
   to conflict with slots that have an xid horizon that's too old. But
   computing the horizon by visiting the heap only works once
   consistency is reached, but we always need to be able to detect
   conflicts.

There's also a secondary problem, in that the current method performs
redundant work on every standby. But that's counterbalanced by
potentially computing the value when not necessary (either because
there's no standby, or because there's no connected backends).

Solve 1) and 2) by moving computation of the xid horizon to the
primary and by involving tableam in the computation of the horizon.

To address the potentially increased overhead, increase the efficiency
of the xid horizon computation for heap by sorting the tids, and
eliminating redundant buffer accesses. When prefetching is available,
additionally perform prefetching of buffers.  As this is more of a
maintenance task, rather than something routinely done in every read
only query, we add an arbitrary 10 to the effective concurrency -
thereby using IO concurrency, when not globally enabled.  That's
possibly not the perfect formula, but seems good enough for now.

Bumps WAL format, as latestRemovedXid is now part of the records, and
the heap's relfilenode isn't anymore.

Author: Andres Freund, Amit Khandekar, Robert Haas
Reviewed-By: Robert Haas
Discussion:
    https://postgr.es/m/20181212204154.nsxf3gzqv3gesl32@alap3.anarazel.de
    https://postgr.es/m/20181214014235.dal5ogljs3bmlq44@alap3.anarazel.de
    https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
parent 126d6312
......@@ -969,155 +969,6 @@ hash_xlog_update_meta_page(XLogReaderState *record)
UnlockReleaseBuffer(metabuf);
}
/*
* Get the latestRemovedXid from the heap pages pointed at by the index
* tuples being deleted. See also btree_xlog_delete_get_latestRemovedXid,
* on which this function is based.
*/
static TransactionId
hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
{
xl_hash_vacuum_one_page *xlrec;
OffsetNumber *unused;
Buffer ibuffer,
hbuffer;
Page ipage,
hpage;
RelFileNode rnode;
BlockNumber blkno;
ItemId iitemid,
hitemid;
IndexTuple itup;
HeapTupleHeader htuphdr;
BlockNumber hblkno;
OffsetNumber hoffnum;
TransactionId latestRemovedXid = InvalidTransactionId;
int i;
xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
/*
* If there's nothing running on the standby we don't need to derive a
* full latestRemovedXid value, so use a fast path out of here. This
* returns InvalidTransactionId, and so will conflict with all HS
* transactions; but since we just worked out that that's zero people,
* it's OK.
*
* XXX There is a race condition here, which is that a new backend might
* start just after we look. If so, it cannot need to conflict, but this
* coding will result in throwing a conflict anyway.
*/
if (CountDBBackends(InvalidOid) == 0)
return latestRemovedXid;
/*
* Check if WAL replay has reached a consistent database state. If not, we
* must PANIC. See the definition of
* btree_xlog_delete_get_latestRemovedXid for more details.
*/
if (!reachedConsistency)
elog(PANIC, "hash_xlog_vacuum_get_latestRemovedXid: cannot operate with inconsistent data");
/*
* Get index page. If the DB is consistent, this should not fail, nor
* should any of the heap page fetches below. If one does, we return
* InvalidTransactionId to cancel all HS transactions. That's probably
* overkill, but it's safe, and certainly better than panicking here.
*/
XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
if (!BufferIsValid(ibuffer))
return InvalidTransactionId;
LockBuffer(ibuffer, HASH_READ);
ipage = (Page) BufferGetPage(ibuffer);
/*
* Loop through the deleted index items to obtain the TransactionId from
* the heap items they point to.
*/
unused = (OffsetNumber *) ((char *) xlrec + SizeOfHashVacuumOnePage);
for (i = 0; i < xlrec->ntuples; i++)
{
/*
* Identify the index tuple about to be deleted.
*/
iitemid = PageGetItemId(ipage, unused[i]);
itup = (IndexTuple) PageGetItem(ipage, iitemid);
/*
* Locate the heap page that the index tuple points at
*/
hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
hblkno, RBM_NORMAL);
if (!BufferIsValid(hbuffer))
{
UnlockReleaseBuffer(ibuffer);
return InvalidTransactionId;
}
LockBuffer(hbuffer, HASH_READ);
hpage = (Page) BufferGetPage(hbuffer);
/*
* Look up the heap tuple header that the index tuple points at by
* using the heap node supplied with the xlrec. We can't use
* heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
* Note that we are not looking at tuple data here, just headers.
*/
hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
hitemid = PageGetItemId(hpage, hoffnum);
/*
* Follow any redirections until we find something useful.
*/
while (ItemIdIsRedirected(hitemid))
{
hoffnum = ItemIdGetRedirect(hitemid);
hitemid = PageGetItemId(hpage, hoffnum);
CHECK_FOR_INTERRUPTS();
}
/*
* If the heap item has storage, then read the header and use that to
* set latestRemovedXid.
*
* Some LP_DEAD items may not be accessible, so we ignore them.
*/
if (ItemIdHasStorage(hitemid))
{
htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
}
else if (ItemIdIsDead(hitemid))
{
/*
* Conjecture: if hitemid is dead then it had xids before the xids
* marked on LP_NORMAL items. So we just ignore this item and move
* onto the next, for the purposes of calculating
* latestRemovedxids.
*/
}
else
Assert(!ItemIdIsUsed(hitemid));
UnlockReleaseBuffer(hbuffer);
}
UnlockReleaseBuffer(ibuffer);
/*
* If all heap tuples were LP_DEAD then we will be returning
* InvalidTransactionId here, which avoids conflicts. This matches
* existing logic which assumes that LP_DEAD tuples must already be older
* than the latestRemovedXid on the cleanup record that set them as
* LP_DEAD, hence must already have generated a conflict.
*/
return latestRemovedXid;
}
/*
* replay delete operation in hash index to remove
* tuples marked as DEAD during index tuple insertion.
......@@ -1149,12 +1000,10 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
*/
if (InHotStandby)
{
TransactionId latestRemovedXid =
hash_xlog_vacuum_get_latestRemovedXid(record);
RelFileNode rnode;
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
}
action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
......
......@@ -23,8 +23,8 @@
#include "storage/buf_internals.h"
#include "storage/predicate.h"
static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
RelFileNode hnode);
static void _hash_vacuum_one_page(Relation rel, Relation hrel,
Buffer metabuf, Buffer buf);
/*
* _hash_doinsert() -- Handle insertion of a single index tuple.
......@@ -137,7 +137,7 @@ restart_insert:
if (IsBufferCleanupOK(buf))
{
_hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);
_hash_vacuum_one_page(rel, heapRel, metabuf, buf);
if (PageGetFreeSpace(page) >= itemsz)
break; /* OK, now we have enough space */
......@@ -335,8 +335,7 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
*/
static void
_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
RelFileNode hnode)
_hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
{
OffsetNumber deletable[MaxOffsetNumber];
int ndeletable = 0;
......@@ -360,6 +359,12 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
if (ndeletable > 0)
{
TransactionId latestRemovedXid;
latestRemovedXid =
index_compute_xid_horizon_for_tuples(rel, hrel, buf,
deletable, ndeletable);
/*
* Write-lock the meta page so that we can decrement tuple count.
*/
......@@ -393,7 +398,7 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
xl_hash_vacuum_one_page xlrec;
XLogRecPtr recptr;
xlrec.hnode = hnode;
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.ntuples = ndeletable;
XLogBeginInsert();
......
......@@ -67,6 +67,7 @@
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/spccache.h"
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
......@@ -162,6 +163,20 @@ static const struct
#define ConditionalLockTupleTuplock(rel, tup, mode) \
ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
#ifdef USE_PREFETCH
/*
* heap_compute_xid_horizon_for_tuples and xid_horizon_prefetch_buffer use
* this structure to coordinate prefetching activity.
*/
typedef struct
{
BlockNumber cur_hblkno;
int next_item;
int nitems;
ItemPointerData *tids;
} XidHorizonPrefetchState;
#endif
/*
* This table maps tuple lock strength values for each particular
* MultiXactStatus value.
......@@ -6861,6 +6876,212 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
/* *latestRemovedXid may still be invalid at end */
}
#ifdef USE_PREFETCH
/*
* Helper function for heap_compute_xid_horizon_for_tuples. Issue prefetch
* requests for the number of buffers indicated by prefetch_count. The
* prefetch_state keeps track of all the buffers that we can prefetch and
* which ones have already been prefetched; each call to this function picks
* up where the previous call left off.
*/
static void
xid_horizon_prefetch_buffer(Relation rel,
XidHorizonPrefetchState *prefetch_state,
int prefetch_count)
{
BlockNumber cur_hblkno = prefetch_state->cur_hblkno;
int count = 0;
int i;
int nitems = prefetch_state->nitems;
ItemPointerData *tids = prefetch_state->tids;
for (i = prefetch_state->next_item;
i < nitems && count < prefetch_count;
i++)
{
ItemPointer htid = &tids[i];
if (cur_hblkno == InvalidBlockNumber ||
ItemPointerGetBlockNumber(htid) != cur_hblkno)
{
cur_hblkno = ItemPointerGetBlockNumber(htid);
PrefetchBuffer(rel, MAIN_FORKNUM, cur_hblkno);
count++;
}
}
/*
* Save the prefetch position so that next time we can continue from that
* position.
*/
prefetch_state->next_item = i;
prefetch_state->cur_hblkno = cur_hblkno;
}
#endif
/*
* Get the latestRemovedXid from the heap pages pointed at by the index
* tuples being deleted.
*
* We used to do this during recovery rather than on the primary, but that
* approach now appears inferior. It meant that the master could generate
* a lot of work for the standby without any back-pressure to slow down the
* master, and it required the standby to have reached consistency, whereas
* we want to have correct information available even before that point.
*
* It's possible for this to generate a fair amount of I/O, since we may be
* deleting hundreds of tuples from a single index block. To amortize that
* cost to some degree, this uses prefetching and combines repeat accesses to
* the same block.
*/
TransactionId
heap_compute_xid_horizon_for_tuples(Relation rel,
ItemPointerData *tids,
int nitems)
{
TransactionId latestRemovedXid = InvalidTransactionId;
BlockNumber hblkno;
Buffer buf = InvalidBuffer;
Page hpage;
#ifdef USE_PREFETCH
XidHorizonPrefetchState prefetch_state;
int io_concurrency;
int prefetch_distance;
#endif
/*
* Sort to avoid repeated lookups for the same page, and to make it more
* likely to access items in an efficient order. In particular, this
* ensures that if there are multiple pointers to the same page, they all
* get processed looking up and locking the page just once.
*/
qsort((void *) tids, nitems, sizeof(ItemPointerData),
(int (*) (const void *, const void *)) ItemPointerCompare);
#ifdef USE_PREFETCH
/* Initialize prefetch state. */
prefetch_state.cur_hblkno = InvalidBlockNumber;
prefetch_state.next_item = 0;
prefetch_state.nitems = nitems;
prefetch_state.tids = tids;
/*
* Compute the prefetch distance that we will attempt to maintain.
*
* We don't use the regular formula to determine how much to prefetch
* here, but instead just add a constant to effective_io_concurrency.
* That's because it seems best to do some prefetching here even when
* effective_io_concurrency is set to 0, but if the DBA thinks it's OK to
* do more prefetching for other operations, then it's probably OK to do
* more prefetching in this case, too. It may be that this formula is too
* simplistic, but at the moment there is no evidence of that or any idea
* about what would work better.
*/
io_concurrency = get_tablespace_io_concurrency(rel->rd_rel->reltablespace);
prefetch_distance = Min((io_concurrency) + 10, MAX_IO_CONCURRENCY);
/* Start prefetching. */
xid_horizon_prefetch_buffer(rel, &prefetch_state, prefetch_distance);
#endif
/* Iterate over all tids, and check their horizon */
hblkno = InvalidBlockNumber;
for (int i = 0; i < nitems; i++)
{
ItemPointer htid = &tids[i];
ItemId hitemid;
OffsetNumber hoffnum;
/*
* Read heap buffer, but avoid refetching if it's the same block as
* required for the last tid.
*/
if (hblkno == InvalidBlockNumber ||
ItemPointerGetBlockNumber(htid) != hblkno)
{
/* release old buffer */
if (BufferIsValid(buf))
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
}
hblkno = ItemPointerGetBlockNumber(htid);
buf = ReadBuffer(rel, hblkno);
#ifdef USE_PREFETCH
/*
* To maintain the prefetch distance, prefetch one more page for
* each page we read.
*/
xid_horizon_prefetch_buffer(rel, &prefetch_state, 1);
#endif
hpage = BufferGetPage(buf);
LockBuffer(buf, BUFFER_LOCK_SHARE);
}
hoffnum = ItemPointerGetOffsetNumber(htid);
hitemid = PageGetItemId(hpage, hoffnum);
/*
* Follow any redirections until we find something useful.
*/
while (ItemIdIsRedirected(hitemid))
{
hoffnum = ItemIdGetRedirect(hitemid);
hitemid = PageGetItemId(hpage, hoffnum);
CHECK_FOR_INTERRUPTS();
}
/*
* If the heap item has storage, then read the header and use that to
* set latestRemovedXid.
*
* Some LP_DEAD items may not be accessible, so we ignore them.
*/
if (ItemIdHasStorage(hitemid))
{
HeapTupleHeader htuphdr;
htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
}
else if (ItemIdIsDead(hitemid))
{
/*
* Conjecture: if hitemid is dead then it had xids before the xids
* marked on LP_NORMAL items. So we just ignore this item and move
* onto the next, for the purposes of calculating
* latestRemovedxids.
*/
}
else
Assert(!ItemIdIsUsed(hitemid));
}
if (BufferIsValid(buf))
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
}
/*
* If all heap tuples were LP_DEAD then we will be returning
* InvalidTransactionId here, which avoids conflicts. This matches
* existing logic which assumes that LP_DEAD tuples must already be older
* than the latestRemovedXid on the cleanup record that set them as
* LP_DEAD, hence must already have generated a conflict.
*/
return latestRemovedXid;
}
/*
* Perform XLogInsert to register a heap cleanup info message. These
* messages are sent once per VACUUM and are required because
......
......@@ -544,6 +544,7 @@ static const TableAmRoutine heapam_methods = {
.tuple_fetch_row_version = heapam_fetch_row_version,
.tuple_get_latest_tid = heap_get_latest_tid,
.tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
.compute_xid_horizon_for_tuples = heap_compute_xid_horizon_for_tuples,
};
......
......@@ -273,6 +273,43 @@ BuildIndexValueDescription(Relation indexRelation,
return buf.data;
}
/*
* Get the latestRemovedXid from the table entries pointed at by the index
* tuples being deleted.
*/
TransactionId
index_compute_xid_horizon_for_tuples(Relation irel,
Relation hrel,
Buffer ibuf,
OffsetNumber *itemnos,
int nitems)
{
ItemPointerData *ttids =
(ItemPointerData *) palloc(sizeof(ItemPointerData) * nitems);
TransactionId latestRemovedXid = InvalidTransactionId;
Page ipage = BufferGetPage(ibuf);
IndexTuple itup;
/* identify what the index tuples about to be deleted point to */
for (int i = 0; i < nitems; i++)
{
ItemId iitemid;
iitemid = PageGetItemId(ipage, itemnos[i]);
itup = (IndexTuple) PageGetItem(ipage, iitemid);
ItemPointerCopy(&itup->t_tid, &ttids[i]);
}
/* determine the actual xid horizon */
latestRemovedXid =
table_compute_xid_horizon_for_tuples(hrel, ttids, nitems);
pfree(ttids);
return latestRemovedXid;
}
/* ----------------------------------------------------------------
* heap-or-index-scan access to system catalogs
......
......@@ -1101,10 +1101,16 @@ _bt_delitems_delete(Relation rel, Buffer buf,
{
Page page = BufferGetPage(buf);
BTPageOpaque opaque;
TransactionId latestRemovedXid = InvalidTransactionId;
/* Shouldn't be called unless there's something to do */
Assert(nitems > 0);
if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
latestRemovedXid =
index_compute_xid_horizon_for_tuples(rel, heapRel, buf,
itemnos, nitems);
/* No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
......@@ -1134,7 +1140,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
XLogRecPtr recptr;
xl_btree_delete xlrec_delete;
xlrec_delete.hnode = heapRel->rd_node;
xlrec_delete.latestRemovedXid = latestRemovedXid;
xlrec_delete.nitems = nitems;
XLogBeginInsert();
......
......@@ -501,159 +501,6 @@ btree_xlog_vacuum(XLogReaderState *record)
UnlockReleaseBuffer(buffer);
}
/*
* Get the latestRemovedXid from the heap pages pointed at by the index
* tuples being deleted. This puts the work for calculating latestRemovedXid
* into the recovery path rather than the primary path.
*
* It's possible that this generates a fair amount of I/O, since an index
* block may have hundreds of tuples being deleted. Repeat accesses to the
* same heap blocks are common, though are not yet optimised.
*
* XXX optimise later with something like XLogPrefetchBuffer()
*/
static TransactionId
btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
{
xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
OffsetNumber *unused;
Buffer ibuffer,
hbuffer;
Page ipage,
hpage;
RelFileNode rnode;
BlockNumber blkno;
ItemId iitemid,
hitemid;
IndexTuple itup;
HeapTupleHeader htuphdr;
BlockNumber hblkno;
OffsetNumber hoffnum;
TransactionId latestRemovedXid = InvalidTransactionId;
int i;
/*
* If there's nothing running on the standby we don't need to derive a
* full latestRemovedXid value, so use a fast path out of here. This
* returns InvalidTransactionId, and so will conflict with all HS
* transactions; but since we just worked out that that's zero people,
* it's OK.
*
* XXX There is a race condition here, which is that a new backend might
* start just after we look. If so, it cannot need to conflict, but this
* coding will result in throwing a conflict anyway.
*/
if (CountDBBackends(InvalidOid) == 0)
return latestRemovedXid;
/*
* In what follows, we have to examine the previous state of the index
* page, as well as the heap page(s) it points to. This is only valid if
* WAL replay has reached a consistent database state; which means that
* the preceding check is not just an optimization, but is *necessary*. We
* won't have let in any user sessions before we reach consistency.
*/
if (!reachedConsistency)
elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");
/*
* Get index page. If the DB is consistent, this should not fail, nor
* should any of the heap page fetches below. If one does, we return
* InvalidTransactionId to cancel all HS transactions. That's probably
* overkill, but it's safe, and certainly better than panicking here.
*/
XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
if (!BufferIsValid(ibuffer))
return InvalidTransactionId;
LockBuffer(ibuffer, BT_READ);
ipage = (Page) BufferGetPage(ibuffer);
/*
* Loop through the deleted index items to obtain the TransactionId from
* the heap items they point to.
*/
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
for (i = 0; i < xlrec->nitems; i++)
{
/*
* Identify the index tuple about to be deleted
*/
iitemid = PageGetItemId(ipage, unused[i]);
itup = (IndexTuple) PageGetItem(ipage, iitemid);
/*
* Locate the heap page that the index tuple points at
*/
hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL);
if (!BufferIsValid(hbuffer))
{
UnlockReleaseBuffer(ibuffer);
return InvalidTransactionId;
}
LockBuffer(hbuffer, BT_READ);
hpage = (Page) BufferGetPage(hbuffer);
/*
* Look up the heap tuple header that the index tuple points at by
* using the heap node supplied with the xlrec. We can't use
* heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
* Note that we are not looking at tuple data here, just headers.
*/
hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
hitemid = PageGetItemId(hpage, hoffnum);
/*
* Follow any redirections until we find something useful.
*/
while (ItemIdIsRedirected(hitemid))
{
hoffnum = ItemIdGetRedirect(hitemid);
hitemid = PageGetItemId(hpage, hoffnum);
CHECK_FOR_INTERRUPTS();
}
/*
* If the heap item has storage, then read the header and use that to
* set latestRemovedXid.
*
* Some LP_DEAD items may not be accessible, so we ignore them.
*/
if (ItemIdHasStorage(hitemid))
{
htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
}
else if (ItemIdIsDead(hitemid))
{
/*
* Conjecture: if hitemid is dead then it had xids before the xids
* marked on LP_NORMAL items. So we just ignore this item and move
* onto the next, for the purposes of calculating
* latestRemovedxids.
*/
}
else
Assert(!ItemIdIsUsed(hitemid));
UnlockReleaseBuffer(hbuffer);
}
UnlockReleaseBuffer(ibuffer);
/*
* If all heap tuples were LP_DEAD then we will be returning
* InvalidTransactionId here, which avoids conflicts. This matches
* existing logic which assumes that LP_DEAD tuples must already be older
* than the latestRemovedXid on the cleanup record that set them as
* LP_DEAD, hence must already have generated a conflict.
*/
return latestRemovedXid;
}
static void
btree_xlog_delete(XLogReaderState *record)
{
......@@ -676,12 +523,11 @@ btree_xlog_delete(XLogReaderState *record)
*/
if (InHotStandby)
{
TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
RelFileNode rnode;
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
}
/*
......
......@@ -113,8 +113,9 @@ hash_desc(StringInfo buf, XLogReaderState *record)
{
xl_hash_vacuum_one_page *xlrec = (xl_hash_vacuum_one_page *) rec;
appendStringInfo(buf, "ntuples %d",
xlrec->ntuples);
appendStringInfo(buf, "ntuples %d, latest removed xid %u",
xlrec->ntuples,
xlrec->latestRemovedXid);
break;
}
}
......
......@@ -54,7 +54,8 @@ btree_desc(StringInfo buf, XLogReaderState *record)
{
xl_btree_delete *xlrec = (xl_btree_delete *) rec;
appendStringInfo(buf, "%d items", xlrec->nitems);
appendStringInfo(buf, "%d items, latest removed xid %u",
xlrec->nitems, xlrec->latestRemovedXid);
break;
}
case XLOG_BTREE_MARK_PAGE_HALFDEAD:
......
......@@ -188,6 +188,11 @@ extern IndexScanDesc RelationGetIndexScan(Relation indexRelation,
extern void IndexScanEnd(IndexScanDesc scan);
extern char *BuildIndexValueDescription(Relation indexRelation,
Datum *values, bool *isnull);
extern TransactionId index_compute_xid_horizon_for_tuples(Relation irel,
Relation hrel,
Buffer ibuf,
OffsetNumber *itemnos,
int nitems);
/*
* heap-or-index access to system catalogs (in genam.c)
......
......@@ -263,7 +263,7 @@ typedef struct xl_hash_init_bitmap_page
*/
typedef struct xl_hash_vacuum_one_page
{
RelFileNode hnode;
TransactionId latestRemovedXid;
int ntuples;
/* TARGET OFFSET NUMBERS FOLLOW AT THE END */
......
......@@ -174,6 +174,10 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
extern void heap_sync(Relation relation);
extern TransactionId heap_compute_xid_horizon_for_tuples(Relation rel,
ItemPointerData *items,
int nitems);
/* in heap/pruneheap.c */
extern void heap_page_prune_opt(Relation relation, Buffer buffer);
extern int heap_page_prune(Relation relation, Buffer buffer,
......
......@@ -126,8 +126,7 @@ typedef struct xl_btree_split
*/
typedef struct xl_btree_delete
{
RelFileNode hnode; /* RelFileNode of the heap the index currently
* points at */
TransactionId latestRemovedXid;
int nitems;
/* TARGET OFFSET NUMBERS FOLLOW AT THE END */
......
......@@ -299,6 +299,12 @@ typedef struct TableAmRoutine
TupleTableSlot *slot,
Snapshot snapshot);
/* see table_compute_xid_horizon_for_tuples() */
TransactionId (*compute_xid_horizon_for_tuples) (Relation rel,
ItemPointerData *items,
int nitems);
/* ------------------------------------------------------------------------
* Manipulations of physical tuples.
* ------------------------------------------------------------------------
......@@ -689,6 +695,19 @@ table_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, Snapshot snap
return rel->rd_tableam->tuple_satisfies_snapshot(rel, slot, snapshot);
}
/*
* Compute the newest xid among the tuples pointed to by items. This is used
* to compute what snapshots to conflict with when replaying WAL records for
* page-level index vacuums.
*/
static inline TransactionId
table_compute_xid_horizon_for_tuples(Relation rel,
ItemPointerData *items,
int nitems)
{
return rel->rd_tableam->compute_xid_horizon_for_tuples(rel, items, nitems);
}
/* ----------------------------------------------------------------------------
* Functions for manipulations of physical tuples.
......
......@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD099 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD100 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
......
......@@ -2624,6 +2624,7 @@ XactCallback
XactCallbackItem
XactEvent
XactLockTableWaitInfo
XidHorizonPrefetchState
XidStatus
XmlExpr
XmlExprOp
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment