Commit a7fcadd1 authored by Vadim B. Mikheev's avatar Vadim B. Mikheev

WAL

parent 7c177a49
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.62 2000/07/14 22:17:28 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.63 2000/10/21 15:43:09 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -23,6 +23,12 @@ ...@@ -23,6 +23,12 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#ifdef XLOG
#include "access/xlogutils.h"
void gist_redo(XLogRecPtr lsn, XLogRecord *record);
void gist_undo(XLogRecPtr lsn, XLogRecord *record);
void gist_desc(char *buf, uint8 xl_info, char* rec);
#endif
/* non-export function prototypes */ /* non-export function prototypes */
static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup, static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup,
...@@ -1344,3 +1350,22 @@ int_range_out(INTRANGE *r) ...@@ -1344,3 +1350,22 @@ int_range_out(INTRANGE *r)
} }
#endif /* defined GISTDEBUG */ #endif /* defined GISTDEBUG */
#ifdef XLOG
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
elog(STOP, "gist_redo: unimplemented");
}
void
gist_undo(XLogRecPtr lsn, XLogRecord *record)
{
elog(STOP, "gist_undo: unimplemented");
}
void
gist_desc(char *buf, uint8 xl_info, char* rec)
{
}
#endif
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.42 2000/07/14 22:17:28 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.43 2000/10/21 15:43:11 vadim Exp $
* *
* NOTES * NOTES
* This file contains only the public interface routines. * This file contains only the public interface routines.
...@@ -25,9 +25,16 @@ ...@@ -25,9 +25,16 @@
#include "executor/executor.h" #include "executor/executor.h"
#include "miscadmin.h" #include "miscadmin.h"
bool BuildingHash = false; bool BuildingHash = false;
#ifdef XLOG
#include "access/xlogutils.h"
void hash_redo(XLogRecPtr lsn, XLogRecord *record);
void hash_undo(XLogRecPtr lsn, XLogRecord *record);
void hash_desc(char *buf, uint8 xl_info, char* rec);
#endif
/* /*
* hashbuild() -- build a new hash index. * hashbuild() -- build a new hash index.
* *
...@@ -478,3 +485,22 @@ hashdelete(PG_FUNCTION_ARGS) ...@@ -478,3 +485,22 @@ hashdelete(PG_FUNCTION_ARGS)
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
#ifdef XLOG
void
hash_redo(XLogRecPtr lsn, XLogRecord *record)
{
elog(STOP, "hash_redo: unimplemented");
}
void
hash_undo(XLogRecPtr lsn, XLogRecord *record)
{
elog(STOP, "hash_undo: unimplemented");
}
void
hash_desc(char *buf, uint8 xl_info, char* rec)
{
}
#endif
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.89 2000/10/20 11:01:02 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.90 2000/10/21 15:43:14 vadim Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
...@@ -86,12 +86,14 @@ ...@@ -86,12 +86,14 @@
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/relcache.h" #include "utils/relcache.h"
#ifdef XLOG /* comments are in heap_update */ #ifdef XLOG
#include "access/xlogutils.h" #include "access/xlogutils.h"
void heap_redo(XLogRecPtr lsn, XLogRecord *record); void heap_redo(XLogRecPtr lsn, XLogRecord *record);
void heap_undo(XLogRecPtr lsn, XLogRecord *record); void heap_undo(XLogRecPtr lsn, XLogRecord *record);
void heap_desc(char *buf, uint8 xl_info, char* rec);
/* comments are in heap_update */
static xl_heaptid _locked_tuple_; static xl_heaptid _locked_tuple_;
static void _heap_unlock_tuple(void *data); static void _heap_unlock_tuple(void *data);
...@@ -2480,4 +2482,53 @@ HeapPageCleanup(Buffer buffer) ...@@ -2480,4 +2482,53 @@ HeapPageCleanup(Buffer buffer)
PageRepairFragmentation(page); PageRepairFragmentation(page);
} }
static void
out_target(char *buf, xl_heaptid *target)
{
sprintf(buf + strlen(buf), "node %u/%u; cid %u; tid %u/%u",
target->node.tblNode, target->node.relNode,
target->cid,
ItemPointerGetBlockNumber(&(target->tid)),
ItemPointerGetOffsetNumber(&(target->tid)));
}
void
heap_desc(char *buf, uint8 xl_info, char* rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_HEAP_INSERT)
{
xl_heap_insert *xlrec = (xl_heap_insert*) rec;
strcat(buf, "insert: ");
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_HEAP_DELETE)
{
xl_heap_delete *xlrec = (xl_heap_delete*) rec;
strcat(buf, "delete: ");
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_HEAP_UPDATE)
{
xl_heap_update *xlrec = (xl_heap_update*) rec;
strcat(buf, "update: ");
out_target(buf, &(xlrec->target));
sprintf(buf + strlen(buf), "; new %u/%u",
ItemPointerGetBlockNumber(&(xlrec->newtid)),
ItemPointerGetOffsetNumber(&(xlrec->newtid)));
}
else if (info == XLOG_HEAP_MOVE)
{
xl_heap_move *xlrec = (xl_heap_move*) rec;
strcat(buf, "move: ");
out_target(buf, &(xlrec->target));
sprintf(buf + strlen(buf), "; new %u/%u",
ItemPointerGetBlockNumber(&(xlrec->newtid)),
ItemPointerGetOffsetNumber(&(xlrec->newtid)));
}
else
strcat(buf, "UNKNOWN");
}
#endif /* XLOG */ #endif /* XLOG */
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.66 2000/10/13 12:05:20 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.67 2000/10/21 15:43:18 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -527,12 +527,13 @@ _bt_insertonpg(Relation rel, ...@@ -527,12 +527,13 @@ _bt_insertonpg(Relation rel,
{ {
char xlbuf[sizeof(xl_btree_insert) + char xlbuf[sizeof(xl_btree_insert) +
sizeof(CommandId) + sizeof(RelFileNode)]; sizeof(CommandId) + sizeof(RelFileNode)];
xl_btree_insert *xlrec = xlbuf; xl_btree_insert *xlrec = (xl_btree_insert*)xlbuf;
int hsize = SizeOfBtreeInsert; int hsize = SizeOfBtreeInsert;
BTItemData truncitem; BTItemData truncitem;
BTItem xlitem = btitem; BTItem xlitem = btitem;
Size xlsize = IndexTupleDSize(btitem->bti_itup) + Size xlsize = IndexTupleDSize(btitem->bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData)); (sizeof(BTItemData) - sizeof(IndexTupleData));
XLogRecPtr recptr;
xlrec->target.node = rel->rd_node; xlrec->target.node = rel->rd_node;
ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff); ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
...@@ -555,7 +556,7 @@ _bt_insertonpg(Relation rel, ...@@ -555,7 +556,7 @@ _bt_insertonpg(Relation rel,
xlsize = sizeof(BTItemData); xlsize = sizeof(BTItemData);
} }
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT, recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
xlbuf, hsize, (char*) xlitem, xlsize); xlbuf, hsize, (char*) xlitem, xlsize);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
...@@ -785,17 +786,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ...@@ -785,17 +786,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
{ {
char xlbuf[sizeof(xl_btree_split) + char xlbuf[sizeof(xl_btree_split) +
sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ]; sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ];
xl_btree_split *xlrec = xlbuf; xl_btree_split *xlrec = (xl_btree_split*) xlbuf;
int hsize = SizeOfBtreeSplit; int hsize = SizeOfBtreeSplit;
int flag = (newitemonleft) ? int flag = (newitemonleft) ?
XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT; XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
BlockNumber blkno;
XLogRecPtr recptr;
xlrec->target.node = rel->rd_node; xlrec->target.node = rel->rd_node;
ItemPointerSet(&(xlrec->target.tid), itup_blkno, itup_off); ItemPointerSet(&(xlrec->target.tid), *itup_blkno, *itup_off);
if (P_ISLEAF(lopaque)) if (P_ISLEAF(lopaque))
{ {
CommandId cid = GetCurrentCommandId(); CommandId cid = GetCurrentCommandId();
memcpy(xlbuf + hsize, &(char*)cid, sizeof(CommandId)); memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
hsize += sizeof(CommandId); hsize += sizeof(CommandId);
memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
hsize += sizeof(RelFileNode); hsize += sizeof(RelFileNode);
...@@ -814,7 +817,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ...@@ -814,7 +817,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
* Actually, seems that in non-leaf splits newitem shouldn't * Actually, seems that in non-leaf splits newitem shouldn't
* go to first data key position on left page. * go to first data key position on left page.
*/ */
if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque)) if (! P_ISLEAF(lopaque) && *itup_off == P_FIRSTDATAKEY(lopaque))
{ {
BTItemData truncitem = *newitem; BTItemData truncitem = *newitem;
truncitem.bti_itup.t_info = sizeof(BTItemData); truncitem.bti_itup.t_info = sizeof(BTItemData);
...@@ -828,20 +831,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ...@@ -828,20 +831,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
memcpy(xlbuf + hsize, (char*) newitem, itemsz); memcpy(xlbuf + hsize, (char*) newitem, itemsz);
hsize += itemsz; hsize += itemsz;
} }
xlrec->otherblk = BufferGetBlockNumber(rbuf); blkno = BufferGetBlockNumber(rbuf);
BlockIdSet(&(xlrec->otherblk), blkno);
} }
else else
xlrec->otherblk = BufferGetBlockNumber(buf); {
blkno = BufferGetBlockNumber(buf);
BlockIdSet(&(xlrec->otherblk), blkno);
}
xlrec->rightblk = ropaque->btpo_next; BlockIdSet(&(xlrec->rightblk), ropaque->btpo_next);
/* /*
* Dirrect access to page is not good but faster - we should * Dirrect access to page is not good but faster - we should
* implement some new func in page API. * implement some new func in page API.
*/ */
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf,
hsize, (char*)rightpage + (PageHeader) rightpage)->pd_upper, hsize, (char*)rightpage + ((PageHeader) rightpage)->pd_upper,
((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->upper); ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
PageSetLSN(leftpage, recptr); PageSetLSN(leftpage, recptr);
PageSetSUI(leftpage, ThisStartUpID); PageSetSUI(leftpage, ThisStartUpID);
...@@ -1070,7 +1077,7 @@ static Buffer ...@@ -1070,7 +1077,7 @@ static Buffer
_bt_getstackbuf(Relation rel, BTStack stack) _bt_getstackbuf(Relation rel, BTStack stack)
{ {
BlockNumber blkno; BlockNumber blkno;
Buffer buf, newbuf; Buffer buf;
OffsetNumber start, OffsetNumber start,
offnum, offnum,
maxoff; maxoff;
...@@ -1236,6 +1243,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) ...@@ -1236,6 +1243,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
xl_btree_newroot xlrec; xl_btree_newroot xlrec;
Page metapg = BufferGetPage(metabuf); Page metapg = BufferGetPage(metabuf);
BTMetaPageData *metad = BTPageGetMeta(metapg); BTMetaPageData *metad = BTPageGetMeta(metapg);
XLogRecPtr recptr;
xlrec.node = rel->rd_node; xlrec.node = rel->rd_node;
BlockIdSet(&(xlrec.rootblk), rootblknum); BlockIdSet(&(xlrec.rootblk), rootblknum);
...@@ -1244,10 +1252,10 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) ...@@ -1244,10 +1252,10 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
* Dirrect access to page is not good but faster - we should * Dirrect access to page is not good but faster - we should
* implement some new func in page API. * implement some new func in page API.
*/ */
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
&xlrec, SizeOfBtreeNewroot, (char*)&xlrec, SizeOfBtreeNewroot,
(char*)rootpage + (PageHeader) rootpage)->pd_upper, (char*)rootpage + ((PageHeader) rootpage)->pd_upper,
((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->upper); ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper);
metad->btm_root = rootblknum; metad->btm_root = rootblknum;
(metad->btm_level)++; (metad->btm_level)++;
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.39 2000/10/13 02:03:00 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.40 2000/10/21 15:43:18 vadim Exp $
* *
* NOTES * NOTES
* Postgres btree pages look like ordinary relation pages. The opaque * Postgres btree pages look like ordinary relation pages. The opaque
...@@ -171,13 +171,14 @@ _bt_getroot(Relation rel, int access) ...@@ -171,13 +171,14 @@ _bt_getroot(Relation rel, int access)
#ifdef XLOG #ifdef XLOG
/* XLOG stuff */ /* XLOG stuff */
{ {
xl_btree_newroot xlrec; xl_btree_newroot xlrec;
XLogRecPtr recptr;
xlrec.node = rel->rd_node; xlrec.node = rel->rd_node;
BlockIdSet(&(xlrec.rootblk), rootblkno); BlockIdSet(&(xlrec.rootblk), rootblkno);
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
&xlrec, SizeOfBtreeNewroot, NULL, 0); (char*)&xlrec, SizeOfBtreeNewroot, NULL, 0);
PageSetLSN(rootpage, recptr); PageSetLSN(rootpage, recptr);
PageSetSUI(rootpage, ThisStartUpID); PageSetSUI(rootpage, ThisStartUpID);
...@@ -404,10 +405,12 @@ _bt_pagedel(Relation rel, ItemPointer tid) ...@@ -404,10 +405,12 @@ _bt_pagedel(Relation rel, ItemPointer tid)
/* XLOG stuff */ /* XLOG stuff */
{ {
xl_btree_delete xlrec; xl_btree_delete xlrec;
XLogRecPtr recptr;
xlrec.target.node = rel->rd_node; xlrec.target.node = rel->rd_node;
xlrec.target.tid = *tid; xlrec.target.tid = *tid;
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE,
(char*) xlrec, SizeOfBtreeDelete, NULL, 0); (char*) &xlrec, SizeOfBtreeDelete, NULL, 0);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID); PageSetSUI(page, ThisStartUpID);
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.66 2000/10/20 11:01:03 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.67 2000/10/21 15:43:18 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -32,6 +32,14 @@ bool BuildingBtree = false; /* see comment in btbuild() */ ...@@ -32,6 +32,14 @@ bool BuildingBtree = false; /* see comment in btbuild() */
bool FastBuild = true; /* use sort/build instead of insertion bool FastBuild = true; /* use sort/build instead of insertion
* build */ * build */
#ifdef XLOG
#include "access/xlogutils.h"
void btree_redo(XLogRecPtr lsn, XLogRecord *record);
void btree_undo(XLogRecPtr lsn, XLogRecord *record);
void btree_desc(char *buf, uint8 xl_info, char* rec);
#endif
static void _bt_restscan(IndexScanDesc scan); static void _bt_restscan(IndexScanDesc scan);
/* /*
...@@ -732,163 +740,382 @@ _bt_restscan(IndexScanDesc scan) ...@@ -732,163 +740,382 @@ _bt_restscan(IndexScanDesc scan)
} }
#ifdef XLOG #ifdef XLOG
void btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(true, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(true, false, lsn, record); /* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(true, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(STOP, "btree_redo: unknown op code %u", info);
}
void btree_undo(XLogRecPtr lsn, XLogRecord *record) static bool
_bt_cleanup_page(Page page, RelFileNode hnode)
{ {
uint8 info = record->xl_info & ~XLR_INFO_MASK; OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
OffsetNumber offno;
ItemId lp;
BTItem item;
bool result = false;
if (info == XLOG_BTREE_DELETE) for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; )
btree_xlog_delete(false, lsn, record); {
else if (info == XLOG_BTREE_INSERT) lp = PageGetItemId(page, offno);
btree_xlog_insert(false, lsn, record); item = (BTItem) PageGetItem(page, lp);
else if (info == XLOG_BTREE_SPLIT) if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid)))
btree_xlog_split(false, false, lsn, record);/* new item on the right */ offno = OffsetNumberNext(offno);
else if (info == XLOG_BTREE_SPLEFT) else
btree_xlog_split(false, true, lsn, record); /* new item on the left */ {
else if (info == XLOG_BTREE_NEWROOT) PageIndexTupleDelete(page, offno);
btree_xlog_newroot(false, lsn, record); maxoff = PageGetMaxOffsetNumber(page);
else result = true;
elog(STOP, "btree_undo: unknown op code %u", info); }
}
return(result);
} }
static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) static bool
_bt_add_item(Page page, OffsetNumber offno,
char* item, Size size, RelFileNode hnode)
{ {
xl_btree_delete *xlrec; BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
Relation *reln;
Buffer buffer;
Page page;
if (!redo) if (offno > PageGetMaxOffsetNumber(page) + 1)
return; {
if (! (pageop->btpo_flags & BTP_REORDER))
{
elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
pageop->btpo_flags |= BTP_REORDER;
}
offno = PageGetMaxOffsetNumber(page) + 1;
}
xlrec = (xl_btree_delete*) XLogRecGetData(record); if (PageAddItem(page, (Item) item, size, offno,
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); LP_USED) == InvalidOffsetNumber)
if (!RelationIsValid(reln)) {
return; /* ops, not enough space - try to deleted dead tuples */
buffer = XLogReadBuffer(false, reln, bool result;
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_delete_redo: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_delete_redo: uninitialized page");
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid))); if (! P_ISLEAF(pageop))
return(false);
result = _bt_cleanup_page(page, hnode);
if (!result || PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber)
return(false);
}
return; return(true);
} }
static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) /*
* Remove from left sibling items belonging to right sibling
* and change P_HIKEY
*/
static void
_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
{ {
xl_btree_insert *xlrec; char *xlrec = (char*) XLogRecGetData(record);
Relation *reln; BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
Buffer buffer; Size hsize = SizeOfBtreeSplit;
Page page; RelFileNode hnode;
BTPageOpaque pageop; BTItemData btdata;
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
OffsetNumber offno;
char *item;
Size itemsz;
char *previtem = NULL;
char *lhikey = NULL;
Size lhisize = 0;
xlrec = (xl_btree_insert*) XLogRecGetData(record); if (pageop->btpo_flags & BTP_LEAF)
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); {
if (!RelationIsValid(reln)) hsize += (sizeof(CommandId) + sizeof(RelFileNode));
return; memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
buffer = XLogReadBuffer((redo) ? true : false, reln, sizeof(CommandId), sizeof(RelFileNode));
ItemPointerGetBlockNumber(&(xlrec->target.tid))); }
if (!BufferIsValid(buffer)) else
return; {
page = (Page) BufferGetPage(buffer); lhikey = (char*)xlrec + hsize;
if (PageIsNew((PageHeader) page)) memcpy(&btdata, lhikey, sizeof(BTItemData));
elog(STOP, "btree_insert_%s: uninitialized page", lhisize = IndexTupleDSize(btdata.bti_itup) +
(redo) ? "redo" : "undo"); (sizeof(BTItemData) - sizeof(IndexTupleData));
pageop = (BTPageOpaque) PageGetSpecialPointer(page); hsize += lhisize;
}
if (redo) if (! P_RIGHTMOST(pageop))
PageIndexTupleDelete(page, P_HIKEY);
if (onleft) /* skip target item */
{ {
if (XLByteLE(lsn, PageGetLSN(page))) memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
UnlockAndReleaseBuffer(buffer); itemsz = IndexTupleDSize(btdata.bti_itup) +
else (sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += itemsz;
}
for (item = (char*)xlrec + hsize; ; )
{
memcpy(&btdata, item, sizeof(BTItemData));
for (offno = P_FIRSTDATAKEY(pageop);
offno <= maxoff;
offno = OffsetNumberNext(offno))
{ {
Size hsize = SizeOfBtreeInsert; ItemId lp = PageGetItemId(page, offno);
RelFileNode hnode; BTItem btitem = (BTItem) PageGetItem(page, lp);
if (P_ISLEAF(pageop)) if (BTItemSame(&btdata, btitem))
{ {
hsize += (sizeof(CommandId) + sizeof(RelFileNode)); PageIndexTupleDelete(page, offno);
memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert + break;
sizeof(CommandId), sizeof(RelFileNode));
} }
if (! _bt_add_item(page,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
(char*)xlrec + hsize,
record->xl_len - hsize,
hnode))
elog(STOP, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
} }
}
else
{
BTItemData btdata;
if (XLByteLT(PageGetLSN(page), lsn)) itemsz = IndexTupleDSize(btdata.bti_itup) +
elog(STOP, "btree_insert_undo: bad page LSN"); (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (! P_ISLEAF(pageop)) if (item + itemsz < (char*)record + record->xl_len)
{ {
UnlockAndReleaseBuffer(buffer); previtem = item;
return; item += itemsz;
} }
else
break;
}
memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert + /* time to insert hi-key */
sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); if (pageop->btpo_flags & BTP_LEAF)
{
_bt_del_item(reln, buffer, &btdata, true, lsn, record); lhikey = (P_RIGHTMOST(pageop)) ? item : previtem;
memcpy(&btdata, lhikey, sizeof(BTItemData));
lhisize = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
} }
if (! _bt_add_item(page,
P_HIKEY,
lhikey,
lhisize,
hnode))
elog(STOP, "btree_split_redo: failed to add hi key to left sibling");
return; return;
} }
/*
* UNDO insertion on *leaf* page:
* - find inserted tuple;
* - delete it if heap tuple was inserted by the same xaction
*/
static void static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
XLogRecPtr lsn, XLogRecord *record)
{ {
xl_btree_split *xlrec; char *xlrec = (char*) XLogRecGetData(record);
Relation *reln; Page page = (Page) BufferGetPage(buffer);
BlockNumber blkno; BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
BlockNumber parent; BlockNumber blkno;
Buffer buffer; OffsetNumber offno;
Page page; ItemId lp;
BTPageOpaque pageop; BTItem item;
char *op = (redo) ? "redo" : "undo";
bool isleaf;
xlrec = (xl_btree_split*) XLogRecGetData(record); for ( ; ; )
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); {
if (!RelationIsValid(reln)) OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
return;
for (offno = P_FIRSTDATAKEY(pageop);
offno <= maxoff;
offno = OffsetNumberNext(offno))
{
lp = PageGetItemId(page, offno);
item = (BTItem) PageGetItem(page, lp);
if (BTItemSame(item, btitem))
break;
}
if (offno <= maxoff)
break;
offno = InvalidOffsetNumber;
if (P_RIGHTMOST(pageop))
break;
blkno = pageop->btpo_next;
UnlockAndReleaseBuffer(buffer);
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_%s_undo: lost right sibling",
(insert) ? "insert" : "split");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_%s_undo: uninitialized right sibling",
(insert) ? "insert" : "split");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (XLByteLT(PageGetLSN(page), lsn))
break;
}
if (offno == InvalidOffsetNumber) /* not found */
{
if (!InRecovery)
elog(STOP, "btree_%s_undo: lost target tuple in rollback",
(insert) ? "insert" : "split");
UnlockAndReleaseBuffer(buffer);
return;
}
lp = PageGetItemId(page, offno);
if (InRecovery) /* check heap tuple */
{
if (!ItemIdDeleted(lp))
{
int result;
CommandId cid;
RelFileNode hnode;
Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
record->xl_xid, cid);
if (result < 0) /* not owner */
{
UnlockAndReleaseBuffer(buffer);
return;
}
}
PageIndexTupleDelete(page, offno);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_REORDER;
UnlockAndWriteBuffer(buffer);
return;
}
/* normal rollback */
if (ItemIdDeleted(lp)) /* marked for deletion ?! */
elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
(insert) ? "insert" : "split");
lp->lp_flags |= LP_DELETE;
MarkBufferForCleanup(buffer, IndexPageCleanup);
return;
}
static void
btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete *xlrec;
Relation reln;
Buffer buffer;
Page page;
if (!redo)
return;
xlrec = (xl_btree_delete*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_delete_redo: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_delete_redo: uninitialized page");
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
return;
}
static void
btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec;
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
xlrec = (xl_btree_insert*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer((redo) ? true : false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_insert_%s: uninitialized page",
(redo) ? "redo" : "undo");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
UnlockAndReleaseBuffer(buffer);
else
{
Size hsize = SizeOfBtreeInsert;
RelFileNode hnode;
if (P_ISLEAF(pageop))
{
hsize += (sizeof(CommandId) + sizeof(RelFileNode));
memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert +
sizeof(CommandId), sizeof(RelFileNode));
}
if (! _bt_add_item(page,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
(char*)xlrec + hsize,
record->xl_len - hsize,
hnode))
elog(STOP, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
else
{
BTItemData btdata;
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_insert_undo: bad page LSN");
if (! P_ISLEAF(pageop))
{
UnlockAndReleaseBuffer(buffer);
return;
}
memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert +
sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
_bt_del_item(reln, buffer, &btdata, true, lsn, record);
}
return;
}
static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec;
Relation reln;
BlockNumber blkno;
BlockNumber parent;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
bool isleaf;
xlrec = (xl_btree_split*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
/* Left (original) sibling */ /* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(xlrec->otherblk); BlockIdGetBlockNumber(&(xlrec->otherblk));
buffer = XLogReadBuffer(false, reln, blkno); buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost left sibling", op); elog(STOP, "btree_split_%s: lost left sibling", op);
...@@ -917,7 +1144,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -917,7 +1144,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
Size itemsz; Size itemsz;
RelFileNode hnode; RelFileNode hnode;
pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk); pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
if (isleaf) if (isleaf)
{ {
hsize += (sizeof(CommandId) + sizeof(RelFileNode)); hsize += (sizeof(CommandId) + sizeof(RelFileNode));
...@@ -970,7 +1197,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -970,7 +1197,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
} }
/* Right (new) sibling */ /* Right (new) sibling */
blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid)); ItemPointerGetBlockNumber(&(xlrec->target.tid));
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno); buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
...@@ -993,6 +1220,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -993,6 +1220,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
Size hsize = SizeOfBtreeSplit; Size hsize = SizeOfBtreeSplit;
BTItemData btdata; BTItemData btdata;
Size itemsz; Size itemsz;
char *item;
_bt_pageinit(page, BufferGetPageSize(buffer)); _bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page); pageop = (BTPageOpaque) PageGetSpecialPointer(page);
...@@ -1016,7 +1244,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1016,7 +1244,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
hsize += itemsz; hsize += itemsz;
} }
for (char* item = (char*)xlrec + hsize; for (item = (char*)xlrec + hsize;
item < (char*)record + record->xl_len; ) item < (char*)record + record->xl_len; )
{ {
memcpy(&btdata, item, sizeof(BTItemData)); memcpy(&btdata, item, sizeof(BTItemData));
...@@ -1030,8 +1258,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1030,8 +1258,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
} }
pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(xlrec->otherblk); BlockIdGetBlockNumber(&(xlrec->otherblk));
pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk); pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
pageop->btpo_parent = parent; pageop->btpo_parent = parent;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
...@@ -1077,7 +1305,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1077,7 +1305,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
} }
/* Right (next) page */ /* Right (next) page */
blkno = BlockIdGetBlockNumber(xlrec->rightblk); blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
buffer = XLogReadBuffer(false, reln, blkno); buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost next right page", op); elog(STOP, "btree_split_%s: lost next right page", op);
...@@ -1093,7 +1321,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1093,7 +1321,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
else else
{ {
pageop = (BTPageOpaque) PageGetSpecialPointer(page); pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : pageop->btpo_prev = (onleft) ?
BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid)); ItemPointerGetBlockNumber(&(xlrec->target.tid));
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
...@@ -1111,10 +1340,11 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -1111,10 +1340,11 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
} }
static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{ {
xl_btree_newroot *xlrec; xl_btree_newroot *xlrec;
Relation *reln; Relation reln;
Buffer buffer; Buffer buffer;
Page page; Page page;
Buffer metabuf; Buffer metabuf;
...@@ -1137,6 +1367,8 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -1137,6 +1367,8 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
{ {
BTPageOpaque pageop;
_bt_pageinit(page, BufferGetPageSize(buffer)); _bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page); pageop = (BTPageOpaque) PageGetSpecialPointer(page);
...@@ -1150,8 +1382,9 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -1150,8 +1382,9 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{ {
BTItemData btdata; BTItemData btdata;
Size itemsz; Size itemsz;
char *item;
for (char* item = (char*)xlrec + SizeOfBtreeNewroot; for (item = (char*)xlrec + SizeOfBtreeNewroot;
item < (char*)record + record->xl_len; ) item < (char*)record + record->xl_len; )
{ {
memcpy(&btdata, item, sizeof(BTItemData)); memcpy(&btdata, item, sizeof(BTItemData));
...@@ -1182,7 +1415,7 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -1182,7 +1415,7 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
md.btm_version = BTREE_VERSION; md.btm_version = BTREE_VERSION;
md.btm_root = P_NONE; md.btm_root = P_NONE;
md.btm_level = 0; md.btm_level = 0;
memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md)); memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
} }
if (XLByteLT(PageGetLSN(metapg), lsn)) if (XLByteLT(PageGetLSN(metapg), lsn))
...@@ -1201,255 +1434,89 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -1201,255 +1434,89 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
return; return;
} }
/* void
* UNDO insertion on *leaf* page: btree_redo(XLogRecPtr lsn, XLogRecord *record)
* - find inserted tuple;
* - delete it if heap tuple was inserted by the same xaction
*/
static void
_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
XLogRecPtr lsn, XLogRecord *record)
{ {
char *xlrec = (char*) XLogRecGetData(record); uint8 info = record->xl_info & ~XLR_INFO_MASK;
Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
BlockNumber blkno;
OffsetNumber offno;
ItemId lp;
BTItem item;
for ( ; ; )
{
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
for (offno = P_FIRSTDATAKEY(pageop);
offno <= maxoff;
offno = OffsetNumberNext(offno))
{
lp = PageGetItemId(page, offno);
item = (BTItem) PageGetItem(page, lp);
if (BTItemSame(item, btitem))
break;
}
if (offno <= maxoff)
break;
offno = InvalidOffsetNumber;
if (P_RIGHTMOST(pageop))
break;
blkno = pageop->btpo_next;
UnlockAndReleaseBuffer(buffer);
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_%s_undo: lost right sibling",
(insert) ? "insert" : "split");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_%s_undo: uninitialized right sibling",
(insert) ? "insert" : "split");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (XLByteLT(PageGetLSN(page), lsn))
break;
}
if (offno == InvalidOffsetNumber) /* not found */
{
if (!InRecovery)
elog(STOP, "btree_%s_undo: lost target tuple in rollback",
(insert) ? "insert" : "split");
UnlockAndReleaseBuffer(buffer);
return;
}
lp = PageGetItemId(page, offno);
if (InRecovery) /* check heap tuple */
{
if (!ItemIdDeleted(lp))
{
int result;
CommandId cid;
RelFileNode hnode;
Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
record->xl_xid, cid);
if (result < 0) /* not owner */
{
UnlockAndReleaseBuffer(buffer);
return;
}
}
PageIndexTupleDelete(page, offno);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_REORDER;
UnlockAndWriteBuffer(buffer);
return;
}
/* normal rollback */
if (ItemIdDeleted(lp)) /* marked for deletion ?! */
elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
(insert) ? "insert" : "split");
lp->lp_flags |= LP_DELETE; if (info == XLOG_BTREE_DELETE)
MarkBufferForCleanup(buffer, IndexPageCleanup); btree_xlog_delete(true, lsn, record);
return; else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(true, false, lsn, record); /* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(true, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(STOP, "btree_redo: unknown op code %u", info);
} }
static bool void
_bt_add_item(Page page, OffsetNumber offno, btree_undo(XLogRecPtr lsn, XLogRecord *record)
char* item, Size size, RelFileNode hnode)
{ {
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (offno > PageGetMaxOffsetNumber(page) + 1)
{
if (! (pageop->btpo_flags & BTP_REORDER))
{
elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
pageop->btpo_flags |= BTP_REORDER;
}
offno = PageGetMaxOffsetNumber(page) + 1;
}
if (PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber)
{
/* ops, not enough space - try to deleted dead tuples */
bool result;
if (! P_ISLEAF(pageop))
return(false);
result = _bt_cleanup_page(page, hnode);
if (!result || PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber)
return(false);
}
return(true); if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(false, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(false, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(false, false, lsn, record);/* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(false, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
else
elog(STOP, "btree_undo: unknown op code %u", info);
} }
static bool static void
_bt_cleanup_page(Page page, RelFileNode hnode) out_target(char *buf, xl_btreetid *target)
{ {
OffsetNumber maxoff = PageGetMaxOffsetNumber(page); sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
OffsetNumber offno; target->node.tblNode, target->node.relNode,
ItemId lp; ItemPointerGetBlockNumber(&(target->tid)),
BTItem item; ItemPointerGetOffsetNumber(&(target->tid)));
bool result = false;
for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; )
{
lp = PageGetItemId(page, offno);
item = (BTItem) PageGetItem(page, lp);
if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))
offno = OffsetNumberNext(offno);
else
{
PageIndexTupleDelete(page, offno);
maxoff = PageGetMaxOffsetNumber(page);
result = true;
}
}
return(result);
} }
/* void
* Remove from left sibling items belonging to right sibling btree_desc(char *buf, uint8 xl_info, char* rec)
* and change P_HIKEY
*/
static void
_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
{ {
char *xlrec = (char*) XLogRecGetData(record); uint8 info = xl_info & ~XLR_INFO_MASK;
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
Size hsize = SizeOfBtreeSplit;
RelFileNode hnode;
BTItemData btdata;
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
OffsetNumber offno;
char *item;
Size itemsz;
char *previtem = NULL;
char *lhikey = NULL;
Size lhisize = 0;
if (pageop->btpo_flags & BTP_LEAF) if (info == XLOG_BTREE_INSERT)
{ {
hsize += (sizeof(CommandId) + sizeof(RelFileNode)); xl_btree_insert *xlrec = (xl_btree_insert*) rec;
memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + strcat(buf, "insert: ");
sizeof(CommandId), sizeof(RelFileNode)); out_target(buf, &(xlrec->target));
} }
else else if (info == XLOG_BTREE_DELETE)
{ {
lhikey = (char*)xlrec + hsize; xl_btree_delete *xlrec = (xl_btree_delete*) rec;
memcpy(&btdata, lhikey, sizeof(BTItemData)); strcat(buf, "delete: ");
lhisize = IndexTupleDSize(btdata.bti_itup) + out_target(buf, &(xlrec->target));
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += lhisize;
} }
else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
if (! P_RIGHTMOST(pageop))
PageIndexTupleDelete(page, P_HIKEY);
if (onleft) /* skip target item */
{
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += itemsz;
}
for (item = (char*)xlrec + hsize; ; )
{ {
memcpy(&btdata, item, sizeof(BTItemData)); xl_btree_split *xlrec = (xl_btree_split*) rec;
for (offno = P_FIRSTDATAKEY(pageop); sprintf(buf + strlen(buf), "split(%s): ",
offno <= maxoff; (info == XLOG_BTREE_SPLIT) ? "right" : "left");
offno = OffsetNumberNext(offno)) out_target(buf, &(xlrec->target));
{ sprintf(buf + strlen(buf), "; oth %u; rgh %u",
ItemId lp = PageGetItemId(page, offno); BlockIdGetBlockNumber(&xlrec->otherblk),
BTItem btitem = (BTItem) PageGetItem(page, lp); BlockIdGetBlockNumber(&xlrec->rightblk));
if (BTItemSame(&btdata, btitem))
{
PageIndexTupleDelete(page, offno);
break;
}
}
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (item + itemsz < (char*)record + record->xl_len)
{
previtem = item;
item += itemsz;
}
else
break;
} }
else if (info == XLOG_BTREE_NEWROOT)
/* time to insert hi-key */
if (pageop->btpo_flags & BTP_LEAF)
{ {
lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; xl_btree_newroot *xlrec = (xl_btree_newroot*) rec;
memcpy(&btdata, lhikey, sizeof(BTItemData)); sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
lhisize = IndexTupleDSize(btdata.bti_itup) + xlrec->node.tblNode, xlrec->node.relNode,
(sizeof(BTItemData) - sizeof(IndexTupleData)); BlockIdGetBlockNumber(&xlrec->rootblk));
} }
else
if (! _bt_add_item(page, strcat(buf, "UNKNOWN");
P_HIKEY,
lhikey,
lhisize,
&hnode))
elog(STOP, "btree_split_redo: failed to add hi key to left sibling");
return;
} }
#endif #endif
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.53 2000/07/30 20:43:40 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.54 2000/10/21 15:43:20 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -22,6 +22,12 @@ ...@@ -22,6 +22,12 @@
#include "executor/executor.h" #include "executor/executor.h"
#include "miscadmin.h" #include "miscadmin.h"
#ifdef XLOG
#include "access/xlogutils.h"
void rtree_redo(XLogRecPtr lsn, XLogRecord *record);
void rtree_undo(XLogRecPtr lsn, XLogRecord *record);
void rtree_desc(char *buf, uint8 xl_info, char* rec);
#endif
typedef struct SPLITVEC typedef struct SPLITVEC
{ {
...@@ -1066,3 +1072,22 @@ _rtdump(Relation r) ...@@ -1066,3 +1072,22 @@ _rtdump(Relation r)
} }
#endif /* defined RTDEBUG */ #endif /* defined RTDEBUG */
#ifdef XLOG
void
rtree_redo(XLogRecPtr lsn, XLogRecord *record)
{
elog(STOP, "rtree_redo: unimplemented");
}
void
rtree_undo(XLogRecPtr lsn, XLogRecord *record)
{
elog(STOP, "rtree_undo: unimplemented");
}
void
rtree_desc(char *buf, uint8 xl_info, char* rec)
{
}
#endif
#include "postgres.h" #include "postgres.h"
#include "access/rmgr.h" #include "access/xlog.h"
RmgrData *RmgrTable = NULL; #ifdef XLOG
extern void xlog_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void xlog_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void xlog_desc(char *buf, uint8 xl_info, char* rec);
extern void xact_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void xact_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void xact_desc(char *buf, uint8 xl_info, char* rec);
extern void smgr_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void smgr_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void smgr_desc(char *buf, uint8 xl_info, char* rec);
extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void heap_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void heap_desc(char *buf, uint8 xl_info, char* rec);
extern void btree_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void btree_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void btree_desc(char *buf, uint8 xl_info, char* rec);
extern void hash_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void hash_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void hash_desc(char *buf, uint8 xl_info, char* rec);
extern void rtree_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void rtree_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void rtree_desc(char *buf, uint8 xl_info, char* rec);
extern void gist_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void gist_undo(XLogRecPtr lsn, XLogRecord *rptr);
extern void gist_desc(char *buf, uint8 xl_info, char* rec);
RmgrData RmgrTable[] = {
{"XLOG", xlog_redo, xlog_undo, xlog_desc},
{"Transaction", xact_redo, xact_undo, xact_desc},
{"Storage", smgr_redo, smgr_undo, smgr_desc},
{"Reserved 3", NULL, NULL, NULL},
{"Reserved 4", NULL, NULL, NULL},
{"Reserved 5", NULL, NULL, NULL},
{"Reserved 6", NULL, NULL, NULL},
{"Reserved 7", NULL, NULL, NULL},
{"Reserved 8", NULL, NULL, NULL},
{"Reserved 9", NULL, NULL, NULL},
{"Heap", heap_redo, heap_undo, heap_desc},
{"Btree", btree_redo, btree_undo, btree_desc},
{"Hash", hash_redo, hash_undo, hash_desc},
{"Rtree", rtree_redo, rtree_undo, rtree_desc},
{"Gist", gist_redo, gist_undo, gist_desc}
};
#else
RmgrData RmgrTable[] = {};
#endif
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.73 2000/10/20 11:01:04 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.74 2000/10/21 15:43:22 vadim Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
...@@ -224,6 +224,7 @@ int CommitDelay; ...@@ -224,6 +224,7 @@ int CommitDelay;
void xact_redo(XLogRecPtr lsn, XLogRecord *record); void xact_redo(XLogRecPtr lsn, XLogRecord *record);
void xact_undo(XLogRecPtr lsn, XLogRecord *record); void xact_undo(XLogRecPtr lsn, XLogRecord *record);
void xact_desc(char *buf, uint8 xl_info, char* rec);
static void (*_RollbackFunc)(void*) = NULL; static void (*_RollbackFunc)(void*) = NULL;
static void *_RollbackData = NULL; static void *_RollbackData = NULL;
...@@ -692,6 +693,7 @@ RecordTransactionCommit() ...@@ -692,6 +693,7 @@ RecordTransactionCommit()
TransactionIdCommit(xid); TransactionIdCommit(xid);
#ifdef XLOG #ifdef XLOG
if (MyLastRecPtr.xlogid != 0 || MyLastRecPtr.xrecoff != 0)
{ {
xl_xact_commit xlrec; xl_xact_commit xlrec;
struct timeval delay; struct timeval delay;
...@@ -711,6 +713,9 @@ RecordTransactionCommit() ...@@ -711,6 +713,9 @@ RecordTransactionCommit()
delay.tv_sec = 0; delay.tv_sec = 0;
delay.tv_usec = CommitDelay; delay.tv_usec = CommitDelay;
(void) select(0, NULL, NULL, NULL, &delay); (void) select(0, NULL, NULL, NULL, &delay);
XLogFlush(recptr);
MyLastRecPtr.xlogid = 0;
MyLastRecPtr.xrecoff = 0;
} }
#endif #endif
/* /*
...@@ -823,7 +828,7 @@ RecordTransactionAbort() ...@@ -823,7 +828,7 @@ RecordTransactionAbort()
TransactionIdAbort(xid); TransactionIdAbort(xid);
#ifdef XLOG #ifdef XLOG
if (SharedBufferChanged) if (MyLastRecPtr.xlogid != 0 || MyLastRecPtr.xrecoff != 0)
{ {
xl_xact_abort xlrec; xl_xact_abort xlrec;
XLogRecPtr recptr; XLogRecPtr recptr;
...@@ -1176,6 +1181,8 @@ AbortTransaction() ...@@ -1176,6 +1181,8 @@ AbortTransaction()
AtEOXact_Files(); AtEOXact_Files();
/* Here we'll rollback xaction changes */ /* Here we'll rollback xaction changes */
MyLastRecPtr.xlogid = 0;
MyLastRecPtr.xrecoff = 0;
AtAbort_Locks(); AtAbort_Locks();
...@@ -1748,6 +1755,33 @@ xact_undo(XLogRecPtr lsn, XLogRecord *record) ...@@ -1748,6 +1755,33 @@ xact_undo(XLogRecPtr lsn, XLogRecord *record)
else if (info != XLOG_XACT_ABORT) else if (info != XLOG_XACT_ABORT)
elog(STOP, "xact_redo: unknown op code %u", info); elog(STOP, "xact_redo: unknown op code %u", info);
} }
void
xact_desc(char *buf, uint8 xl_info, char* rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit*) rec;
struct tm *tm = localtime(&xlrec->xtime);
sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u",
tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
tm->tm_hour, tm->tm_min, tm->tm_sec);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort*) rec;
struct tm *tm = localtime(&xlrec->xtime);
sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u",
tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
tm->tm_hour, tm->tm_min, tm->tm_sec);
}
else
strcat(buf, "UNKNOWN");
}
void void
XactPushRollback(void (*func) (void *), void* data) XactPushRollback(void (*func) (void *), void* data)
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.18 2000/10/20 11:01:04 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.19 2000/10/21 15:43:22 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#include "storage/spin.h" #include "storage/spin.h"
#include "storage/s_lock.h" #include "storage/s_lock.h"
#include "miscadmin.h"
void UpdateControlFile(void); void UpdateControlFile(void);
int XLOGShmemSize(void); int XLOGShmemSize(void);
void XLOGShmemInit(void); void XLOGShmemInit(void);
...@@ -41,6 +43,9 @@ uint32 XLOGbuffers = 0; ...@@ -41,6 +43,9 @@ uint32 XLOGbuffers = 0;
XLogRecPtr MyLastRecPtr = {0, 0}; XLogRecPtr MyLastRecPtr = {0, 0};
bool StopIfError = false; bool StopIfError = false;
bool InRecovery = false; bool InRecovery = false;
StartUpID ThisStartUpID = 0;
int XLOG_DEBUG = 1;
SPINLOCK ControlFileLockId; SPINLOCK ControlFileLockId;
SPINLOCK XidGenLockId; SPINLOCK XidGenLockId;
...@@ -93,6 +98,7 @@ typedef struct XLogCtlData ...@@ -93,6 +98,7 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */ XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
uint32 XLogCacheByte; uint32 XLogCacheByte;
uint32 XLogCacheBlck; uint32 XLogCacheBlck;
StartUpID ThisStartUpID;
#ifdef HAS_TEST_AND_SET #ifdef HAS_TEST_AND_SET
slock_t insert_lck; slock_t insert_lck;
slock_t info_lck; slock_t info_lck;
...@@ -137,16 +143,20 @@ static ControlFileData *ControlFile = NULL; ...@@ -137,16 +143,20 @@ static ControlFileData *ControlFile = NULL;
typedef struct CheckPoint typedef struct CheckPoint
{ {
XLogRecPtr redo; /* next RecPtr available when we */ XLogRecPtr redo; /* next RecPtr available when we */
/* began to create CheckPoint */ /* began to create CheckPoint */
/* (i.e. REDO start point) */ /* (i.e. REDO start point) */
XLogRecPtr undo; /* first record of oldest in-progress */ XLogRecPtr undo; /* first record of oldest in-progress */
/* transaction when we started */ /* transaction when we started */
/* (i.e. UNDO end point) */ /* (i.e. UNDO end point) */
TransactionId nextXid; StartUpID ThisStartUpID;
Oid nextOid; TransactionId nextXid;
Oid nextOid;
bool Shutdown;
} CheckPoint; } CheckPoint;
#define XLOG_CHECKPOINT 0x00
/* /*
* We break each log file in 16Mb segments * We break each log file in 16Mb segments
*/ */
...@@ -190,6 +200,7 @@ static int XLogFileInit(uint32 log, uint32 seg); ...@@ -190,6 +200,7 @@ static int XLogFileInit(uint32 log, uint32 seg);
static int XLogFileOpen(uint32 log, uint32 seg, bool econt); static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
static char *str_time(time_t tnow); static char *str_time(time_t tnow);
static void xlog_outrec(char *buf, XLogRecord *record);
static XLgwrResult LgwrResult = {{0, 0}, {0, 0}}; static XLgwrResult LgwrResult = {{0, 0}, {0, 0}};
static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}}; static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}};
...@@ -225,6 +236,13 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 ...@@ -225,6 +236,13 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
if (len == 0 || len > MAXLOGRECSZ) if (len == 0 || len > MAXLOGRECSZ)
elog(STOP, "XLogInsert: invalid record len %u", len); elog(STOP, "XLogInsert: invalid record len %u", len);
if (IsBootstrapProcessingMode())
{
RecPtr.xlogid = 0;
RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */
return (RecPtr);
}
/* obtain xlog insert lock */ /* obtain xlog insert lock */
if (TAS(&(XLogCtl->insert_lck))) /* busy */ if (TAS(&(XLogCtl->insert_lck))) /* busy */
{ {
...@@ -310,6 +328,23 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 ...@@ -310,6 +328,23 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
MyProc->logRec = RecPtr; MyProc->logRec = RecPtr;
SpinRelease(SInvalLock); SpinRelease(SInvalLock);
} }
Insert->PrevRecord = RecPtr;
if (XLOG_DEBUG)
{
char buf[8192];
sprintf(buf, "INSERT @ %u/%u: ", RecPtr.xlogid, RecPtr.xrecoff);
xlog_outrec(buf, record);
if (hdr != NULL)
{
strcat(buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, hdr);
}
strcat(buf, "\n");
write(2, buf, strlen(buf));
}
MyLastRecPtr = RecPtr; /* begin of record */ MyLastRecPtr = RecPtr; /* begin of record */
Insert->currpos += SizeOfXLogRecord; Insert->currpos += SizeOfXLogRecord;
if (freespace > 0) if (freespace > 0)
...@@ -330,7 +365,7 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 ...@@ -330,7 +365,7 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
Insert->currpos += wlen; Insert->currpos += wlen;
} }
Insert->currpos = ((char *) Insert->currpage) + Insert->currpos = ((char *) Insert->currpage) +
DOUBLEALIGN(Insert->currpos - ((char *) Insert->currpage)); MAXALIGN(Insert->currpos - ((char *) Insert->currpage));
len = hdrlen + buflen; len = hdrlen + buflen;
} }
...@@ -391,7 +426,7 @@ nbuf: ...@@ -391,7 +426,7 @@ nbuf:
/* we don't store info in subrecord' xl_info */ /* we don't store info in subrecord' xl_info */
subrecord->xl_info = 0; subrecord->xl_info = 0;
Insert->currpos = ((char *) Insert->currpage) + Insert->currpos = ((char *) Insert->currpage) +
DOUBLEALIGN(Insert->currpos - ((char *) Insert->currpage)); MAXALIGN(Insert->currpos - ((char *) Insert->currpage));
} }
freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
...@@ -738,7 +773,7 @@ XLogFileInit(uint32 log, uint32 seg) ...@@ -738,7 +773,7 @@ XLogFileInit(uint32 log, uint32 seg)
fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR); fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) if (fd < 0)
elog(STOP, "Open(logfile %u seg %u) failed: %d", elog(STOP, "Init(logfile %u seg %u) failed: %d",
logId, logSeg, errno); logId, logSeg, errno);
if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1)) if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1))
...@@ -777,6 +812,7 @@ XLogFileOpen(uint32 log, uint32 seg, bool econt) ...@@ -777,6 +812,7 @@ XLogFileOpen(uint32 log, uint32 seg, bool econt)
logId, logSeg); logId, logSeg);
return (fd); return (fd);
} }
abort();
elog(STOP, "Open(logfile %u seg %u) failed: %d", elog(STOP, "Open(logfile %u seg %u) failed: %d",
logId, logSeg, errno); logId, logSeg, errno);
} }
...@@ -876,7 +912,7 @@ got_record:; ...@@ -876,7 +912,7 @@ got_record:;
XLogSubRecord *subrecord; XLogSubRecord *subrecord;
uint32 len = record->xl_len; uint32 len = record->xl_len;
if (DOUBLEALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ + if (MAXALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ +
SizeOfXLogRecord != BLCKSZ) SizeOfXLogRecord != BLCKSZ)
{ {
elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)", elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)",
...@@ -938,7 +974,7 @@ got_record:; ...@@ -938,7 +974,7 @@ got_record:;
buffer += subrecord->xl_len; buffer += subrecord->xl_len;
if (subrecord->xl_info & XLR_TO_BE_CONTINUED) if (subrecord->xl_info & XLR_TO_BE_CONTINUED)
{ {
if (DOUBLEALIGN(subrecord->xl_len) + if (MAXALIGN(subrecord->xl_len) +
SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ) SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ)
{ {
elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u", elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u",
...@@ -949,26 +985,26 @@ got_record:; ...@@ -949,26 +985,26 @@ got_record:;
} }
break; break;
} }
if (BLCKSZ - SizeOfXLogRecord >= DOUBLEALIGN(subrecord->xl_len) + if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(subrecord->xl_len) +
SizeOfXLogPHD + SizeOfXLogSubRecord) SizeOfXLogPHD + SizeOfXLogSubRecord)
{ {
nextRecord = (XLogRecord *) ((char *) subrecord + nextRecord = (XLogRecord *) ((char *) subrecord +
DOUBLEALIGN(subrecord->xl_len) + SizeOfXLogSubRecord); MAXALIGN(subrecord->xl_len) + SizeOfXLogSubRecord);
} }
EndRecPtr.xlogid = readId; EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ +
SizeOfXLogPHD + SizeOfXLogSubRecord + SizeOfXLogPHD + SizeOfXLogSubRecord +
DOUBLEALIGN(subrecord->xl_len); MAXALIGN(subrecord->xl_len);
ReadRecPtr = *RecPtr; ReadRecPtr = *RecPtr;
return (record); return (record);
} }
if (BLCKSZ - SizeOfXLogRecord >= DOUBLEALIGN(record->xl_len) + if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(record->xl_len) +
RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
nextRecord = (XLogRecord *) ((char *) record + nextRecord = (XLogRecord *) ((char *) record +
DOUBLEALIGN(record->xl_len) + SizeOfXLogRecord); MAXALIGN(record->xl_len) + SizeOfXLogRecord);
EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + EndRecPtr.xrecoff = RecPtr->xrecoff +
DOUBLEALIGN(record->xl_len) + SizeOfXLogRecord; MAXALIGN(record->xl_len) + SizeOfXLogRecord;
ReadRecPtr = *RecPtr; ReadRecPtr = *RecPtr;
return (record); return (record);
...@@ -1130,7 +1166,7 @@ BootStrapXLOG() ...@@ -1130,7 +1166,7 @@ BootStrapXLOG()
char buffer[BLCKSZ]; char buffer[BLCKSZ];
CheckPoint checkPoint; CheckPoint checkPoint;
#ifdef NOT_USED #ifdef XLOG
XLogPageHeader page = (XLogPageHeader) buffer; XLogPageHeader page = (XLogPageHeader) buffer;
XLogRecord *record; XLogRecord *record;
...@@ -1146,8 +1182,9 @@ BootStrapXLOG() ...@@ -1146,8 +1182,9 @@ BootStrapXLOG()
checkPoint.undo = checkPoint.redo; checkPoint.undo = checkPoint.redo;
checkPoint.nextXid = FirstTransactionId; checkPoint.nextXid = FirstTransactionId;
checkPoint.nextOid = BootstrapObjectIdData; checkPoint.nextOid = BootstrapObjectIdData;
checkPoint.ThisStartUpID = 0;
#ifdef NOT_USED #ifdef XLOG
memset(buffer, 0, BLCKSZ); memset(buffer, 0, BLCKSZ);
page->xlp_magic = XLOG_PAGE_MAGIC; page->xlp_magic = XLOG_PAGE_MAGIC;
...@@ -1213,7 +1250,7 @@ str_time(time_t tnow) ...@@ -1213,7 +1250,7 @@ str_time(time_t tnow)
void void
StartupXLOG() StartupXLOG()
{ {
#ifdef NOT_USED #ifdef XLOG
XLogCtlInsert *Insert; XLogCtlInsert *Insert;
CheckPoint checkPoint; CheckPoint checkPoint;
XLogRecPtr RecPtr, XLogRecPtr RecPtr,
...@@ -1291,7 +1328,7 @@ StartupXLOG() ...@@ -1291,7 +1328,7 @@ StartupXLOG()
elog(LOG, "Data Base System was interrupted being in production at %s", elog(LOG, "Data Base System was interrupted being in production at %s",
str_time(ControlFile->time)); str_time(ControlFile->time));
#ifdef NOT_USED #ifdef XLOG
LastRec = RecPtr = ControlFile->checkPoint; LastRec = RecPtr = ControlFile->checkPoint;
if (!XRecOffIsValid(RecPtr.xrecoff)) if (!XRecOffIsValid(RecPtr.xrecoff))
...@@ -1312,17 +1349,20 @@ StartupXLOG() ...@@ -1312,17 +1349,20 @@ StartupXLOG()
checkPoint.nextXid, checkPoint.nextOid); checkPoint.nextXid, checkPoint.nextOid);
if (checkPoint.nextXid < FirstTransactionId || if (checkPoint.nextXid < FirstTransactionId ||
checkPoint.nextOid < BootstrapObjectIdData) checkPoint.nextOid < BootstrapObjectIdData)
#ifdef XLOG
#ifdef XLOG_2
elog(STOP, "Invalid NextTransactionId/NextOid"); elog(STOP, "Invalid NextTransactionId/NextOid");
#else #else
elog(LOG, "Invalid NextTransactionId/NextOid"); elog(LOG, "Invalid NextTransactionId/NextOid");
#endif #endif
#ifdef XLOG #ifdef XLOG_2
ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->nextOid = checkPoint.nextOid;
#endif #endif
ThisStartUpID = checkPoint.ThisStartUpID;
if (XLByteLT(RecPtr, checkPoint.redo)) if (XLByteLT(RecPtr, checkPoint.redo))
elog(STOP, "Invalid redo in checkPoint record"); elog(STOP, "Invalid redo in checkPoint record");
if (checkPoint.undo.xrecoff == 0) if (checkPoint.undo.xrecoff == 0)
...@@ -1364,10 +1404,23 @@ StartupXLOG() ...@@ -1364,10 +1404,23 @@ StartupXLOG()
ReadRecPtr.xlogid, ReadRecPtr.xrecoff); ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
do do
{ {
#ifdef XLOG #ifdef XLOG_2
if (record->xl_xid >= ShmemVariableCache->nextXid) if (record->xl_xid >= ShmemVariableCache->nextXid)
ShmemVariableCache->nextXid = record->xl_xid + 1; ShmemVariableCache->nextXid = record->xl_xid + 1;
#endif #endif
if (XLOG_DEBUG)
{
char buf[8192];
sprintf(buf, "REDO @ %u/%u: ", ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
xlog_outrec(buf, record);
strcat(buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(buf,
record->xl_info, XLogRecGetData(record));
strcat(buf, "\n");
write(2, buf, strlen(buf));
}
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
record = ReadRecord(NULL, buffer); record = ReadRecord(NULL, buffer);
} while (record->xl_len != 0); } while (record->xl_len != 0);
...@@ -1422,6 +1475,7 @@ StartupXLOG() ...@@ -1422,6 +1475,7 @@ StartupXLOG()
if (recovery > 0) if (recovery > 0)
{ {
#ifdef NOT_USED
int i; int i;
/* /*
...@@ -1429,21 +1483,37 @@ StartupXLOG() ...@@ -1429,21 +1483,37 @@ StartupXLOG()
*/ */
for (i = 0; i <= RM_MAX_ID; i++) for (i = 0; i <= RM_MAX_ID; i++)
RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL); RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL);
#endif
CreateCheckPoint(true); CreateCheckPoint(true);
StopIfError = sie_saved; StopIfError = sie_saved;
} }
#endif /* NOT_USED */ #endif /* XLOG */
ControlFile->state = DB_IN_PRODUCTION; ControlFile->state = DB_IN_PRODUCTION;
ControlFile->time = time(NULL); ControlFile->time = time(NULL);
UpdateControlFile(); UpdateControlFile();
ThisStartUpID++;
XLogCtl->ThisStartUpID = ThisStartUpID;
elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL))); elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL)));
return; return;
} }
/*
* Postmaster uses it to set ThisStartUpID from XLogCtlData
* located in shmem after successful startup.
*/
void SetThisStartUpID(void);
void
SetThisStartUpID(void)
{
ThisStartUpID = XLogCtl->ThisStartUpID;
}
/* /*
* This func must be called ONCE on system shutdown * This func must be called ONCE on system shutdown
*/ */
...@@ -1461,7 +1531,7 @@ ShutdownXLOG() ...@@ -1461,7 +1531,7 @@ ShutdownXLOG()
void void
CreateCheckPoint(bool shutdown) CreateCheckPoint(bool shutdown)
{ {
#ifdef NOT_USED #ifdef XLOG
CheckPoint checkPoint; CheckPoint checkPoint;
XLogRecPtr recptr; XLogRecPtr recptr;
XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlInsert *Insert = &XLogCtl->Insert;
...@@ -1475,6 +1545,8 @@ CreateCheckPoint(bool shutdown) ...@@ -1475,6 +1545,8 @@ CreateCheckPoint(bool shutdown)
ControlFile->time = time(NULL); ControlFile->time = time(NULL);
UpdateControlFile(); UpdateControlFile();
} }
checkPoint.ThisStartUpID = ThisStartUpID;
checkPoint.Shutdown = shutdown;
/* Get REDO record ptr */ /* Get REDO record ptr */
while (TAS(&(XLogCtl->insert_lck))) while (TAS(&(XLogCtl->insert_lck)))
...@@ -1517,20 +1589,21 @@ CreateCheckPoint(bool shutdown) ...@@ -1517,20 +1589,21 @@ CreateCheckPoint(bool shutdown)
if (shutdown && checkPoint.undo.xrecoff != 0) if (shutdown && checkPoint.undo.xrecoff != 0)
elog(STOP, "Active transaction while data base is shutting down"); elog(STOP, "Active transaction while data base is shutting down");
recptr = XLogInsert(RM_XLOG_ID, (char *) &checkPoint, sizeof(checkPoint), NULL, 0); recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, (char *) &checkPoint,
sizeof(checkPoint), NULL, 0);
if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr)) if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
elog(STOP, "XLog concurrent activity while data base is shutting down"); elog(STOP, "XLog concurrent activity while data base is shutting down");
XLogFlush(recptr); XLogFlush(recptr);
#endif /* NOT_USED */ #endif /* XLOG */
SpinAcquire(ControlFileLockId); SpinAcquire(ControlFileLockId);
if (shutdown) if (shutdown)
ControlFile->state = DB_SHUTDOWNED; ControlFile->state = DB_SHUTDOWNED;
#ifdef NOT_USED #ifdef XLOG
ControlFile->checkPoint = MyLastRecPtr; ControlFile->checkPoint = MyLastRecPtr;
#else #else
ControlFile->checkPoint.xlogid = 0; ControlFile->checkPoint.xlogid = 0;
...@@ -1543,3 +1616,47 @@ CreateCheckPoint(bool shutdown) ...@@ -1543,3 +1616,47 @@ CreateCheckPoint(bool shutdown)
return; return;
} }
void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
void xlog_undo(XLogRecPtr lsn, XLogRecord *record);
void xlog_desc(char *buf, uint8 xl_info, char* rec);
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
}
void
xlog_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
void
xlog_desc(char *buf, uint8 xl_info, char* rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_CHECKPOINT)
{
CheckPoint *checkpoint = (CheckPoint*) rec;
sprintf(buf + strlen(buf), "checkpoint: redo %u/%u; undo %u/%u; "
"sui %u; xid %u; oid %u; %s",
checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
checkpoint->ThisStartUpID, checkpoint->nextXid,
checkpoint->nextOid,
(checkpoint->Shutdown) ? "shutdown" : "online");
}
else
strcat(buf, "UNKNOWN");
}
static void
xlog_outrec(char *buf, XLogRecord *record)
{
sprintf(buf + strlen(buf), "prev %u/%u; xprev %u/%u; xid %u: %s",
record->xl_prev.xlogid, record->xl_prev.xrecoff,
record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
record->xl_xid,
RmgrTable[record->xl_rmid].rm_name);
}
...@@ -276,6 +276,9 @@ _xl_init_rel_cache(void) ...@@ -276,6 +276,9 @@ _xl_init_rel_cache(void)
_xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt); _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
memset(_xlpgcarr, 0, sizeof(XLogRelDesc) * _xlcnt); memset(_xlpgcarr, 0, sizeof(XLogRelDesc) * _xlcnt);
_xlrelarr[0].moreRecently = &(_xlrelarr[0]);
_xlrelarr[0].lessRecently = &(_xlrelarr[0]);
memset(&ctl, 0, (int) sizeof(ctl)); memset(&ctl, 0, (int) sizeof(ctl));
ctl.keysize = sizeof(RelFileNode); ctl.keysize = sizeof(RelFileNode);
ctl.datasize = sizeof(XLogRelDesc*); ctl.datasize = sizeof(XLogRelDesc*);
...@@ -383,6 +386,7 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode) ...@@ -383,6 +386,7 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode)
hentry->rdesc = res; hentry->rdesc = res;
res->reldata.rd_unlinked = true; /* look smgropen */ res->reldata.rd_unlinked = true; /* look smgropen */
res->reldata.rd_fd = -1;
res->reldata.rd_fd = smgropen(DEFAULT_SMGR, &(res->reldata)); res->reldata.rd_fd = smgropen(DEFAULT_SMGR, &(res->reldata));
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.93 2000/09/06 14:15:14 petere Exp $ * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.94 2000/10/21 15:43:24 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -345,6 +345,7 @@ BootstrapMain(int argc, char *argv[]) ...@@ -345,6 +345,7 @@ BootstrapMain(int argc, char *argv[])
if (IsUnderPostmaster && xloginit) if (IsUnderPostmaster && xloginit)
{ {
SetProcessingMode(NormalProcessing);
StartupXLOG(); StartupXLOG();
proc_exit(0); proc_exit(0);
} }
...@@ -360,6 +361,7 @@ BootstrapMain(int argc, char *argv[]) ...@@ -360,6 +361,7 @@ BootstrapMain(int argc, char *argv[])
if (IsUnderPostmaster && !xloginit) if (IsUnderPostmaster && !xloginit)
{ {
SetProcessingMode(NormalProcessing);
ShutdownXLOG(); ShutdownXLOG();
proc_exit(0); proc_exit(0);
} }
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.172 2000/10/16 14:52:08 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.173 2000/10/21 15:43:26 vadim Exp $
* *
* NOTES * NOTES
* *
...@@ -220,6 +220,10 @@ extern char *optarg; ...@@ -220,6 +220,10 @@ extern char *optarg;
extern int optind, extern int optind,
opterr; opterr;
extern char XLogDir[];
extern char ControlFilePath[];
extern void SetThisStartUpID(void);
/* /*
* postmaster.c - function prototypes * postmaster.c - function prototypes
*/ */
...@@ -600,6 +604,10 @@ PostmasterMain(int argc, char *argv[]) ...@@ -600,6 +604,10 @@ PostmasterMain(int argc, char *argv[])
/* set up shared memory and semaphores */ /* set up shared memory and semaphores */
reset_shared(PostPortName); reset_shared(PostPortName);
/* Init XLOG pathes */
snprintf(XLogDir, MAXPGPATH, "%s/pg_xlog", DataDir);
snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", DataDir);
/* /*
* Initialize the list of active backends. This list is only used for * Initialize the list of active backends. This list is only used for
* garbage collecting the backend processes. * garbage collecting the backend processes.
...@@ -1449,6 +1457,12 @@ reaper(SIGNAL_ARGS) ...@@ -1449,6 +1457,12 @@ reaper(SIGNAL_ARGS)
abort(); abort();
ShutdownPID = ShutdownDataBase(); ShutdownPID = ShutdownDataBase();
} }
/*
* Startup succeeded - remember its ID
*/
SetThisStartUpID();
pqsignal(SIGCHLD, reaper); pqsignal(SIGCHLD, reaper);
return; return;
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.88 2000/10/20 11:01:07 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.89 2000/10/21 15:43:27 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -609,7 +609,7 @@ BufferAlloc(Relation reln, ...@@ -609,7 +609,7 @@ BufferAlloc(Relation reln,
} }
/* record the database name and relation name for this buffer */ /* record the database name and relation name for this buffer */
strcpy(buf->blind.dbname, DatabaseName); strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery");
strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln)); strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
buf->relId = reln->rd_lockInfo.lockRelId; buf->relId = reln->rd_lockInfo.lockRelId;
...@@ -1168,8 +1168,9 @@ BufferSync() ...@@ -1168,8 +1168,9 @@ BufferSync()
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
} }
#ifndef XLOG
LocalBufferSync(); LocalBufferSync();
#endif
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/page/bufpage.c,v 1.32 2000/10/20 11:28:39 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/page/bufpage.c,v 1.33 2000/10/21 15:43:29 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -245,7 +245,11 @@ itemidcompare(const void *itemidp1, const void *itemidp2) ...@@ -245,7 +245,11 @@ itemidcompare(const void *itemidp1, const void *itemidp2)
/* /*
* PageRepairFragmentation * PageRepairFragmentation
* Frees fragmented space on a page. *
* Frees fragmented space on a page.
* It doesn't remove unused line pointers! Please don't change this.
* This routine is usable for heap pages only.
*
*/ */
void void
PageRepairFragmentation(Page page) PageRepairFragmentation(Page page)
...@@ -264,6 +268,8 @@ PageRepairFragmentation(Page page) ...@@ -264,6 +268,8 @@ PageRepairFragmentation(Page page)
for (i = 0; i < nline; i++) for (i = 0; i < nline; i++)
{ {
lp = ((PageHeader) page)->pd_linp + i; lp = ((PageHeader) page)->pd_linp + i;
if ((*lp).lp_flags & LP_DELETE) /* marked for deletion */
(*lp).lp_flags &= ~(LP_USED | LP_DELETE);
if ((*lp).lp_flags & LP_USED) if ((*lp).lp_flags & LP_USED)
nused++; nused++;
} }
...@@ -343,6 +349,31 @@ PageGetFreeSpace(Page page) ...@@ -343,6 +349,31 @@ PageGetFreeSpace(Page page)
return space; return space;
} }
/*
* PageRepairFragmentation un-useful for index page cleanup because
* of it doesn't remove line pointers. This routine could be more
* effective but ... no time -:)
*/
void
IndexPageCleanup(Buffer buffer)
{
Page page = (Page) BufferGetPage(buffer);
ItemId lp;
OffsetNumber maxoff;
OffsetNumber i;
maxoff = PageGetMaxOffsetNumber(page);
for (i = 0; i < maxoff; i++)
{
lp = ((PageHeader) page)->pd_linp + i;
if ((*lp).lp_flags & LP_DELETE) /* marked for deletion */
{
PageIndexTupleDelete(page, i + 1);
maxoff--;
}
}
}
/* /*
*---------------------------------------------------------------- *----------------------------------------------------------------
* PageIndexTupleDelete * PageIndexTupleDelete
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.40 2000/10/16 14:52:12 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.41 2000/10/21 15:43:31 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -539,3 +539,26 @@ smgriswo(int16 smgrno) ...@@ -539,3 +539,26 @@ smgriswo(int16 smgrno)
} }
#endif #endif
#ifdef XLOG
#include "access/xlog.h"
void smgr_redo(XLogRecPtr lsn, XLogRecord *record);
void smgr_undo(XLogRecPtr lsn, XLogRecord *record);
void smgr_desc(char *buf, uint8 xl_info, char* rec);
void
smgr_redo(XLogRecPtr lsn, XLogRecord *record)
{
}
void
smgr_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
void
smgr_desc(char *buf, uint8 xl_info, char* rec)
{
}
#endif
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: nbtree.h,v 1.45 2000/10/13 12:05:22 vadim Exp $ * $Id: nbtree.h,v 1.46 2000/10/21 15:43:33 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -266,9 +266,9 @@ typedef struct xl_btree_insert ...@@ -266,9 +266,9 @@ typedef struct xl_btree_insert
typedef struct xl_btree_split typedef struct xl_btree_split
{ {
xl_btreetid target; /* inserted tuple id */ xl_btreetid target; /* inserted tuple id */
BlockId otherblk; /* second block participated in split: */ BlockIdData otherblk; /* second block participated in split: */
/* first one is stored in target' tid */ /* first one is stored in target' tid */
BlockId rightblk; /* next right block */ BlockIdData rightblk; /* next right block */
/* /*
* We log all btitems from the right sibling. If new btitem goes on * We log all btitems from the right sibling. If new btitem goes on
* the left sibling then we log it too and it will be the first * the left sibling then we log it too and it will be the first
...@@ -277,7 +277,7 @@ typedef struct xl_btree_split ...@@ -277,7 +277,7 @@ typedef struct xl_btree_split
*/ */
} xl_btree_split; } xl_btree_split;
#define SizeOfBtreeSplit (offsetof(xl_btree_insert, rightblk) + sizeof(BlockId)) #define SizeOfBtreeSplit (offsetof(xl_btree_split, rightblk) + sizeof(BlockIdData))
/* /*
* New root log record. * New root log record.
...@@ -285,11 +285,11 @@ typedef struct xl_btree_split ...@@ -285,11 +285,11 @@ typedef struct xl_btree_split
typedef struct xl_btree_newroot typedef struct xl_btree_newroot
{ {
RelFileNode node; RelFileNode node;
BlockId rootblk; BlockIdData rootblk;
/* 0 or 2 BTITEMS FOLLOW AT END OF STRUCT */ /* 0 or 2 BTITEMS FOLLOW AT END OF STRUCT */
} xl_btree_newroot; } xl_btree_newroot;
#define SizeOfBtreeNewroot (offsetof(xl_btree_newroot, rootblk) + sizeof(BlockId)) #define SizeOfBtreeNewroot (offsetof(xl_btree_newroot, rootblk) + sizeof(BlockIdData))
/* end of XLOG stuff */ /* end of XLOG stuff */
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
* *
* rmgr.h * rmgr.h
* *
* Resource managers description table * Resource managers definition
* *
*/ */
#ifndef RMGR_H #ifndef RMGR_H
...@@ -10,15 +10,6 @@ ...@@ -10,15 +10,6 @@
typedef uint8 RmgrId; typedef uint8 RmgrId;
typedef struct RmgrData
{
char *rm_name;
void (*rm_redo)(); /* REDO(XLogRecPtr lsn, XLogRecord rptr) */
void (*rm_undo)(); /* UNDO(XLogRecPtr lsn, XLogRecord rptr) */
} RmgrData;
extern RmgrData *RmgrTable;
/* /*
* Built-in resource managers * Built-in resource managers
*/ */
......
...@@ -90,6 +90,17 @@ typedef XLogPageHeaderData *XLogPageHeader; ...@@ -90,6 +90,17 @@ typedef XLogPageHeaderData *XLogPageHeader;
typedef uint32 StartUpID; typedef uint32 StartUpID;
extern StartUpID ThisStartUpID; extern StartUpID ThisStartUpID;
extern bool InRecovery; extern bool InRecovery;
extern XLogRecPtr MyLastRecPtr;
typedef struct RmgrData
{
char *rm_name;
void (*rm_redo)(XLogRecPtr lsn, XLogRecord *rptr);
void (*rm_undo)(XLogRecPtr lsn, XLogRecord *rptr);
void (*rm_desc)(char *buf, uint8 xl_info, char *rec);
} RmgrData;
extern RmgrData RmgrTable[];
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info,
char *hdr, uint32 hdrlen, char *hdr, uint32 hdrlen,
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: bufpage.h,v 1.33 2000/10/20 11:01:21 vadim Exp $ * $Id: bufpage.h,v 1.34 2000/10/21 15:43:36 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -324,6 +324,7 @@ extern void PageRestoreTempPage(Page tempPage, Page oldPage); ...@@ -324,6 +324,7 @@ extern void PageRestoreTempPage(Page tempPage, Page oldPage);
extern void PageRepairFragmentation(Page page); extern void PageRepairFragmentation(Page page);
extern Size PageGetFreeSpace(Page page); extern Size PageGetFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset); extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
extern void IndexPageCleanup(Buffer buffer);
#endif /* BUFPAGE_H */ #endif /* BUFPAGE_H */
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: itemid.h,v 1.13 2000/10/20 11:01:21 vadim Exp $ * $Id: itemid.h,v 1.14 2000/10/21 15:43:36 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -31,15 +31,11 @@ typedef ItemIdData *ItemId; ...@@ -31,15 +31,11 @@ typedef ItemIdData *ItemId;
*/ */
#define LP_USED 0x01 /* this line pointer is being used */ #define LP_USED 0x01 /* this line pointer is being used */
#ifdef XLOG
#define LP_DELETE 0x02 /* item is to be deleted */ #define LP_DELETE 0x02 /* item is to be deleted */
#define ItemIdDeleted(itemId) \ #define ItemIdDeleted(itemId) \
(((itemId)->lp_flags & LP_DELETE) != 0) (((itemId)->lp_flags & LP_DELETE) != 0)
#endif
/* /*
* This bit may be passed to PageAddItem together with * This bit may be passed to PageAddItem together with
* LP_USED & LP_DELETED bits to specify overwrite mode * LP_USED & LP_DELETED bits to specify overwrite mode
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment