Commit e3ba5435 authored by Vadim B. Mikheev's avatar Vadim B. Mikheev

WAL fixes.

parent 433cd770
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.91 2000/10/24 09:56:07 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.92 2000/10/29 18:33:39 vadim Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
...@@ -2057,6 +2057,48 @@ log_heap_move(Relation reln, ItemPointerData from, HeapTuple newtup) ...@@ -2057,6 +2057,48 @@ log_heap_move(Relation reln, ItemPointerData from, HeapTuple newtup)
return(log_heap_update(reln, from, newtup, true)); return(log_heap_update(reln, from, newtup, true));
} }
static void
_heap_cleanup_page_(Page page)
{
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
OffsetNumber offnum;
ItemId lp;
HeapTupleHeader htup;
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp))
continue;
htup = (HeapTupleHeader) PageGetItem(page, lp);
if (!HeapTupleSatisfiesNow(htup))
lp->lp_flags &= ~LP_USED;
}
PageRepairFragmentation(page);
}
static OffsetNumber
_heap_add_tuple_(Page page, HeapTupleHeader htup, uint32 len, OffsetNumber offnum)
{
ItemId lp = PageGetItemId(page, offnum);
if (len > PageGetFreeSpace(page) ||
lp->lp_flags & LP_USED || lp->lp_len != 0)
_heap_cleanup_page_(page);
offnum = PageAddItem(page, (Item)htup, len, offnum,
LP_USED | OverwritePageMode);
return(offnum);
}
static void static void
heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{ {
...@@ -2097,24 +2139,18 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -2097,24 +2139,18 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
elog(STOP, "heap_delete_undo: bad page LSN"); elog(STOP, "heap_delete_undo: bad page LSN");
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum); lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) /* page removed by vacuum ? */
if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
{ {
if (redo) PageSetLSN(page, lsn);
elog(STOP, "heap_delete_redo: unused/deleted target tuple"); PageSetSUI(page, ThisStartUpID);
if (!InRecovery)
elog(STOP, "heap_delete_undo: unused/deleted target tuple in rollback");
if (ItemIdDeleted(lp))
{
lp->lp_flags &= ~LP_USED;
PageRepairFragmentation(page);
UnlockAndWriteBuffer(buffer); UnlockAndWriteBuffer(buffer);
}
else
UnlockAndReleaseBuffer(buffer);
return; return;
} }
htup = (HeapTupleHeader) PageGetItem(page, lp); htup = (HeapTupleHeader) PageGetItem(page, lp);
if (redo) if (redo)
...@@ -2189,6 +2225,16 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -2189,6 +2225,16 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
return; return;
} }
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
/* page removed by vacuum ? */
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
{
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
return;
}
memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits),
(char*)xlrec + SizeOfHeapInsert, newlen); (char*)xlrec + SizeOfHeapInsert, newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits); newlen += offsetof(HeapTupleHeaderData, t_bits);
...@@ -2200,9 +2246,7 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -2200,9 +2246,7 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
htup->t_xmax = htup->t_cmax = 0; htup->t_xmax = htup->t_cmax = 0;
htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask; htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask;
offnum = PageAddItem(page, (Item)htup, newlen, offnum = _heap_add_tuple_(page, htup, newlen, offnum);
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
LP_USED | OverwritePageMode);
if (offnum == InvalidOffsetNumber) if (offnum == InvalidOffsetNumber)
elog(STOP, "heap_insert_redo: failed to add tuple"); elog(STOP, "heap_insert_redo: failed to add tuple");
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
...@@ -2258,6 +2302,9 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) ...@@ -2258,6 +2302,9 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record); xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record);
Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
Buffer buffer; Buffer buffer;
bool samepage =
(ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
Page page; Page page;
OffsetNumber offnum; OffsetNumber offnum;
ItemId lp; ItemId lp;
...@@ -2266,13 +2313,6 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) ...@@ -2266,13 +2313,6 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
if (!RelationIsValid(reln)) if (!RelationIsValid(reln))
return; return;
/*
* Currently UPDATE is DELETE + INSERT and so code below are near
* exact sum of code in heap_xlog_delete & heap_xlog_insert. We could
* re-structure code better, but keeping in mind upcoming overwriting
* smgr separate heap_xlog_update code seems to be Good Thing.
*/
/* Deal with old tuple version */ /* Deal with old tuple version */
buffer = XLogReadBuffer(false, reln, buffer = XLogReadBuffer(false, reln,
...@@ -2283,6 +2323,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) ...@@ -2283,6 +2323,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page)) if (PageIsNew((PageHeader) page))
{ {
if (samepage)
goto newsame;
PageInit(page, BufferGetPageSize(buffer), 0); PageInit(page, BufferGetPageSize(buffer), 0);
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID); PageSetSUI(page, ThisStartUpID);
...@@ -2295,6 +2337,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) ...@@ -2295,6 +2337,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{ {
UnlockAndReleaseBuffer(buffer); UnlockAndReleaseBuffer(buffer);
if (samepage)
return;
goto newt; goto newt;
} }
} }
...@@ -2302,22 +2346,17 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) ...@@ -2302,22 +2346,17 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
elog(STOP, "heap_update_undo: bad old tuple page LSN"); elog(STOP, "heap_update_undo: bad old tuple page LSN");
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum); lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) /* page removed by vacuum ? */
{ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
if (redo)
elog(STOP, "heap_update_redo: unused/deleted old tuple");
if (!InRecovery)
elog(STOP, "heap_update_undo: unused/deleted old tuple in rollback");
if (ItemIdDeleted(lp))
{ {
lp->lp_flags &= ~LP_USED; if (samepage)
PageRepairFragmentation(page); goto newsame;
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer); UnlockAndWriteBuffer(buffer);
}
else
UnlockAndReleaseBuffer(buffer);
goto newt; goto newt;
} }
htup = (HeapTupleHeader) PageGetItem(page, lp); htup = (HeapTupleHeader) PageGetItem(page, lp);
...@@ -2338,6 +2377,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) ...@@ -2338,6 +2377,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
} }
if (samepage)
goto newsame;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID); PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer); UnlockAndWriteBuffer(buffer);
...@@ -2377,6 +2418,8 @@ newt:; ...@@ -2377,6 +2418,8 @@ newt:;
return; return;
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
newsame:;
if (PageIsNew((PageHeader) page)) if (PageIsNew((PageHeader) page))
{ {
PageInit(page, BufferGetPageSize(buffer), 0); PageInit(page, BufferGetPageSize(buffer), 0);
...@@ -2401,6 +2444,16 @@ newt:; ...@@ -2401,6 +2444,16 @@ newt:;
return; return;
} }
offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
/* page removed by vacuum ? */
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
{
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
return;
}
hsize = SizeOfHeapUpdate; hsize = SizeOfHeapUpdate;
if (move) if (move)
hsize += sizeof(TransactionId); hsize += sizeof(TransactionId);
...@@ -2431,9 +2484,8 @@ newt:; ...@@ -2431,9 +2484,8 @@ newt:;
htup->t_infomask = HEAP_XMAX_INVALID | xlrec->mask; htup->t_infomask = HEAP_XMAX_INVALID | xlrec->mask;
} }
offnum = PageAddItem(page, (Item)htup, newlen, offnum = _heap_add_tuple_(page, htup, newlen,
ItemPointerGetOffsetNumber(&(xlrec->newtid)), ItemPointerGetOffsetNumber(&(xlrec->newtid)));
LP_USED | OverwritePageMode);
if (offnum == InvalidOffsetNumber) if (offnum == InvalidOffsetNumber)
elog(STOP, "heap_update_redo: failed to add tuple"); elog(STOP, "heap_update_redo: failed to add tuple");
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.67 2000/10/21 15:43:18 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.68 2000/10/29 18:33:40 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -787,6 +787,7 @@ _bt_add_item(Page page, OffsetNumber offno, ...@@ -787,6 +787,7 @@ _bt_add_item(Page page, OffsetNumber offno,
if (PageAddItem(page, (Item) item, size, offno, if (PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber) LP_USED) == InvalidOffsetNumber)
{ {
#ifdef NOT_USED /* it's not valid code currently */
/* ops, not enough space - try to deleted dead tuples */ /* ops, not enough space - try to deleted dead tuples */
bool result; bool result;
...@@ -795,6 +796,7 @@ _bt_add_item(Page page, OffsetNumber offno, ...@@ -795,6 +796,7 @@ _bt_add_item(Page page, OffsetNumber offno,
result = _bt_cleanup_page(page, hnode); result = _bt_cleanup_page(page, hnode);
if (!result || PageAddItem(page, (Item) item, size, offno, if (!result || PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber) LP_USED) == InvalidOffsetNumber)
#endif
return(false); return(false);
} }
...@@ -868,7 +870,7 @@ _bt_fix_left_page(Page page, XLogRecord *record, bool onleft) ...@@ -868,7 +870,7 @@ _bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
(sizeof(BTItemData) - sizeof(IndexTupleData)); (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz); itemsz = MAXALIGN(itemsz);
if (item + itemsz < (char*)record + record->xl_len) if (item + itemsz < (char*)xlrec + record->xl_len)
{ {
previtem = item; previtem = item;
item += itemsz; item += itemsz;
...@@ -1173,6 +1175,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1173,6 +1175,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
else else
pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
pageop->btpo_flags &= ~BTP_ROOT;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID); PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer); UnlockAndWriteBuffer(buffer);
...@@ -1245,7 +1249,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1245,7 +1249,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
} }
for (item = (char*)xlrec + hsize; for (item = (char*)xlrec + hsize;
item < (char*)record + record->xl_len; ) item < (char*)xlrec + record->xl_len; )
{ {
memcpy(&btdata, item, sizeof(BTItemData)); memcpy(&btdata, item, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) + itemsz = IndexTupleDSize(btdata.bti_itup) +
...@@ -1283,7 +1287,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1283,7 +1287,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
item = (char*)xlrec + SizeOfBtreeSplit + item = (char*)xlrec + SizeOfBtreeSplit +
sizeof(CommandId) + sizeof(RelFileNode); sizeof(CommandId) + sizeof(RelFileNode);
for (cnt = 0; item < (char*)record + record->xl_len; ) for (cnt = 0; item < (char*)xlrec + record->xl_len; )
{ {
BTItem btitem = (BTItem) BTItem btitem = (BTItem)
(tbuf + cnt * (MAXALIGN(sizeof(BTItemData)))); (tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
...@@ -1306,6 +1310,9 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1306,6 +1310,9 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
/* Right (next) page */ /* Right (next) page */
blkno = BlockIdGetBlockNumber(&(xlrec->rightblk)); blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
if (blkno == P_NONE)
return;
buffer = XLogReadBuffer(false, reln, blkno); buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost next right page", op); elog(STOP, "btree_split_%s: lost next right page", op);
...@@ -1385,7 +1392,7 @@ btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -1385,7 +1392,7 @@ btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
char *item; char *item;
for (item = (char*)xlrec + SizeOfBtreeNewroot; for (item = (char*)xlrec + SizeOfBtreeNewroot;
item < (char*)record + record->xl_len; ) item < (char*)xlrec + record->xl_len; )
{ {
memcpy(&btdata, item, sizeof(BTItemData)); memcpy(&btdata, item, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) + itemsz = IndexTupleDSize(btdata.bti_itup) +
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.78 2000/10/28 16:20:53 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.79 2000/10/29 18:33:41 vadim Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
...@@ -1807,8 +1807,10 @@ xact_desc(char *buf, uint8 xl_info, char* rec) ...@@ -1807,8 +1807,10 @@ xact_desc(char *buf, uint8 xl_info, char* rec)
void void
XactPushRollback(void (*func) (void *), void* data) XactPushRollback(void (*func) (void *), void* data)
{ {
#ifdef XLOG_II
if (_RollbackFunc != NULL) if (_RollbackFunc != NULL)
elog(STOP, "XactPushRollback: already installed"); elog(STOP, "XactPushRollback: already installed");
#endif
_RollbackFunc = func; _RollbackFunc = func;
_RollbackData = data; _RollbackData = data;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment