Commit 96ef3b8f authored by Simon Riggs's avatar Simon Riggs

Allow I/O reliability checks using 16-bit checksums

Checksums are set immediately prior to flush out of shared buffers
and checked when pages are read in again. Hint bit setting will
require full page write when block is dirtied, which causes various
infrastructure changes. Extensive comments, docs and README.

WARNING message thrown if checksum fails on non-all zeroes page;
ERROR thrown but can be disabled with ignore_checksum_failure = on.

Feature enabled by an initdb option, since transition from option off
to option on is long and complex and has not yet been implemented.
Default is not to use checksums.

Checksum used is WAL CRC-32 truncated to 16-bits.

Simon Riggs, Jeff Davis, Greg Smith
Wide input and assistance from many community members. Thank you.
parent e4a05c75
......@@ -56,6 +56,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
bool got_toast = false;
bool got_date_is_int = false;
bool got_float8_pass_by_value = false;
bool got_data_checksums = false;
char *lc_collate = NULL;
char *lc_ctype = NULL;
char *lc_monetary = NULL;
......@@ -131,6 +132,13 @@ get_control_data(ClusterInfo *cluster, bool live_check)
got_float8_pass_by_value = true;
}
/* Only in <= 9.2 */
if (GET_MAJOR_VERSION(cluster->major_version) <= 902)
{
cluster->controldata.data_checksums = false;
got_data_checksums = true;
}
/* we have the result of cmd in "output". so parse it line by line now */
while (fgets(bufin, sizeof(bufin), output))
{
......@@ -393,6 +401,18 @@ get_control_data(ClusterInfo *cluster, bool live_check)
cluster->controldata.float8_pass_by_value = strstr(p, "by value") != NULL;
got_float8_pass_by_value = true;
}
else if ((p = strstr(bufin, "checksums")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
p++; /* removing ':' char */
/* used later for contrib check */
cluster->controldata.data_checksums = strstr(p, "enabled") != NULL;
got_data_checksums = true;
}
/* In pre-8.4 only */
else if ((p = strstr(bufin, "LC_COLLATE:")) != NULL)
{
......@@ -476,7 +496,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
!got_tli ||
!got_align || !got_blocksz || !got_largesz || !got_walsz ||
!got_walseg || !got_ident || !got_index || !got_toast ||
!got_date_is_int || !got_float8_pass_by_value)
!got_date_is_int || !got_float8_pass_by_value || !got_data_checksums)
{
pg_log(PG_REPORT,
"The %s cluster lacks some required control information:\n",
......@@ -535,6 +555,10 @@ get_control_data(ClusterInfo *cluster, bool live_check)
if (!got_float8_pass_by_value)
pg_log(PG_REPORT, " float8 argument passing method\n");
/* value added in Postgres 9.3 */
if (!got_data_checksums)
pg_log(PG_REPORT, " data checksums\n");
pg_log(PG_FATAL,
"Cannot continue without required control information, terminating\n");
}
......@@ -596,6 +620,12 @@ check_control_data(ControlData *oldctrl,
"--disable-integer-datetimes or get server binaries built with those\n"
"options.\n");
}
if (oldctrl->data_checksums != newctrl->data_checksums)
{
pg_log(PG_FATAL,
"old and new pg_controldata checksums settings are invalid or do not match\n");
}
}
......
......@@ -202,6 +202,7 @@ typedef struct
uint32 toast;
bool date_is_int;
bool float8_pass_by_value;
bool data_checksums;
char *lc_collate;
char *lc_ctype;
char *encoding;
......
......@@ -6629,6 +6629,30 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
</listitem>
</varlistentry>
<varlistentry id="guc-ignore-checksum-failure" xreflabel="ignore_checksum_failure">
<term><varname>ignore_checksum_failure</varname> (<type>boolean</type>)</term>
<indexterm>
<primary><varname>ignore_checksum_failure</> configuration parameter</primary>
</indexterm>
<listitem>
<para>
Only has effect if <xref linkend="app-initdb-data-checksums"> are enabled.
</para>
<para>
Detection of a checksum failure during a read normally causes
<productname>PostgreSQL</> to report an error, aborting the current
transaction. Setting <varname>ignore_checksum_failure</> to on causes
the system to ignore the failure (but still report a warning), and
continue processing. This behavior may <emphasis>cause crashes, propagate
or hide corruption, or lead to other serious problems</>. However, it may allow
you to get past the error and retrieve undamaged tuples that might still be
present in the table if the block header is still sane. If the header is
corrupt an error will be reported even if this option is enabled. The
default setting is <literal>off</>, and it can only be changed by a superuser.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-zero-damaged-pages" xreflabel="zero_damaged_pages">
<term><varname>zero_damaged_pages</varname> (<type>boolean</type>)</term>
<indexterm>
......
......@@ -182,6 +182,20 @@ PostgreSQL documentation
</listitem>
</varlistentry>
<varlistentry id="app-initdb-data-checksums" xreflabel="data checksums">
<term><option>-k</option></term>
<term><option>--data-checksums</option></term>
<listitem>
<para>
Use checksums on data pages to help detect corruption by the
I/O system that would otherwise be silent. Enabling checksums
may incur a noticeable performance penalty. This option can only
be set during initialization, and cannot be changed later. If
set, checksums are calculated for all objects, in all databases.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--locale=<replaceable>locale</replaceable></option></term>
<listitem>
......
......@@ -362,8 +362,12 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
{
/* Creating index-page GISTSearchItem */
item->blkno = ItemPointerGetBlockNumber(&it->t_tid);
/* lsn of current page is lsn of parent page for child */
item->data.parentlsn = PageGetLSN(page);
/*
* LSN of current page is lsn of parent page for child. We only
* have a shared lock, so we need to get the LSN atomically.
*/
item->data.parentlsn = BufferGetLSNAtomic(buffer);
}
/* Insert it into the queue using new distance data */
......
......@@ -285,11 +285,9 @@ hashgettuple(PG_FUNCTION_ARGS)
ItemIdMarkDead(PageGetItemId(page, offnum));
/*
* Since this can be redone later if needed, it's treated the same
* as a commit-hint-bit status update for heap tuples: we mark the
* buffer dirty but don't make a WAL log entry.
* Since this can be redone later if needed, mark as a hint.
*/
SetBufferCommitInfoNeedsSave(buf);
MarkBufferDirtyHint(buf);
}
/*
......
......@@ -5754,17 +5754,23 @@ log_heap_freeze(Relation reln, Buffer buffer,
* being marked all-visible, and vm_buffer is the buffer containing the
* corresponding visibility map block. Both should have already been modified
* and dirtied.
*
* If checksums are enabled, we also add the heap_buffer to the chain to
* protect it from being torn.
*/
XLogRecPtr
log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
TransactionId cutoff_xid)
{
xl_heap_visible xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
XLogRecData rdata[3];
Assert(BufferIsValid(heap_buffer));
Assert(BufferIsValid(vm_buffer));
xlrec.node = rnode;
xlrec.block = block;
xlrec.block = BufferGetBlockNumber(heap_buffer);
xlrec.cutoff_xid = cutoff_xid;
rdata[0].data = (char *) &xlrec;
......@@ -5778,6 +5784,17 @@ log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
rdata[1].buffer_std = false;
rdata[1].next = NULL;
if (DataChecksumsEnabled())
{
rdata[1].next = &(rdata[2]);
rdata[2].data = NULL;
rdata[2].len = 0;
rdata[2].buffer = heap_buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
}
recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
return recptr;
......@@ -6139,8 +6156,6 @@ static void
heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
Buffer buffer;
Page page;
/*
* If there are any Hot Standby transactions running that have an xmin
......@@ -6154,13 +6169,27 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
/*
* If heap block was backed up, restore it. This can only happen with
* checksums enabled.
*/
if (record->xl_info & XLR_BKP_BLOCK(1))
{
Assert(DataChecksumsEnabled());
(void) RestoreBackupBlock(lsn, record, 1, false, false);
}
else
{
Buffer buffer;
Page page;
/*
* Read the heap page, if it still exists. If the heap file has been
* dropped or truncated later in recovery, we don't need to update the
* page, but we'd better still update the visibility map.
*/
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
RBM_NORMAL);
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
xlrec->block, RBM_NORMAL);
if (BufferIsValid(buffer))
{
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
......@@ -6168,17 +6197,19 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
page = (Page) BufferGetPage(buffer);
/*
* We don't bump the LSN of the heap page when setting the visibility
* map bit, because that would generate an unworkable volume of
* full-page writes. This exposes us to torn page hazards, but since
* we're not inspecting the existing page contents in any way, we
* don't care.
* We don't bump the LSN of the heap page when setting the
* visibility map bit (unless checksums are enabled, in which case
* we must), because that would generate an unworkable volume of
* full-page writes. This exposes us to torn page hazards, but
* since we're not inspecting the existing page contents in any
* way, we don't care.
*
* However, all operations that clear the visibility map bit *do* bump
* the LSN, and those operations will only be replayed if the XLOG LSN
* follows the page LSN. Thus, if the page LSN has advanced past our
* XLOG record's LSN, we mustn't mark the page all-visible, because
* the subsequent update won't be replayed to clear the flag.
* However, all operations that clear the visibility map bit *do*
* bump the LSN, and those operations will only be replayed if the
* XLOG LSN follows the page LSN. Thus, if the page LSN has
* advanced past our XLOG record's LSN, we mustn't mark the page
* all-visible, because the subsequent update won't be replayed to
* clear the flag.
*/
if (lsn > PageGetLSN(page))
{
......@@ -6189,6 +6220,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
/* Done with heap page. */
UnlockReleaseBuffer(buffer);
}
}
/*
* Even if we skipped the heap page update due to the LSN interlock, it's
......@@ -6218,7 +6250,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
* real harm is done; and the next VACUUM will fix it.
*/
if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer,
xlrec->cutoff_xid);
ReleaseBuffer(vmbuffer);
......
......@@ -262,7 +262,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
{
((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
PageClearFull(page);
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
}
}
......
......@@ -273,6 +273,8 @@ end_heap_rewrite(RewriteState state)
/* Write the last page, if any */
if (state->rs_buffer_valid)
{
PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
if (state->rs_use_wal)
log_newpage(&state->rs_new_rel->rd_node,
MAIN_FORKNUM,
......@@ -614,6 +616,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
{
/* Doesn't fit, so write out the existing page */
PageSetChecksumInplace(page, state->rs_blockno);
/* XLOG stuff */
if (state->rs_use_wal)
log_newpage(&state->rs_new_rel->rd_node,
......
......@@ -233,13 +233,18 @@ visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
* marked all-visible; it is needed for Hot Standby, and can be
* InvalidTransactionId if the page contains no tuples.
*
* Caller is expected to set the heap page's PD_ALL_VISIBLE bit before calling
* this function. Except in recovery, caller should also pass the heap
* buffer. When checksums are enabled and we're not in recovery, we must add
* the heap buffer to the WAL chain to protect it from being torn.
*
* You must pass a buffer containing the correct map page to this function.
* Call visibilitymap_pin first to pin the right one. This function doesn't do
* any I/O.
*/
void
visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
Buffer buf, TransactionId cutoff_xid)
visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
{
BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
uint32 mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
......@@ -252,34 +257,55 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
#endif
Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
Assert(InRecovery || BufferIsValid(heapBuf));
/* Check that we have the right page pinned */
if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
elog(ERROR, "wrong buffer passed to visibilitymap_set");
/* Check that we have the right heap page pinned, if present */
if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk)
elog(ERROR, "wrong heap buffer passed to visibilitymap_set");
page = BufferGetPage(buf);
/* Check that we have the right VM page pinned */
if (!BufferIsValid(vmBuf) || BufferGetBlockNumber(vmBuf) != mapBlock)
elog(ERROR, "wrong VM buffer passed to visibilitymap_set");
page = BufferGetPage(vmBuf);
map = PageGetContents(page);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(vmBuf, BUFFER_LOCK_EXCLUSIVE);
if (!(map[mapByte] & (1 << mapBit)))
{
START_CRIT_SECTION();
map[mapByte] |= (1 << mapBit);
MarkBufferDirty(buf);
MarkBufferDirty(vmBuf);
if (RelationNeedsWAL(rel))
{
if (XLogRecPtrIsInvalid(recptr))
recptr = log_heap_visible(rel->rd_node, heapBlk, buf,
{
Assert(!InRecovery);
recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
cutoff_xid);
/*
* If data checksums are enabled, we need to protect the heap
* page from being torn.
*/
if (DataChecksumsEnabled())
{
Page heapPage = BufferGetPage(heapBuf);
/* caller is expected to set PD_ALL_VISIBLE first */
Assert(PageIsAllVisible(heapPage));
PageSetLSN(heapPage, recptr);
}
}
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBuffer(vmBuf, BUFFER_LOCK_UNLOCK);
}
/*
......@@ -579,6 +605,8 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
/* Now extend the file */
while (vm_nblocks_now < vm_nblocks)
{
PageSetChecksumInplace(pg, vm_nblocks_now);
smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
(char *) pg, false);
vm_nblocks_now++;
......
......@@ -407,11 +407,15 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
*/
ItemIdMarkDead(curitemid);
opaque->btpo_flags |= BTP_HAS_GARBAGE;
/* be sure to mark the proper buffer dirty... */
/*
* Mark buffer with a dirty hint, since state is not
* crucial. Be sure to mark the proper buffer dirty.
*/
if (nbuf != InvalidBuffer)
SetBufferCommitInfoNeedsSave(nbuf);
MarkBufferDirtyHint(nbuf);
else
SetBufferCommitInfoNeedsSave(buf);
MarkBufferDirtyHint(buf);
}
}
}
......
......@@ -217,6 +217,7 @@ btbuildempty(PG_FUNCTION_ARGS)
_bt_initmetapage(metapage, P_NONE, 0);
/* Write the page. If archiving/streaming, XLOG it. */
PageSetChecksumInplace(metapage, BTREE_METAPAGE);
smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
(char *) metapage, true);
if (XLogIsNeeded())
......@@ -1051,7 +1052,7 @@ restart:
opaque->btpo_cycleid == vstate->cycleid)
{
opaque->btpo_cycleid = 0;
SetBufferCommitInfoNeedsSave(buf);
MarkBufferDirtyHint(buf);
}
}
......
......@@ -288,12 +288,15 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
{
if (!wstate->btws_zeropage)
wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
/* don't set checksum for all-zero page */
smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
wstate->btws_pages_written++,
(char *) wstate->btws_zeropage,
true);
}
PageSetChecksumInplace(page, blkno);
/*
* Now write the page. There's no need for smgr to schedule an fsync for
* this write; we'll do it ourselves before ending the build.
......
......@@ -1781,9 +1781,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
}
/*
* Since this can be redone later if needed, it's treated the same as a
* commit-hint-bit status update for heap tuples: we mark the buffer dirty
* but don't make a WAL log entry.
* Since this can be redone later if needed, mark as dirty hint.
*
* Whenever we mark anything LP_DEAD, we also set the page's
* BTP_HAS_GARBAGE flag, which is likewise just a hint.
......@@ -1791,7 +1789,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
if (killedsomething)
{
opaque->btpo_flags |= BTP_HAS_GARBAGE;
SetBufferCommitInfoNeedsSave(so->currPos.buf);
MarkBufferDirtyHint(so->currPos.buf);
}
if (!haveLock)
......
......@@ -81,6 +81,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "restore point: %s", xlrec->rp_name);
}
else if (info == XLOG_HINT)
{
appendStringInfo(buf, "page hint");
}
else if (info == XLOG_BACKUP_END)
{
XLogRecPtr startpoint;
......
......@@ -154,6 +154,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
SpGistInitMetapage(page);
/* Write the page. If archiving/streaming, XLOG it. */
PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
(char *) page, true);
if (XLogIsNeeded())
......@@ -163,6 +164,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
/* Likewise for the root page. */
SpGistInitPage(page, SPGIST_LEAF);
PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
(char *) page, true);
if (XLogIsNeeded())
......@@ -172,6 +174,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
/* Likewise for the null-tuples root page. */
SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
(char *) page, true);
if (XLogIsNeeded())
......
......@@ -437,6 +437,8 @@ critical section.)
4. Mark the shared buffer(s) as dirty with MarkBufferDirty(). (This must
happen before the WAL record is inserted; see notes in SyncOneBuffer().)
Note that a buffer should be marked dirty with MarkBufferDirty() only if
you also write a WAL record; see Writing Hints below.
5. If the relation requires WAL-logging, build a WAL log record and pass it
to XLogInsert(); then update the page's LSN using the returned XLOG
......@@ -584,6 +586,26 @@ replay code has to do the insertion on its own to restore the index to
consistency. Such insertions occur after WAL is operational, so they can
and should write WAL records for the additional generated actions.
Writing Hints
-------------
In some cases, we write additional information to data blocks without
writing a preceding WAL record. This should happen only if the data can
be reconstructed later following a crash and the action is simply a way
of optimising for performance. When a hint is written we use
MarkBufferDirtyHint() to mark the block dirty.
If the buffer is clean and checksums are in use then
MarkBufferDirtyHint() inserts an XLOG_HINT record to ensure that we
take a full page image that includes the hint. We do this to avoid
a partial page write, when we write the dirtied page. WAL is not
written during recovery, so we simply skip dirtying blocks because
of hints when in recovery.
If you do decide to optimise away a WAL record, then any calls to
MarkBufferDirty() must be replaced by MarkBufferDirtyHint(),
otherwise you will expose the risk of partial page writes.
Write-Ahead Logging for Filesystem Actions
------------------------------------------
......
......@@ -60,6 +60,7 @@
#include "utils/timestamp.h"
#include "pg_trace.h"
extern bool bootstrap_data_checksums;
/* File path names (all relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
......@@ -730,6 +731,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
bool isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
uint8 info_orig = info;
static XLogRecord *rechdr;
......@@ -999,6 +1001,18 @@ begin:;
goto begin;
}
/*
* If this is a hint record and we don't need a backup block then
* we have no more work to do and can exit quickly without inserting
* a WAL record at all. In that case return InvalidXLogRecPtr.
*/
if (isHint && !(info & XLR_BKP_BLOCK_MASK))
{
LWLockRelease(WALInsertLock);
END_CRIT_SECTION();
return InvalidXLogRecPtr;
}
/*
* If the current page is completely full, the record goes to the next
* page, right after the page header.
......@@ -1253,10 +1267,10 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
* not. Callers setting hints may hold only a share lock on the buffer
* content, so the LSN is read with BufferGetLSNAtomic(), not PageGetLSN().
*/
*lsn = PageGetLSN(page);
*lsn = BufferGetLSNAtomic(rdata->buffer);
if (doPageWrites &&
PageGetLSN(page) <= RedoRecPtr)
*lsn <= RedoRecPtr)
{
/*
* The page needs to be backed up, so set up *bkpb
......@@ -3187,6 +3201,11 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
}
/*
* Any checksum set on this page will be invalid. We don't need
* to reset it here since it will be set before being written.
*/
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
......@@ -3766,6 +3785,16 @@ GetSystemIdentifier(void)
return ControlFile->system_identifier;
}
/*
 * DataChecksumsEnabled
 *		Report whether data-page checksums are turned on, as recorded in the
 *		control file's data_checksums field.
 *
 * ControlFile must be non-NULL by the time this is called.
 */
bool
DataChecksumsEnabled(void)
{
	bool		checksums_on;

	Assert(ControlFile != NULL);

	checksums_on = ControlFile->data_checksums;
	return checksums_on;
}
/*
* Returns a fake LSN for unlogged relations.
*
......@@ -4092,6 +4121,7 @@ BootStrapXLOG(void)
ControlFile->max_prepared_xacts = max_prepared_xacts;
ControlFile->max_locks_per_xact = max_locks_per_xact;
ControlFile->wal_level = wal_level;
ControlFile->data_checksums = bootstrap_data_checksums;
/* some additional ControlFile fields are set in WriteControlFile() */
......@@ -7601,6 +7631,51 @@ XLogRestorePoint(const char *rpName)
return RecPtr;
}
/*
* Write a backup block if needed when we are setting a hint. Note that
* this may be called for a variety of page types, not just heaps.
*
* Deciding the "if needed" part is delicate and requires us to either
* grab WALInsertLock or check the info_lck spinlock. If we check the
* spinlock and it says Yes then we will need to get WALInsertLock as well,
* so the design choice here is to just go straight for the WALInsertLock
* and trust that calls to this function are minimised elsewhere.
*
* Callable while holding just share lock on the buffer content.
*
* Possible that multiple concurrent backends could attempt to write
* WAL records. In that case, more than one backup block may be recorded
* though that isn't important to the outcome and the backup blocks are
* likely to be identical anyway.
*/
#define XLOG_HINT_WATERMARK 13579
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer)
{
	int			hintMark = XLOG_HINT_WATERMARK;
	XLogRecData chain[2];
	XLogRecData *payload = &chain[0];
	XLogRecData *block = &chain[1];

	/*
	 * First element: the record body.  Zero-length records are not allowed,
	 * so a small watermark value serves as the payload.
	 */
	payload->data = (char *) &hintMark;
	payload->len = sizeof(int);
	payload->buffer = InvalidBuffer;
	payload->buffer_std = false;
	payload->next = block;

	/*
	 * Second element: carries no data of its own, only a reference to the
	 * buffer the hint was set on.
	 */
	block->data = NULL;
	block->len = 0;
	block->buffer = buffer;
	block->buffer_std = true;
	block->next = NULL;

	/* Make an XLOG entry reporting the hint */
	return XLogInsert(RM_XLOG_ID, XLOG_HINT, chain);
}
/*
* Check if any of the GUC parameters that are critical for hot standby
* have changed, and update the value in pg_control file if necessary.
......@@ -7767,8 +7842,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
/* Backup blocks are not used in xlog records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
/* Backup blocks are not used in most xlog records */
Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
if (info == XLOG_NEXTOID)
{
......@@ -7961,6 +8036,34 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
/* nothing to do here */
}
else if (info == XLOG_HINT)
{
#ifdef USE_ASSERT_CHECKING
int *watermark = (int *) XLogRecGetData(record);
#endif
/* Check the watermark is correct for the hint record */
Assert(*watermark == XLOG_HINT_WATERMARK);
/* Backup blocks must be present for smgr hint records */
Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
/*
* Hint records have no information that needs to be replayed.
* The sole purpose of them is to ensure that a hint bit does
* not cause a checksum invalidation if a hint bit write should
* cause a torn page. So the body of the record is empty but
* there must be one backup block.
*
* Since the only change in the backup block is a hint bit,
* there is no conflict with Hot Standby.
*
* This also means there is no corresponding API call for this,
* so an smgr implementation has no need to implement anything.
* Which means nothing is needed in md.c etc
*/
RestoreBackupBlock(lsn, record, 0, false, false);
}
else if (info == XLOG_BACKUP_END)
{
XLogRecPtr startpoint;
......
......@@ -48,6 +48,8 @@
extern int optind;
extern char *optarg;
bool bootstrap_data_checksums = false;
#define ALLOC(t, c) ((t *) calloc((unsigned)(c), sizeof(t)))
......@@ -233,7 +235,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
/* If no -x argument, we are a CheckerProcess */
MyAuxProcType = CheckerProcess;
while ((flag = getopt(argc, argv, "B:c:d:D:Fr:x:-:")) != -1)
while ((flag = getopt(argc, argv, "B:c:d:D:Fkr:x:-:")) != -1)
{
switch (flag)
{
......@@ -259,6 +261,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
case 'F':
SetConfigOption("fsync", "false", PGC_POSTMASTER, PGC_S_ARGV);
break;
case 'k':
bootstrap_data_checksums = true;
break;
case 'r':
strlcpy(OutputFileName, optarg, MAXPGPATH);
break;
......
......@@ -76,6 +76,8 @@ SetRelationIsScannable(Relation relation)
log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
RelationOpenSmgr(relation);
PageSetChecksumInplace(page, 0);
smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
pfree(page);
......
......@@ -1118,7 +1118,7 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
SetBufferCommitInfoNeedsSave(*buf);
MarkBufferDirtyHint(*buf);
}
seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
......
......@@ -8902,6 +8902,8 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst,
smgrread(src, forkNum, blkno, buf);
PageSetChecksumInplace(page, blkno);
/* XLOG stuff */
if (use_wal)
log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page);
......
......@@ -672,8 +672,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
{
PageSetAllVisible(page);
MarkBufferDirty(buf);
visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
InvalidTransactionId);
visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
vmbuffer, InvalidTransactionId);
}
UnlockReleaseBuffer(buf);
......@@ -907,8 +907,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
{
PageSetAllVisible(page);
MarkBufferDirty(buf);
visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
visibility_cutoff_xid);
visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
vmbuffer, visibility_cutoff_xid);
}
else if (!all_visible_according_to_vm)
{
......@@ -918,8 +918,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
* allowed. Set the visibility map bit as well so that we get
* back in sync.
*/
visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
visibility_cutoff_xid);
visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
vmbuffer, visibility_cutoff_xid);
}
}
......@@ -1154,7 +1154,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
{
Assert(BufferIsValid(*vmbuffer));
PageSetAllVisible(page);
visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer,
visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
visibility_cutoff_xid);
}
......
This diff is collapsed.
......@@ -197,15 +197,18 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
if (bufHdr->flags & BM_DIRTY)
{
SMgrRelation oreln;
Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
/* Find smgr relation for buffer */
oreln = smgropen(bufHdr->tag.rnode, MyBackendId);
PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
/* And write... */
smgrwrite(oreln,
bufHdr->tag.forkNum,
bufHdr->tag.blockNum,
(char *) LocalBufHdrGetBlock(bufHdr),
localpage,
false);
/* Mark not-dirty now in case we error out below */
......
......@@ -169,7 +169,9 @@ Recovery
--------
The FSM is not explicitly WAL-logged. Instead, we rely on a bunch of
self-correcting measures to repair possible corruption.
self-correcting measures to repair possible corruption. As a result when
we write to the FSM we treat that as a hint and thus use MarkBufferDirtyHint()
rather than MarkBufferDirty().
First of all, whenever a value is set on an FSM page, the root node of the
page is compared against the new value after bubbling up the change is
......
......@@ -216,7 +216,7 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
PageInit(page, BLCKSZ, 0);
if (fsm_set_avail(page, slot, new_cat))
MarkBufferDirty(buf);
MarkBufferDirtyHint(buf);
UnlockReleaseBuffer(buf);
}
......@@ -286,7 +286,7 @@ FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
return; /* nothing to do; the FSM was already smaller */
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
MarkBufferDirty(buf);
MarkBufferDirtyHint(buf);
UnlockReleaseBuffer(buf);
new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
......@@ -583,6 +583,8 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
while (fsm_nblocks_now < fsm_nblocks)
{
PageSetChecksumInplace(pg, fsm_nblocks_now);
smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now,
(char *) pg, false);
fsm_nblocks_now++;
......@@ -617,7 +619,7 @@ fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
page = BufferGetPage(buf);
if (fsm_set_avail(page, slot, newValue))
MarkBufferDirty(buf);
MarkBufferDirtyHint(buf);
if (minValue != 0)
{
......@@ -768,7 +770,7 @@ fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)
{
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
fsm_set_avail(BufferGetPage(buf), slot, child_avail);
MarkBufferDirty(buf);
MarkBufferDirtyHint(buf);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
}
......
......@@ -284,7 +284,7 @@ restart:
exclusive_lock_held = true;
}
fsm_rebuild_page(page);
MarkBufferDirty(buf);
MarkBufferDirtyHint(buf);
goto restart;
}
}
......
......@@ -15,7 +15,14 @@
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xlog.h"
bool ignore_checksum_failure = false;
static char pageCopyData[BLCKSZ]; /* for checksum calculation */
static Page pageCopy = pageCopyData;
static uint16 PageCalcChecksum16(Page page, BlockNumber blkno);
/* ----------------------------------------------------------------
* Page support functions
......@@ -25,6 +32,8 @@
/*
* PageInit
* Initializes the contents of a page.
* Note that we don't calculate an initial checksum here; that's not done
* until it's time to write.
*/
void
PageInit(Page page, Size pageSize, Size specialSize)
......@@ -39,7 +48,7 @@ PageInit(Page page, Size pageSize, Size specialSize)
/* Make sure all fields of page are zero, as well as unused space */
MemSet(p, 0, pageSize);
/* p->pd_flags = 0; done by above MemSet */
p->pd_flags = 0;
p->pd_lower = SizeOfPageHeaderData;
p->pd_upper = pageSize - specialSize;
p->pd_special = pageSize - specialSize;
......@@ -49,8 +58,8 @@ PageInit(Page page, Size pageSize, Size specialSize)
/*
* PageHeaderIsValid
* Check that the header fields of a page appear valid.
* PageIsVerified
* Check that the page header and checksum (if any) appear valid.
*
* This is called when a page has just been read in from disk. The idea is
* to cheaply detect trashed pages before we go nuts following bogus item
......@@ -67,30 +76,77 @@ PageInit(Page page, Size pageSize, Size specialSize)
* will clean up such a page and make it usable.
*/
bool
PageHeaderIsValid(PageHeader page)
PageIsVerified(Page page, BlockNumber blkno)
{
PageHeader p = (PageHeader) page;
char *pagebytes;
int i;
bool checksum_failure = false;
bool header_sane = false;
bool all_zeroes = false;
uint16 checksum;
/*
* Don't verify page data unless the page passes basic non-zero test
*/
if (!PageIsNew(page))
{
if (DataChecksumsEnabled())
{
checksum = PageCalcChecksum16(page, blkno);
if (checksum != p->pd_checksum)
checksum_failure = true;
}
/* Check normal case */
if (PageGetPageSize(page) == BLCKSZ &&
PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
(page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
page->pd_lower >= SizeOfPageHeaderData &&
page->pd_lower <= page->pd_upper &&
page->pd_upper <= page->pd_special &&
page->pd_special <= BLCKSZ &&
page->pd_special == MAXALIGN(page->pd_special))
/*
* The following checks don't prove the header is correct,
* only that it looks sane enough to allow into the buffer pool.
* Later usage of the block can still reveal problems,
* which is why we offer the checksum option.
*/
if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
p->pd_lower <= p->pd_upper &&
p->pd_upper <= p->pd_special &&
p->pd_special <= BLCKSZ &&
p->pd_special == MAXALIGN(p->pd_special))
header_sane = true;
if (header_sane && !checksum_failure)
return true;
}
/* Check all-zeroes case */
all_zeroes = true;
pagebytes = (char *) page;
for (i = 0; i < BLCKSZ; i++)
{
if (pagebytes[i] != 0)
return false;
{
all_zeroes = false;
break;
}
}
if (all_zeroes)
return true;
/*
 * Throw a WARNING if the checksum fails, but only after we've checked for
 * the all-zeroes case.
 *
 * The SQLSTATE must be attached through errcode(); a bare ERRCODE_* constant
 * in the ereport() auxiliary list is not recorded on the report, so the
 * warning would otherwise go out without ERRCODE_DATA_CORRUPTED.
 */
if (checksum_failure)
{
	ereport(WARNING,
			(errcode(ERRCODE_DATA_CORRUPTED),
			 errmsg("page verification failed, calculated checksum %u but expected %u",
					checksum, p->pd_checksum)));

	/*
	 * A bad checksum is tolerated only when the header still looks sane
	 * and ignore_checksum_failure has been turned on.
	 */
	if (header_sane && ignore_checksum_failure)
		return true;
}
return false;
}
......@@ -827,3 +883,98 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
pfree(itemidbase);
}
/*
 * PageSetChecksumCopy
 *		Produce a checksummed image of a shared-buffer page for writing.
 *
 * When the page is still new (uninitialized) or checksums are disabled, the
 * input page itself is returned unmodified.  Otherwise the page is first
 * copied into a private, statically-allocated buffer and the checksum is
 * stamped on that copy.  Working on a copy keeps concurrent changes to the
 * shared page (e.g. hint bits being set) from invalidating the checksum
 * that ends up being written; whether the copy happens to include such a
 * hint or not is irrelevant, as long as page and checksum agree.
 *
 * The result points to BLCKSZ bytes that should be written out.  Since the
 * copy lives in a single static buffer that is overwritten by the next call,
 * the caller must write it immediately and not refer to it again.
 */
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
	char	   *result = (char *) page;

	if (!PageIsNew(page) && DataChecksumsEnabled())
	{
		/* snapshot the page, then checksum the private snapshot */
		memcpy((char *) pageCopy, (char *) page, BLCKSZ);
		PageSetChecksumInplace(pageCopy, blkno);
		result = (char *) pageCopy;
	}

	return result;
}
/*
 * PageSetChecksumInplace
 *		Stamp the checksum directly onto a page held in private memory.
 *
 * Simpler sibling of PageSetChecksumCopy(): no copy is taken, the page's
 * pd_checksum field is overwritten in place.  Keeping the two entry points
 * separate makes it easier to see at each call site which variant is
 * intended.
 *
 * Nothing is done for a new (uninitialized) page or when data checksums
 * are disabled.
 */
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
	PageHeader	hdr = (PageHeader) page;

	if (!PageIsNew(page) && DataChecksumsEnabled())
		hdr->pd_checksum = PageCalcChecksum16(page, blkno);
}
/*
 * PageCalcChecksum16
 *		Compute the 16-bit checksum of an initialized page.
 *
 * The CRC covers, in order: the block number (so that a page that somehow
 * ends up at a different location is detected), the LSN at the start of the
 * page, and everything after the pd_checksum field through the end of the
 * block.  The checksum field itself is left out.
 *
 * A mismatch cannot tell a transposed block apart from corruption of the
 * block's own contents, but catching transposition at all is better than
 * ignoring it.
 *
 * The 32-bit CRC is truncated to uint16 on return.
 */
static uint16
PageCalcChecksum16(Page page, BlockNumber blkno)
{
	PageHeader	hdr = (PageHeader) page;
	const Size	lsn_len = sizeof(hdr->pd_lsn);
	/* offset of the first byte past pd_checksum; the skip assumes the
	 * checksum sits directly after the LSN */
	const Size	tail_off = lsn_len + sizeof(hdr->pd_checksum);
	pg_crc32	sum;

	/* callers must never ask for the checksum of an uninitialized page */
	Assert(!PageIsNew(page));

	INIT_CRC32(sum);

	/* seed with the block number to catch whole-block transposition */
	COMP_CRC32(sum, &blkno, sizeof(blkno));

	/* the LSN is the first field on the page */
	COMP_CRC32(sum, page, lsn_len);

	/* remainder of the block, stepping over the checksum field */
	COMP_CRC32(sum, page + tail_off, BLCKSZ - tail_off);

	FIN_CRC32(sum);

	return (uint16) sum;
}
......@@ -122,6 +122,7 @@ extern int CommitDelay;
extern int CommitSiblings;
extern char *default_tablespace;
extern char *temp_tablespaces;
extern bool ignore_checksum_failure;
extern bool synchronize_seqscans;
extern int ssl_renegotiation_limit;
extern char *SSLCipherSuites;
......@@ -807,6 +808,21 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
{
{"ignore_checksum_failure", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing after a checksum failure."),
gettext_noop("Detection of a checksum failure normally causes PostgreSQL to "
"report an error, aborting the current transaction. Setting "
"ignore_checksum_failure to true causes the system to ignore the failure "
"(but still report a warning), and continue processing. This "
"behavior could cause crashes or other serious problems. Only "
"has an effect if checksums are enabled."),
GUC_NOT_IN_SAMPLE
},
&ignore_checksum_failure,
false,
NULL, NULL, NULL
},
{
{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing past damaged page headers."),
......
......@@ -6,7 +6,7 @@
* NOTE: all the HeapTupleSatisfies routines will update the tuple's
* "hint" status bits if we see that the inserting or deleting transaction
* has now committed or aborted (and it is safe to set the hint bits).
* If the hint bits are changed, SetBufferCommitInfoNeedsSave is called on
* If the hint bits are changed, MarkBufferDirtyHint is called on
* the passed-in buffer. The caller must hold not only a pin, but at least
* shared buffer content lock on the buffer containing the tuple.
*
......@@ -121,7 +121,7 @@ SetHintBits(HeapTupleHeader tuple, Buffer buffer,
}
tuple->t_infomask |= infomask;
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
}
/*
......
......@@ -119,6 +119,7 @@ static bool noclean = false;
static bool do_sync = true;
static bool sync_only = false;
static bool show_setting = false;
static bool data_checksums = false;
static char *xlog_dir = "";
......@@ -1441,8 +1442,10 @@ bootstrap_template1(void)
unsetenv("PGCLIENTENCODING");
snprintf(cmd, sizeof(cmd),
"\"%s\" --boot -x1 %s %s",
backend_exec, boot_options, talkargs);
"\"%s\" --boot -x1 %s %s %s",
backend_exec,
data_checksums ? "-k" : "",
boot_options, talkargs);
PG_CMD_OPEN;
......@@ -2748,6 +2751,7 @@ usage(const char *progname)
printf(_(" -X, --xlogdir=XLOGDIR location for the transaction log directory\n"));
printf(_("\nLess commonly used options:\n"));
printf(_(" -d, --debug generate lots of debugging output\n"));
printf(_(" -k, --data-checksums data page checksums\n"));
printf(_(" -L DIRECTORY where to find the input files\n"));
printf(_(" -n, --noclean do not clean up after errors\n"));
printf(_(" -N, --nosync do not wait for changes to be written safely to disk\n"));
......@@ -3424,6 +3428,7 @@ main(int argc, char *argv[])
{"nosync", no_argument, NULL, 'N'},
{"sync-only", no_argument, NULL, 'S'},
{"xlogdir", required_argument, NULL, 'X'},
{"data-checksums", no_argument, NULL, 'k'},
{NULL, 0, NULL, 0}
};
......@@ -3455,7 +3460,7 @@ main(int argc, char *argv[])
/* process command-line options */
while ((c = getopt_long(argc, argv, "dD:E:L:nNU:WA:sST:X:", long_options, &option_index)) != -1)
while ((c = getopt_long(argc, argv, "dD:E:kL:nNU:WA:sST:X:", long_options, &option_index)) != -1)
{
switch (c)
{
......@@ -3504,6 +3509,9 @@ main(int argc, char *argv[])
case 'S':
sync_only = true;
break;
case 'k':
data_checksums = true;
break;
case 'L':
share_path = pg_strdup(optarg);
break;
......@@ -3616,6 +3624,11 @@ main(int argc, char *argv[])
setup_text_search();
if (data_checksums)
printf(_("Data page checksums are enabled.\n"));
else
printf(_("Data page checksums are disabled.\n"));
printf("\n");
initialize_data_directory();
......
......@@ -287,5 +287,7 @@ main(int argc, char *argv[])
(ControlFile.float4ByVal ? _("by value") : _("by reference")));
printf(_("Float8 argument passing: %s\n"),
(ControlFile.float8ByVal ? _("by value") : _("by reference")));
printf(_("Data page checksums: %s\n"),
(ControlFile.data_checksums ? _("enabled") : _("disabled")));
return 0;
}
......@@ -624,6 +624,8 @@ PrintControlValues(bool guessed)
(ControlFile.float4ByVal ? _("by value") : _("by reference")));
printf(_("Float8 argument passing: %s\n"),
(ControlFile.float8ByVal ? _("by value") : _("by reference")));
printf(_("Data page checksums: %s\n"),
(ControlFile.data_checksums ? _("enabled") : _("disabled")));
}
......
......@@ -279,7 +279,7 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
TransactionId cutoff_xid, MultiXactId cutoff_multi,
OffsetNumber *offsets, int offcnt);
extern XLogRecPtr log_heap_visible(RelFileNode rnode, BlockNumber block,
extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
Buffer vm_buffer, TransactionId cutoff_xid);
extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
BlockNumber blk, Page page);
......
......@@ -24,8 +24,8 @@ extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk,
extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
Buffer *vmbuf);
extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
XLogRecPtr recptr, Buffer vmbuf, TransactionId cutoff_xid);
extern void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid);
extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
extern BlockNumber visibilitymap_count(Relation rel);
extern void visibilitymap_truncate(Relation rel, BlockNumber nheapblocks);
......
......@@ -267,6 +267,8 @@ extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
extern int XLogFileOpen(XLogSegNo segno);
extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
......@@ -294,6 +296,7 @@ extern char *XLogFileNameP(TimeLineID tli, XLogSegNo segno);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
extern bool DataChecksumsEnabled(void);
extern XLogRecPtr GetFakeLSNForUnloggedRel(void);
extern Size XLOGShmemSize(void);
extern void XLOGShmemInit(void);
......
......@@ -21,7 +21,7 @@
/* Version identifier for this pg_control format */
#define PG_CONTROL_VERSION 935
#define PG_CONTROL_VERSION 936
/*
* Body of CheckPoint XLOG records. This is declared here because we keep
......@@ -67,6 +67,7 @@ typedef struct CheckPoint
#define XLOG_RESTORE_POINT 0x70
#define XLOG_FPW_CHANGE 0x80
#define XLOG_END_OF_RECOVERY 0x90
#define XLOG_HINT 0xA0
/*
......@@ -212,6 +213,9 @@ typedef struct ControlFileData
bool float4ByVal; /* float4 pass-by-value? */
bool float8ByVal; /* float8, int8, etc pass-by-value? */
/* Are data pages protected by checksums? */
bool data_checksums;
/* CRC of all above ... MUST BE LAST! */
pg_crc32 crc;
} ControlFileData;
......
......@@ -195,6 +195,7 @@ extern void DropDatabaseBuffers(Oid dbid);
RelationGetNumberOfBlocksInFork(reln, MAIN_FORKNUM)
extern bool BufferIsPermanent(Buffer buffer);
extern XLogRecPtr BufferGetLSNAtomic(Buffer buffer);
#ifdef NOT_USED
extern void PrintPinnedBufs(void);
......@@ -203,7 +204,7 @@ extern Size BufferShmemSize(void);
extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
ForkNumber *forknum, BlockNumber *blknum);
extern void SetBufferCommitInfoNeedsSave(Buffer buffer);
extern void MarkBufferDirtyHint(Buffer buffer);
extern void UnlockBuffers(void);
extern void LockBuffer(Buffer buffer, int mode);
......
......@@ -15,6 +15,7 @@
#define BUFPAGE_H
#include "access/xlogdefs.h"
#include "storage/block.h"
#include "storage/item.h"
#include "storage/off.h"
......@@ -386,7 +387,7 @@ do { \
*/
extern void PageInit(Page page, Size pageSize, Size specialSize);
extern bool PageHeaderIsValid(PageHeader page);
extern bool PageIsVerified(Page page, BlockNumber blkno);
extern OffsetNumber PageAddItem(Page page, Item item, Size size,
OffsetNumber offsetNumber, bool overwrite, bool is_heap);
extern Page PageGetTempPage(Page page);
......@@ -399,5 +400,7 @@ extern Size PageGetExactFreeSpace(Page page);
extern Size PageGetHeapFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
#endif /* BUFPAGE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment