Commit 5e926cbb authored by Bruce Momjian

vacuum.c refactoring

   . rename variables
     . cur_buffer -> dst_buffer
     . ToPage -> dst_page
     . cur_page -> dst_vacpage
   . move variable declarations into block where variable is used
   . various Asserts instead of elog(ERROR, ...)
   . extract functionality from repair_frag() into new routines
     . move_chain_tuple()
     . move_plain_tuple()
     . update_hint_bits()
   . create type ExecContext
   . add comments

Manfred Koizar
parent cd8b0fc5
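
For orientation before the diff: a condensed sketch (not part of the commit) of how repair_frag() is laid out after this refactoring. Only the new calling structure is shown; the page/offset loops and most locals are elided, tuple_is_chain_member is a placeholder for the chain-membership test done in the diff, and the other names (ExecContext_Init, move_chain_tuple, move_plain_tuple, update_hint_bits, ExecContext_Finish and the buffer/page variables) are the ones introduced below.

static void
repair_frag(VRelStats *vacrelstats, Relation onerel,
            VacPageList vacuum_pages, VacPageList fraged_pages,
            int nindexes, Relation *Irel)
{
    ExecContextData ec;

    /* build the EState, ResultRelInfo and tuple slot once per table */
    ExecContext_Init(&ec, onerel);

    /* ... walk the relation backwards; for each tuple to be moved ... */
    if (tuple_is_chain_member)      /* placeholder for the chain test */
        move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
                         dst_buffer, dst_page, destvacpage,
                         &ec, &Ctid, vtmove[ti].cleanVpd);
    else
        move_plain_tuple(onerel, buf, page, &tuple,
                         dst_buffer, dst_page, dst_vacpage, &ec);

    /* ... after all moves: set status bits on the move destination pages ... */
    update_hint_bits(onerel, fraged_pages, num_fraged_pages,
                     last_move_dest_block, num_moved);

    /* drop the tuple table, close indexes, free the EState */
    ExecContext_Finish(&ec);
}

The ExecContext struct exists so that this per-table executor state can be handed to the extracted helpers instead of remaining a pile of locals inside repair_frag().
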
@@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.280 2004/06/05 19:48:07 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.281 2004/06/08 13:59:36 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -99,6 +99,64 @@ typedef struct VRelStats
VTupleLink vtlinks;
} VRelStats;
/*----------------------------------------------------------------------
* ExecContext:
*
* As these variables always appear together, we put them into one struct
* and pull initialization and cleanup into separate routines.
* ExecContext is used by repair_frag() and move_xxx_tuple(). More
* accurately: It is *used* only in move_xxx_tuple(), but because this
* routine is called many times, we initialize the struct just once in
* repair_frag() and pass it on to move_xxx_tuple().
*/
typedef struct ExecContextData
{
ResultRelInfo *resultRelInfo;
EState *estate;
TupleTable tupleTable;
TupleTableSlot *slot;
} ExecContextData;
typedef ExecContextData *ExecContext;
static void
ExecContext_Init(ExecContext ec, Relation rel)
{
TupleDesc tupdesc = RelationGetDescr(rel);
/*
* We need a ResultRelInfo and an EState so we can use the regular
* executor's index-entry-making machinery.
*/
ec->estate = CreateExecutorState();
ec->resultRelInfo = makeNode(ResultRelInfo);
ec->resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
ec->resultRelInfo->ri_RelationDesc = rel;
ec->resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
ExecOpenIndices(ec->resultRelInfo);
ec->estate->es_result_relations = ec->resultRelInfo;
ec->estate->es_num_result_relations = 1;
ec->estate->es_result_relation_info = ec->resultRelInfo;
/* Set up a dummy tuple table too */
ec->tupleTable = ExecCreateTupleTable(1);
ec->slot = ExecAllocTableSlot(ec->tupleTable);
ExecSetSlotDescriptor(ec->slot, tupdesc, false);
}
static void
ExecContext_Finish(ExecContext ec)
{
ExecDropTupleTable(ec->tupleTable, true);
ExecCloseIndices(ec->resultRelInfo);
FreeExecutorState(ec->estate);
}
/*
* End of ExecContext Implementation
*----------------------------------------------------------------------
*/
static MemoryContext vac_context = NULL;
@@ -122,6 +180,17 @@ static void scan_heap(VRelStats *vacrelstats, Relation onerel,
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
VacPageList vacuum_pages, VacPageList fraged_pages,
int nindexes, Relation *Irel);
static void move_chain_tuple(Relation rel,
Buffer old_buf, Page old_page, HeapTuple old_tup,
Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
ExecContext ec, ItemPointer ctid, bool cleanVpd);
static void move_plain_tuple(Relation rel,
Buffer old_buf, Page old_page, HeapTuple old_tup,
Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
ExecContext ec);
static void update_hint_bits(Relation rel, VacPageList fraged_pages,
int num_fraged_pages, BlockNumber last_move_dest_block,
int num_moved);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
@@ -675,7 +744,7 @@ vac_update_dbstats(Oid dbid,
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
TransactionId myXID;
TransactionId myXID = GetCurrentTransactionId();
Relation relation;
HeapScanDesc scan;
HeapTuple tuple;
@@ -683,7 +752,6 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
bool vacuumAlreadyWrapped = false;
bool frozenAlreadyWrapped = false;
myXID = GetCurrentTransactionId();
relation = heap_openr(DatabaseRelationName, AccessShareLock);
@@ -1059,17 +1127,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
{
BlockNumber nblocks,
blkno;
ItemId itemid;
Buffer buf;
HeapTupleData tuple;
OffsetNumber offnum,
maxoff;
bool pgchanged,
tupgone,
notup;
char *relname;
VacPage vacpage,
vacpagecopy;
VacPage vacpage;
BlockNumber empty_pages,
empty_end_pages;
double num_tuples,
@@ -1080,7 +1140,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
usable_free_space;
Size min_tlen = MaxTupleSize;
Size max_tlen = 0;
int i;
bool do_shrinking = true;
VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
int num_vtlinks = 0;
@@ -1113,6 +1172,11 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
tempPage = NULL;
bool do_reap,
do_frag;
Buffer buf;
OffsetNumber offnum,
maxoff;
bool pgchanged,
notup;
vacuum_delay_point();
@@ -1125,6 +1189,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
if (PageIsNew(page))
{
VacPage vacpagecopy;
ereport(WARNING,
(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
relname, blkno)));
@@ -1142,6 +1208,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
if (PageIsEmpty(page))
{
VacPage vacpagecopy;
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
free_space += vacpage->free;
empty_pages++;
@@ -1161,8 +1229,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
offnum = OffsetNumberNext(offnum))
{
uint16 sv_infomask;
itemid = PageGetItemId(page, offnum);
ItemId itemid = PageGetItemId(page, offnum);
bool tupgone = false;
/*
* Collect un-used items too - it's possible to have indexes
@@ -1180,7 +1248,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
tuple.t_len = ItemIdGetLength(itemid);
ItemPointerSet(&(tuple.t_self), blkno, offnum);
tupgone = false;
sv_infomask = tuple.t_data->t_infomask;
switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
@@ -1269,7 +1336,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
do_shrinking = false;
break;
default:
elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
/* unexpected HeapTupleSatisfiesVacuum result */
Assert(false);
break;
}
@@ -1344,7 +1412,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
if (do_reap || do_frag)
{
vacpagecopy = copy_vac_page(vacpage);
VacPage vacpagecopy = copy_vac_page(vacpage);
if (do_reap)
vpage_insert(vacuum_pages, vacpagecopy);
if (do_frag)
@@ -1390,6 +1458,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
*/
if (do_shrinking)
{
int i;
Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
fraged_pages->num_pages -= empty_end_pages;
usable_free_space = 0;
@@ -1453,76 +1523,29 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
VacPageList vacuum_pages, VacPageList fraged_pages,
int nindexes, Relation *Irel)
{
TransactionId myXID;
CommandId myCID;
Buffer buf,
cur_buffer;
TransactionId myXID = GetCurrentTransactionId();
Buffer dst_buffer = InvalidBuffer;
BlockNumber nblocks,
blkno;
BlockNumber last_move_dest_block = 0,
last_vacuum_block;
Page page,
ToPage = NULL;
OffsetNumber offnum,
maxoff,
newoff,
max_offset;
ItemId itemid,
newitemid;
HeapTupleData tuple,
newtup;
TupleDesc tupdesc;
ResultRelInfo *resultRelInfo;
EState *estate;
TupleTable tupleTable;
TupleTableSlot *slot;
Page dst_page = NULL;
ExecContextData ec;
VacPageListData Nvacpagelist;
VacPage cur_page = NULL,
VacPage dst_vacpage = NULL,
last_vacuum_page,
vacpage,
*curpage;
int cur_item = 0;
int i;
Size tuple_len;
int num_moved,
int num_moved = 0,
num_fraged_pages,
vacuumed_pages;
int checked_moved,
num_tuples,
keep_tuples = 0;
bool isempty,
dowrite,
chain_tuple_moved;
int keep_tuples = 0;
VacRUsage ru0;
vac_init_rusage(&ru0);
myXID = GetCurrentTransactionId();
myCID = GetCurrentCommandId();
tupdesc = RelationGetDescr(onerel);
/*
* We need a ResultRelInfo and an EState so we can use the regular
* executor's index-entry-making machinery.
*/
estate = CreateExecutorState();
resultRelInfo = makeNode(ResultRelInfo);
resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
resultRelInfo->ri_RelationDesc = onerel;
resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
ExecOpenIndices(resultRelInfo);
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
estate->es_result_relation_info = resultRelInfo;
/* Set up a dummy tuple table too */
tupleTable = ExecCreateTupleTable(1);
slot = ExecAllocTableSlot(tupleTable);
ExecSetSlotDescriptor(slot, tupdesc, false);
ExecContext_Init(&ec, onerel);
Nvacpagelist.num_pages = 0;
num_fraged_pages = fraged_pages->num_pages;
@@ -1539,8 +1562,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
last_vacuum_page = NULL;
last_vacuum_block = InvalidBlockNumber;
}
cur_buffer = InvalidBuffer;
num_moved = 0;
vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
vacpage->offsets_used = vacpage->offsets_free = 0;
@@ -1560,6 +1581,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
blkno > last_move_dest_block;
blkno--)
{
Buffer buf;
Page page;
OffsetNumber offnum,
maxoff;
bool isempty,
dowrite,
chain_tuple_moved;
vacuum_delay_point();
/*
@@ -1635,7 +1664,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
itemid = PageGetItemId(page, offnum);
Size tuple_len;
HeapTupleData tuple;
ItemId itemid = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(itemid))
continue;
@@ -1645,45 +1676,71 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
tuple_len = tuple.t_len = ItemIdGetLength(itemid);
ItemPointerSet(&(tuple.t_self), blkno, offnum);
/*
* VACUUM FULL has an exclusive lock on the relation. So
* normally no other transaction can have pending INSERTs or
* DELETEs in this relation. A tuple is either
* (a) a tuple in a system catalog, inserted or deleted by
* a not yet committed transaction or
* (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
* (c) inserted by a committed xact (XMIN_COMMITTED) or
* (d) moved by the currently running VACUUM.
* In case (a) we wouldn't be in repair_frag() at all.
* In case (b) we cannot be here, because scan_heap() has
* already marked the item as unused, see continue above.
* Case (c) is what normally is to be expected.
* Case (d) is only possible, if a whole tuple chain has been
* moved while processing this or a higher numbered block.
*/
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
elog(ERROR, "HEAP_MOVED_IN was not expected");
/*
* There cannot be another concurrently running VACUUM. If
* the tuple had been moved in by a previous VACUUM, the
* visibility check would have set XMIN_COMMITTED. If the
* tuple had been moved in by the currently running VACUUM,
* the loop would have been terminated. We had
* elog(ERROR, ...) here, but as we are testing for a
* can't-happen condition, Assert() seems more appropriate.
*/
Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
/*
* If this (chain) tuple is moved by me already then I
* have to check is it in vacpage or not - i.e. is it
* moved while cleaning this page or some previous one.
*/
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
elog(ERROR, "invalid XVAC in tuple header");
if (keep_tuples == 0)
continue;
if (chain_tuple_moved) /* some chains was moved
* while */
{ /* cleaning this page */
Assert(vacpage->offsets_free > 0);
for (i = 0; i < vacpage->offsets_free; i++)
{
if (vacpage->offsets[i] == offnum)
break;
}
if (i >= vacpage->offsets_free) /* not found */
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
keep_tuples--;
}
Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
/*
* MOVED_OFF by another VACUUM would have caused the
* visibility check to set XMIN_COMMITTED or XMIN_INVALID.
*/
Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
/* Can't we Assert(keep_tuples > 0) here? */
if (keep_tuples == 0)
continue;
if (chain_tuple_moved) /* some chains was moved
* while */
{ /* cleaning this page */
Assert(vacpage->offsets_free > 0);
for (i = 0; i < vacpage->offsets_free; i++)
{
if (vacpage->offsets[i] == offnum)
break;
}
else
if (i >= vacpage->offsets_free) /* not found */
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
keep_tuples--;
}
continue;
}
elog(ERROR, "HEAP_MOVED_OFF was expected");
else
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
keep_tuples--;
}
continue;
}
/*
@@ -1716,8 +1773,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
Buffer Cbuf = buf;
bool freeCbuf = false;
bool chain_move_failed = false;
Page Cpage;
ItemId Citemid;
ItemPointerData Ctid;
HeapTupleData tp = tuple;
Size tlen = tuple_len;
@@ -1728,10 +1783,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
int to_item = 0;
int ti;
if (cur_buffer != InvalidBuffer)
if (dst_buffer != InvalidBuffer)
{
WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer;
WriteBuffer(dst_buffer);
dst_buffer = InvalidBuffer;
}
/* Quick exit if we have no vtlinks to search in */
@@ -1754,6 +1809,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
!(ItemPointerEquals(&(tp.t_self),
&(tp.t_data->t_ctid))))
{
Page Cpage;
ItemId Citemid;
ItemPointerData Ctid;
Ctid = tp.t_data->t_ctid;
if (freeCbuf)
ReleaseBuffer(Cbuf);
@@ -1929,12 +1988,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
}
/*
* Okay, move the whle tuple chain
* Okay, move the whole tuple chain
*/
ItemPointerSetInvalid(&Ctid);
for (ti = 0; ti < num_vtmove; ti++)
{
VacPage destvacpage = vtmove[ti].vacpage;
Page Cpage;
ItemId Citemid;
/* Get page to move from */
tuple.t_self = vtmove[ti].tid;
@@ -1942,13 +2003,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
ItemPointerGetBlockNumber(&(tuple.t_self)));
/* Get page to move to */
cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
if (cur_buffer != Cbuf)
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
if (dst_buffer != Cbuf)
LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
ToPage = BufferGetPage(cur_buffer);
dst_page = BufferGetPage(dst_buffer);
Cpage = BufferGetPage(Cbuf);
Citemid = PageGetItemId(Cpage,
@@ -1961,120 +2022,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
* make a copy of the source tuple, and then mark the
* source tuple MOVED_OFF.
*/
heap_copytuple_with_tuple(&tuple, &newtup);
/*
* register invalidation of source tuple in catcaches.
*/
CacheInvalidateHeapTuple(onerel, &tuple);
/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_IN);
tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
HeapTupleHeaderSetXvac(tuple.t_data, myXID);
/*
* If this page was not used before - clean it.
*
* NOTE: a nasty bug used to lurk here. It is possible
* for the source and destination pages to be the same
* (since this tuple-chain member can be on a page
* lower than the one we're currently processing in
* the outer loop). If that's true, then after
* vacuum_page() the source tuple will have been
* moved, and tuple.t_data will be pointing at
* garbage. Therefore we must do everything that uses
* tuple.t_data BEFORE this step!!
*
* This path is different from the other callers of
* vacuum_page, because we have already incremented
* the vacpage's offsets_used field to account for the
* tuple(s) we expect to move onto the page. Therefore
* vacuum_page's check for offsets_used == 0 is wrong.
* But since that's a good debugging check for all
* other callers, we work around it here rather than
* remove it.
*/
if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
{
int sv_offsets_used = destvacpage->offsets_used;
destvacpage->offsets_used = 0;
vacuum_page(onerel, cur_buffer, destvacpage);
destvacpage->offsets_used = sv_offsets_used;
}
/*
* Update the state of the copied tuple, and store it
* on the destination page.
*/
newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
HeapTupleHeaderSetXvac(newtup.t_data, myXID);
newoff = PageAddItem(ToPage,
(Item) newtup.t_data,
tuple_len,
InvalidOffsetNumber,
LP_USED);
if (newoff == InvalidOffsetNumber)
{
elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
(unsigned long) tuple_len, destvacpage->blkno);
}
newitemid = PageGetItemId(ToPage, newoff);
pfree(newtup.t_data);
newtup.t_datamcxt = NULL;
newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
/* XLOG stuff */
if (!onerel->rd_istemp)
{
XLogRecPtr recptr =
log_heap_move(onerel, Cbuf, tuple.t_self,
cur_buffer, &newtup);
if (Cbuf != cur_buffer)
{
PageSetLSN(Cpage, recptr);
PageSetSUI(Cpage, ThisStartUpID);
}
PageSetLSN(ToPage, recptr);
PageSetSUI(ToPage, ThisStartUpID);
}
else
{
/*
* No XLOG record, but still need to flag that XID
* exists on disk
*/
MyXactMadeTempRelUpdate = true;
}
END_CRIT_SECTION();
move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
dst_buffer, dst_page, destvacpage,
&ec, &Ctid, vtmove[ti].cleanVpd);
num_moved++;
if (destvacpage->blkno > last_move_dest_block)
last_move_dest_block = destvacpage->blkno;
/*
* Set new tuple's t_ctid pointing to itself for last
* tuple in chain, and to next tuple in chain
* otherwise.
*/
if (!ItemPointerIsValid(&Ctid))
newtup.t_data->t_ctid = newtup.t_self;
else
newtup.t_data->t_ctid = Ctid;
Ctid = newtup.t_self;
num_moved++;
/*
* Remember that we moved tuple from the current page
* (corresponding index tuple will be cleaned).
@@ -2085,23 +2040,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
else
keep_tuples++;
LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
if (cur_buffer != Cbuf)
LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
/* Create index entries for the moved tuple */
if (resultRelInfo->ri_NumIndices > 0)
{
ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
ExecInsertIndexTuples(slot, &(newtup.t_self),
estate, true);
}
WriteBuffer(cur_buffer);
WriteBuffer(dst_buffer);
WriteBuffer(Cbuf);
} /* end of move-the-tuple-chain loop */
cur_buffer = InvalidBuffer;
dst_buffer = InvalidBuffer;
pfree(vtmove);
chain_tuple_moved = true;
@@ -2110,13 +2053,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
} /* end of is-tuple-in-chain test */
/* try to find new page for this tuple */
if (cur_buffer == InvalidBuffer ||
!enough_space(cur_page, tuple_len))
if (dst_buffer == InvalidBuffer ||
!enough_space(dst_vacpage, tuple_len))
{
if (cur_buffer != InvalidBuffer)
if (dst_buffer != InvalidBuffer)
{
WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer;
WriteBuffer(dst_buffer);
dst_buffer = InvalidBuffer;
}
for (i = 0; i < num_fraged_pages; i++)
{
@@ -2125,110 +2068,32 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
}
if (i == num_fraged_pages)
break; /* can't move item anywhere */
cur_item = i;
cur_page = fraged_pages->pagedesc[cur_item];
cur_buffer = ReadBuffer(onerel, cur_page->blkno);
LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
ToPage = BufferGetPage(cur_buffer);
dst_vacpage = fraged_pages->pagedesc[i];
dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
dst_page = BufferGetPage(dst_buffer);
/* if this page was not used before - clean it */
if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
vacuum_page(onerel, cur_buffer, cur_page);
if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
vacuum_page(onerel, dst_buffer, dst_vacpage);
}
else
LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
/* copy tuple */
heap_copytuple_with_tuple(&tuple, &newtup);
move_plain_tuple(onerel, buf, page, &tuple,
dst_buffer, dst_page, dst_vacpage, &ec);
/*
* register invalidation of source tuple in catcaches.
*
* (Note: we do not need to register the copied tuple, because we
* are not changing the tuple contents and so there cannot be
* any need to flush negative catcache entries.)
*/
CacheInvalidateHeapTuple(onerel, &tuple);
/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
/*
* Mark new tuple as MOVED_IN by me.
*/
newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
HeapTupleHeaderSetXvac(newtup.t_data, myXID);
/* add tuple to the page */
newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
InvalidOffsetNumber, LP_USED);
if (newoff == InvalidOffsetNumber)
{
elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
(unsigned long) tuple_len,
cur_page->blkno, (unsigned long) cur_page->free,
cur_page->offsets_used, cur_page->offsets_free);
}
newitemid = PageGetItemId(ToPage, newoff);
pfree(newtup.t_data);
newtup.t_datamcxt = NULL;
newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
newtup.t_self = newtup.t_data->t_ctid;
num_moved++;
if (dst_vacpage->blkno > last_move_dest_block)
last_move_dest_block = dst_vacpage->blkno;
/*
* Mark old tuple as MOVED_OFF by me.
* Remember that we moved tuple from the current page
* (corresponding index tuple will be cleaned).
*/
tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_IN);
tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
HeapTupleHeaderSetXvac(tuple.t_data, myXID);
/* XLOG stuff */
if (!onerel->rd_istemp)
{
XLogRecPtr recptr =
log_heap_move(onerel, buf, tuple.t_self,
cur_buffer, &newtup);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
PageSetLSN(ToPage, recptr);
PageSetSUI(ToPage, ThisStartUpID);
}
else
{
/*
* No XLOG record, but still need to flag that XID exists
* on disk
*/
MyXactMadeTempRelUpdate = true;
}
END_CRIT_SECTION();
cur_page->offsets_used++;
num_moved++;
cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
if (cur_page->blkno > last_move_dest_block)
last_move_dest_block = cur_page->blkno;
vacpage->offsets[vacpage->offsets_free++] = offnum;
LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
/* insert index' tuples if needed */
if (resultRelInfo->ri_NumIndices > 0)
{
ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
}
} /* walk along page */
/*
@@ -2249,36 +2114,31 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
off <= maxoff;
off = OffsetNumberNext(off))
{
itemid = PageGetItemId(page, off);
ItemId itemid = PageGetItemId(page, off);
HeapTupleHeader htup;
if (!ItemIdIsUsed(itemid))
continue;
tuple.t_datamcxt = NULL;
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
htup = (HeapTupleHeader) PageGetItem(page, itemid);
if (htup->t_infomask & HEAP_XMIN_COMMITTED)
continue;
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
elog(ERROR, "HEAP_MOVED_IN was not expected");
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
/*
** See comments in the walk-along-page loop above, why we
** have Asserts here instead of if (...) elog(ERROR).
*/
Assert(!(htup->t_infomask & HEAP_MOVED_IN));
Assert(htup->t_infomask & HEAP_MOVED_OFF);
Assert(HeapTupleHeaderGetXvac(htup) == myXID);
if (chain_tuple_moved)
{
if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
elog(ERROR, "invalid XVAC in tuple header");
/* some chains was moved while */
if (chain_tuple_moved)
{ /* cleaning this page */
Assert(vacpage->offsets_free > 0);
for (i = 0; i < vacpage->offsets_free; i++)
{
if (vacpage->offsets[i] == off)
break;
}
if (i >= vacpage->offsets_free) /* not found */
{
vacpage->offsets[vacpage->offsets_free++] = off;
Assert(keep_tuples > 0);
keep_tuples--;
}
/* some chains was moved while cleaning this page */
Assert(vacpage->offsets_free > 0);
for (i = 0; i < vacpage->offsets_free; i++)
{
if (vacpage->offsets[i] == off)
break;
}
else
if (i >= vacpage->offsets_free) /* not found */
{
vacpage->offsets[vacpage->offsets_free++] = off;
Assert(keep_tuples > 0);
@@ -2286,7 +2146,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
}
}
else
elog(ERROR, "HEAP_MOVED_OFF was expected");
{
vacpage->offsets[vacpage->offsets_free++] = off;
Assert(keep_tuples > 0);
keep_tuples--;
}
}
}
@@ -2312,10 +2176,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
blkno++; /* new number of blocks */
if (cur_buffer != InvalidBuffer)
if (dst_buffer != InvalidBuffer)
{
Assert(num_moved > 0);
WriteBuffer(cur_buffer);
WriteBuffer(dst_buffer);
}
if (num_moved > 0)
@@ -2348,6 +2212,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
Assert((*curpage)->blkno < blkno);
if ((*curpage)->offsets_used == 0)
{
Buffer buf;
Page page;
/* this page was not used as a move target, so must clean it */
buf = ReadBuffer(onerel, (*curpage)->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
@@ -2363,62 +2230,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
* Now scan all the pages that we moved tuples onto and update tuple
* status bits. This is not really necessary, but will save time for
* future transactions examining these tuples.
*
* XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
* pages that were move source pages but not move dest pages. One
* also wonders whether it wouldn't be better to skip this step and
* let the tuple status updates happen someplace that's not holding an
* exclusive lock on the relation.
*/
checked_moved = 0;
for (i = 0, curpage = fraged_pages->pagedesc;
i < num_fraged_pages;
i++, curpage++)
{
vacuum_delay_point();
Assert((*curpage)->blkno < blkno);
if ((*curpage)->blkno > last_move_dest_block)
break; /* no need to scan any further */
if ((*curpage)->offsets_used == 0)
continue; /* this page was never used as a move dest */
buf = ReadBuffer(onerel, (*curpage)->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
num_tuples = 0;
max_offset = PageGetMaxOffsetNumber(page);
for (newoff = FirstOffsetNumber;
newoff <= max_offset;
newoff = OffsetNumberNext(newoff))
{
itemid = PageGetItemId(page, newoff);
if (!ItemIdIsUsed(itemid))
continue;
tuple.t_datamcxt = NULL;
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
if (!(tuple.t_data->t_infomask & HEAP_MOVED))
elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
elog(ERROR, "invalid XVAC in tuple header");
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
{
tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
tuple.t_data->t_infomask &= ~HEAP_MOVED;
num_tuples++;
}
else
tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
}
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
Assert((*curpage)->offsets_used == num_tuples);
checked_moved += num_tuples;
}
Assert(num_moved == checked_moved);
update_hint_bits(onerel, fraged_pages, num_fraged_pages,
last_move_dest_block, num_moved);
/*
* It'd be cleaner to make this report at the bottom of this routine,
* but then the rusage would double-count the second pass of index
@@ -2455,6 +2270,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
*vpleft = *vpright;
*vpright = vpsave;
}
/*
* keep_tuples is the number of tuples that have been moved
* off a page during chain moves but not been scanned over
* subsequently. The tuple ids of these tuples are not
* recorded as free offsets for any VacPage, so they will not
* be cleared from the indexes.
*/
Assert(keep_tuples >= 0);
for (i = 0; i < nindexes; i++)
vacuum_index(&Nvacpagelist, Irel[i],
@@ -2465,36 +2287,41 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (vacpage->blkno == (blkno - 1) &&
vacpage->offsets_free > 0)
{
OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
int uncnt;
Buffer buf;
Page page;
OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
OffsetNumber offnum,
maxoff;
int uncnt;
int num_tuples = 0;
buf = ReadBuffer(onerel, vacpage->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
num_tuples = 0;
maxoff = PageGetMaxOffsetNumber(page);
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
itemid = PageGetItemId(page, offnum);
ItemId itemid = PageGetItemId(page, offnum);
HeapTupleHeader htup;
if (!ItemIdIsUsed(itemid))
continue;
tuple.t_datamcxt = NULL;
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
htup = (HeapTupleHeader) PageGetItem(page, itemid);
if (htup->t_infomask & HEAP_XMIN_COMMITTED)
continue;
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
elog(ERROR, "invalid XVAC in tuple header");
itemid->lp_flags &= ~LP_USED;
num_tuples++;
}
else
elog(ERROR, "HEAP_MOVED_OFF was expected");
}
/*
** See comments in the walk-along-page loop above, why we
** have Asserts here instead of if (...) elog(ERROR).
*/
Assert(!(htup->t_infomask & HEAP_MOVED_IN));
Assert(htup->t_infomask & HEAP_MOVED_OFF);
Assert(HeapTupleHeaderGetXvac(htup) == myXID);
itemid->lp_flags &= ~LP_USED;
num_tuples++;
}
Assert(vacpage->offsets_free == num_tuples);
@@ -2554,11 +2381,337 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (vacrelstats->vtlinks != NULL)
pfree(vacrelstats->vtlinks);
ExecDropTupleTable(tupleTable, true);
ExecContext_Finish(&ec);
}
/*
* move_chain_tuple() -- move one tuple that is part of a tuple chain
*
* This routine moves old_tup from old_page to dst_page.
* old_page and dst_page might be the same page.
* On entry old_buf and dst_buf are locked exclusively, both locks (or
* the single lock, if this is a intra-page-move) are released before
* exit.
*
* Yes, a routine with ten parameters is ugly, but it's still better
* than having these 120 lines of code in repair_frag() which is
* already too long and almost unreadable.
*/
static void
move_chain_tuple(Relation rel,
Buffer old_buf, Page old_page, HeapTuple old_tup,
Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
ExecContext ec, ItemPointer ctid, bool cleanVpd)
{
TransactionId myXID = GetCurrentTransactionId();
HeapTupleData newtup;
OffsetNumber newoff;
ItemId newitemid;
Size tuple_len = old_tup->t_len;
heap_copytuple_with_tuple(old_tup, &newtup);
/*
* register invalidation of source tuple in catcaches.
*/
CacheInvalidateHeapTuple(rel, old_tup);
/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_IN);
old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
/*
* If this page was not used before - clean it.
*
* NOTE: a nasty bug used to lurk here. It is possible
* for the source and destination pages to be the same
* (since this tuple-chain member can be on a page
* lower than the one we're currently processing in
* the outer loop). If that's true, then after
* vacuum_page() the source tuple will have been
* moved, and tuple.t_data will be pointing at
* garbage. Therefore we must do everything that uses
* old_tup->t_data BEFORE this step!!
*
* This path is different from the other callers of
* vacuum_page, because we have already incremented
* the vacpage's offsets_used field to account for the
* tuple(s) we expect to move onto the page. Therefore
* vacuum_page's check for offsets_used == 0 is wrong.
* But since that's a good debugging check for all
* other callers, we work around it here rather than
* remove it.
*/
if (!PageIsEmpty(dst_page) && cleanVpd)
{
int sv_offsets_used = dst_vacpage->offsets_used;
dst_vacpage->offsets_used = 0;
vacuum_page(rel, dst_buf, dst_vacpage);
dst_vacpage->offsets_used = sv_offsets_used;
}
/*
* Update the state of the copied tuple, and store it
* on the destination page.
*/
newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
HeapTupleHeaderSetXvac(newtup.t_data, myXID);
newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
InvalidOffsetNumber, LP_USED);
if (newoff == InvalidOffsetNumber)
{
elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
(unsigned long) tuple_len, dst_vacpage->blkno);
}
newitemid = PageGetItemId(dst_page, newoff);
pfree(newtup.t_data);
newtup.t_datamcxt = NULL;
newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
/* XLOG stuff */
if (!rel->rd_istemp)
{
XLogRecPtr recptr = log_heap_move(rel, old_buf, old_tup->t_self,
dst_buf, &newtup);
ExecCloseIndices(resultRelInfo);
if (old_buf != dst_buf)
{
PageSetLSN(old_page, recptr);
PageSetSUI(old_page, ThisStartUpID);
}
PageSetLSN(dst_page, recptr);
PageSetSUI(dst_page, ThisStartUpID);
}
else
{
/*
* No XLOG record, but still need to flag that XID
* exists on disk
*/
MyXactMadeTempRelUpdate = true;
}
END_CRIT_SECTION();
/*
* Set new tuple's t_ctid pointing to itself for last
* tuple in chain, and to next tuple in chain
* otherwise.
*/
/* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
if (!ItemPointerIsValid(ctid))
newtup.t_data->t_ctid = newtup.t_self;
else
newtup.t_data->t_ctid = *ctid;
*ctid = newtup.t_self;
FreeExecutorState(estate);
LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
if (dst_buf != old_buf)
LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
/* Create index entries for the moved tuple */
if (ec->resultRelInfo->ri_NumIndices > 0)
{
ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
}
}
/*
* move_plain_tuple() -- move one tuple that is not part of a chain
*
* This routine moves old_tup from old_page to dst_page.
* On entry old_buf and dst_buf are locked exclusively, both locks are
* released before exit.
*
* Yes, a routine with eight parameters is ugly, but it's still better
* than having these 90 lines of code in repair_frag() which is already
* too long and almost unreadable.
*/
static void
move_plain_tuple(Relation rel,
Buffer old_buf, Page old_page, HeapTuple old_tup,
Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
ExecContext ec)
{
TransactionId myXID = GetCurrentTransactionId();
HeapTupleData newtup;
OffsetNumber newoff;
ItemId newitemid;
Size tuple_len = old_tup->t_len;
/* copy tuple */
heap_copytuple_with_tuple(old_tup, &newtup);
/*
* register invalidation of source tuple in catcaches.
*
* (Note: we do not need to register the copied tuple, because we
* are not changing the tuple contents and so there cannot be
* any need to flush negative catcache entries.)
*/
CacheInvalidateHeapTuple(rel, old_tup);
/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
/*
* Mark new tuple as MOVED_IN by me.
*/
newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
HeapTupleHeaderSetXvac(newtup.t_data, myXID);
/* add tuple to the page */
newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
InvalidOffsetNumber, LP_USED);
if (newoff == InvalidOffsetNumber)
{
elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
(unsigned long) tuple_len,
dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
dst_vacpage->offsets_used, dst_vacpage->offsets_free);
}
newitemid = PageGetItemId(dst_page, newoff);
pfree(newtup.t_data);
newtup.t_datamcxt = NULL;
newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
newtup.t_self = newtup.t_data->t_ctid;
/*
* Mark old tuple as MOVED_OFF by me.
*/
old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_IN);
old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
/* XLOG stuff */
if (!rel->rd_istemp)
{
XLogRecPtr recptr = log_heap_move(rel, old_buf, old_tup->t_self,
dst_buf, &newtup);
PageSetLSN(old_page, recptr);
PageSetSUI(old_page, ThisStartUpID);
PageSetLSN(dst_page, recptr);
PageSetSUI(dst_page, ThisStartUpID);
}
else
{
/*
* No XLOG record, but still need to flag that XID exists
* on disk
*/
MyXactMadeTempRelUpdate = true;
}
END_CRIT_SECTION();
dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
((PageHeader) dst_page)->pd_lower;
LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
dst_vacpage->offsets_used++;
/* insert index' tuples if needed */
if (ec->resultRelInfo->ri_NumIndices > 0)
{
ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
}
}
/*
* update_hint_bits() -- update hint bits in destination pages
*
* Scan all the pages that we moved tuples onto and update tuple
* status bits. This is not really necessary, but will save time for
* future transactions examining these tuples.
*
* XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
* pages that were move source pages but not move dest pages. One
* also wonders whether it wouldn't be better to skip this step and
* let the tuple status updates happen someplace that's not holding an
* exclusive lock on the relation.
*/
static void
update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
BlockNumber last_move_dest_block, int num_moved)
{
int checked_moved = 0;
int i;
VacPage *curpage;
for (i = 0, curpage = fraged_pages->pagedesc;
i < num_fraged_pages;
i++, curpage++)
{
Buffer buf;
Page page;
OffsetNumber max_offset;
OffsetNumber off;
int num_tuples = 0;
vacuum_delay_point();
if ((*curpage)->blkno > last_move_dest_block)
break; /* no need to scan any further */
if ((*curpage)->offsets_used == 0)
continue; /* this page was never used as a move dest */
buf = ReadBuffer(rel, (*curpage)->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
max_offset = PageGetMaxOffsetNumber(page);
for (off = FirstOffsetNumber;
off <= max_offset;
off = OffsetNumberNext(off))
{
ItemId itemid = PageGetItemId(page, off);
HeapTupleHeader htup;
if (!ItemIdIsUsed(itemid))
continue;
htup = (HeapTupleHeader) PageGetItem(page, itemid);
if (htup->t_infomask & HEAP_XMIN_COMMITTED)
continue;
/*
* See comments in the walk-along-page loop above, why we
* have Asserts here instead of if (...) elog(ERROR). The
* difference here is that we may see MOVED_IN.
*/
Assert(htup->t_infomask & HEAP_MOVED);
Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId());
if (htup->t_infomask & HEAP_MOVED_IN)
{
htup->t_infomask |= HEAP_XMIN_COMMITTED;
htup->t_infomask &= ~HEAP_MOVED;
num_tuples++;
}
else
htup->t_infomask |= HEAP_XMIN_INVALID;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
Assert((*curpage)->offsets_used == num_tuples);
checked_moved += num_tuples;
}
Assert(num_moved == checked_moved);
}
/*