Commit db03cf37 authored by Tom Lane's avatar Tom Lane

Code review/prettification for generic_xlog.c.

Improve commentary, use more specific names for the delta fields,
const-ify pointer arguments where possible, avoid assuming that
initializing only the first element of a local array will guarantee
that the remaining elements end up as we need them.  (I think that
code in generic_redo actually worked, but only because InvalidBuffer
is zero; this is a particularly ugly way of depending on that ...)
parent 2dd318d2
...@@ -27,12 +27,12 @@ ...@@ -27,12 +27,12 @@
* - length of page region (OffsetNumber) * - length of page region (OffsetNumber)
* - data - the data to place into the region ('length' number of bytes) * - data - the data to place into the region ('length' number of bytes)
* *
* Unchanged regions of a page are not represented in its delta. As a * Unchanged regions of a page are not represented in its delta. As a result,
* result, a delta can be more compact than the full page image. But having * a delta can be more compact than the full page image. But having an
* an unchanged region in the middle of two fragments that is smaller than * unchanged region between two fragments that is smaller than the fragment
* the fragment header (offset and length) does not pay off in terms of the * header (offset+length) does not pay off in terms of the overall size of
* overall size of the delta. For this reason, we break fragments only if * the delta. For this reason, we merge adjacent fragments if the unchanged
* the unchanged region is bigger than MATCH_THRESHOLD. * region between them is <= MATCH_THRESHOLD bytes.
* *
* The worst case for delta sizes occurs when we did not find any unchanged * The worst case for delta sizes occurs when we did not find any unchanged
* region in the page. The size of the delta will be the size of the page plus * region in the page. The size of the delta will be the size of the page plus
...@@ -41,16 +41,16 @@ ...@@ -41,16 +41,16 @@
*/ */
#define FRAGMENT_HEADER_SIZE (2 * sizeof(OffsetNumber)) #define FRAGMENT_HEADER_SIZE (2 * sizeof(OffsetNumber))
#define MATCH_THRESHOLD FRAGMENT_HEADER_SIZE #define MATCH_THRESHOLD FRAGMENT_HEADER_SIZE
#define MAX_DELTA_SIZE BLCKSZ + FRAGMENT_HEADER_SIZE #define MAX_DELTA_SIZE (BLCKSZ + FRAGMENT_HEADER_SIZE)
/* Struct of generic xlog data for single page */ /* Struct of generic xlog data for single page */
typedef struct typedef struct
{ {
Buffer buffer; /* registered buffer */ Buffer buffer; /* registered buffer */
char image[BLCKSZ]; /* copy of page image for modification */
char data[MAX_DELTA_SIZE]; /* delta between page images */
int dataLen; /* space consumed in data field */
bool fullImage; /* are we taking a full image of this page? */ bool fullImage; /* are we taking a full image of this page? */
int deltaLen; /* space consumed in delta field */
char image[BLCKSZ]; /* copy of page image for modification */
char delta[MAX_DELTA_SIZE]; /* delta between page images */
} PageData; } PageData;
/* State of generic xlog record construction */ /* State of generic xlog record construction */
...@@ -61,22 +61,26 @@ struct GenericXLogState ...@@ -61,22 +61,26 @@ struct GenericXLogState
}; };
static void writeFragment(PageData *pageData, OffsetNumber offset, static void writeFragment(PageData *pageData, OffsetNumber offset,
OffsetNumber len, Pointer data); OffsetNumber len, const char *data);
static void writeDelta(PageData *pageData); static void computeDelta(PageData *pageData);
static void applyPageRedo(Page page, Pointer data, Size dataSize); static void applyPageRedo(Page page, const char *delta, Size deltaSize);
/* /*
* Write next fragment into delta. * Write next fragment into pageData's delta.
*
* The fragment has the given offset and length, and data points to the
* actual data (of length length).
*/ */
static void static void
writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length, writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
Pointer data) const char *data)
{ {
Pointer ptr = pageData->data + pageData->dataLen; char *ptr = pageData->delta + pageData->deltaLen;
/* Check if we have enough space */ /* Verify we have enough space */
Assert(pageData->dataLen + sizeof(offset) + Assert(pageData->deltaLen + sizeof(offset) +
sizeof(length) + length <= sizeof(pageData->data)); sizeof(length) + length <= sizeof(pageData->delta));
/* Write fragment data */ /* Write fragment data */
memcpy(ptr, &offset, sizeof(offset)); memcpy(ptr, &offset, sizeof(offset));
...@@ -86,14 +90,14 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length, ...@@ -86,14 +90,14 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
memcpy(ptr, data, length); memcpy(ptr, data, length);
ptr += length; ptr += length;
pageData->dataLen = ptr - pageData->data; pageData->deltaLen = ptr - pageData->delta;
} }
/* /*
* Make delta for given page. * Compute the delta record for given page.
*/ */
static void static void
writeDelta(PageData *pageData) computeDelta(PageData *pageData)
{ {
Page page = BufferGetPage(pageData->buffer, NULL, NULL, Page page = BufferGetPage(pageData->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST), BGP_NO_SNAPSHOT_TEST),
...@@ -106,6 +110,8 @@ writeDelta(PageData *pageData) ...@@ -106,6 +110,8 @@ writeDelta(PageData *pageData)
imageLower = ((PageHeader) image)->pd_lower, imageLower = ((PageHeader) image)->pd_lower,
imageUpper = ((PageHeader) image)->pd_upper; imageUpper = ((PageHeader) image)->pd_upper;
pageData->deltaLen = 0;
for (i = 0; i < BLCKSZ; i++) for (i = 0; i < BLCKSZ; i++)
{ {
bool match; bool match;
...@@ -181,22 +187,22 @@ writeDelta(PageData *pageData) ...@@ -181,22 +187,22 @@ writeDelta(PageData *pageData)
char tmp[BLCKSZ]; char tmp[BLCKSZ];
memcpy(tmp, image, BLCKSZ); memcpy(tmp, image, BLCKSZ);
applyPageRedo(tmp, pageData->data, pageData->dataLen); applyPageRedo(tmp, pageData->delta, pageData->deltaLen);
if (memcmp(tmp, page, pageLower) if (memcmp(tmp, page, pageLower) != 0 ||
|| memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper)) memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper) != 0)
elog(ERROR, "result of generic xlog apply does not match"); elog(ERROR, "result of generic xlog apply does not match");
} }
#endif #endif
} }
/* /*
* Start new generic xlog record. * Start new generic xlog record for modifications to specified relation.
*/ */
GenericXLogState * GenericXLogState *
GenericXLogStart(Relation relation) GenericXLogStart(Relation relation)
{ {
int i;
GenericXLogState *state; GenericXLogState *state;
int i;
state = (GenericXLogState *) palloc(sizeof(GenericXLogState)); state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
...@@ -209,24 +215,30 @@ GenericXLogStart(Relation relation) ...@@ -209,24 +215,30 @@ GenericXLogStart(Relation relation)
/* /*
* Register new buffer for generic xlog record. * Register new buffer for generic xlog record.
*
* Returns pointer to the page's image in the GenericXLogState, which
* is what the caller should modify.
*
* If the buffer is already registered, just return its existing entry.
*/ */
Page Page
GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew) GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
{ {
int block_id; int block_id;
/* Place new buffer to unused slot in array */ /* Search array for existing entry or first unused slot */
for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++) for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
{ {
PageData *page = &state->pages[block_id]; PageData *page = &state->pages[block_id];
if (BufferIsInvalid(page->buffer)) if (BufferIsInvalid(page->buffer))
{ {
/* Empty slot, so use it (there cannot be a match later) */
page->buffer = buffer; page->buffer = buffer;
memcpy(page->image, BufferGetPage(buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST), BLCKSZ);
page->dataLen = 0;
page->fullImage = isNew; page->fullImage = isNew;
memcpy(page->image,
BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
BLCKSZ);
return (Page) page->image; return (Page) page->image;
} }
else if (page->buffer == buffer) else if (page->buffer == buffer)
...@@ -239,15 +251,16 @@ GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew) ...@@ -239,15 +251,16 @@ GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
} }
} }
elog(ERROR, "maximum number of %d generic xlog buffers is exceeded", elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
MAX_GENERIC_XLOG_PAGES); MAX_GENERIC_XLOG_PAGES);
/* keep compiler quiet */ /* keep compiler quiet */
return NULL; return NULL;
} }
/* /*
* Unregister particular buffer for generic xlog record. * Unregister particular buffer for generic xlog record.
*
* XXX this is dangerous and should go away.
*/ */
void void
GenericXLogUnregister(GenericXLogState *state, Buffer buffer) GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
...@@ -274,7 +287,8 @@ GenericXLogUnregister(GenericXLogState *state, Buffer buffer) ...@@ -274,7 +287,8 @@ GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
} }
/* /*
* Put all changes in registered buffers to generic xlog record. * Apply changes represented by GenericXLogState to the actual buffers,
* and emit a generic xlog record.
*/ */
XLogRecPtr XLogRecPtr
GenericXLogFinish(GenericXLogState *state) GenericXLogFinish(GenericXLogState *state)
...@@ -291,33 +305,35 @@ GenericXLogFinish(GenericXLogState *state) ...@@ -291,33 +305,35 @@ GenericXLogFinish(GenericXLogState *state)
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++) for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
{ {
PageData *pageData = &state->pages[i];
Page page;
char tmp[BLCKSZ]; char tmp[BLCKSZ];
PageData *page = &state->pages[i];
if (BufferIsInvalid(page->buffer)) if (BufferIsInvalid(pageData->buffer))
continue; continue;
page = BufferGetPage(pageData->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
/* Swap current and saved page image. */ /* Swap current and saved page image. */
memcpy(tmp, page->image, BLCKSZ); memcpy(tmp, pageData->image, BLCKSZ);
memcpy(page->image, BufferGetPage(page->buffer, NULL, NULL, memcpy(pageData->image, page, BLCKSZ);
BGP_NO_SNAPSHOT_TEST), BLCKSZ); memcpy(page, tmp, BLCKSZ);
memcpy(BufferGetPage(page->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST), tmp, BLCKSZ);
if (page->fullImage) if (pageData->fullImage)
{ {
/* A full page image does not require anything special */ /* A full page image does not require anything special */
XLogRegisterBuffer(i, page->buffer, REGBUF_FORCE_IMAGE); XLogRegisterBuffer(i, pageData->buffer, REGBUF_FORCE_IMAGE);
} }
else else
{ {
/* /*
* In normal mode, calculate delta and write it as data * In normal mode, calculate delta and write it as xlog data
* associated with this page. * associated with this page.
*/ */
XLogRegisterBuffer(i, page->buffer, REGBUF_STANDARD); XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
writeDelta(page); computeDelta(pageData);
XLogRegisterBufData(i, page->data, page->dataLen); XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
} }
} }
...@@ -327,13 +343,13 @@ GenericXLogFinish(GenericXLogState *state) ...@@ -327,13 +343,13 @@ GenericXLogFinish(GenericXLogState *state)
/* Set LSN and mark buffers dirty */ /* Set LSN and mark buffers dirty */
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++) for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
{ {
PageData *page = &state->pages[i]; PageData *pageData = &state->pages[i];
if (BufferIsInvalid(page->buffer)) if (BufferIsInvalid(pageData->buffer))
continue; continue;
PageSetLSN(BufferGetPage(page->buffer, NULL, NULL, PageSetLSN(BufferGetPage(pageData->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST), lsn); BGP_NO_SNAPSHOT_TEST), lsn);
MarkBufferDirty(page->buffer); MarkBufferDirty(pageData->buffer);
} }
END_CRIT_SECTION(); END_CRIT_SECTION();
} }
...@@ -343,13 +359,15 @@ GenericXLogFinish(GenericXLogState *state) ...@@ -343,13 +359,15 @@ GenericXLogFinish(GenericXLogState *state)
START_CRIT_SECTION(); START_CRIT_SECTION();
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++) for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
{ {
PageData *page = &state->pages[i]; PageData *pageData = &state->pages[i];
if (BufferIsInvalid(page->buffer)) if (BufferIsInvalid(pageData->buffer))
continue; continue;
memcpy(BufferGetPage(page->buffer, NULL, NULL, memcpy(BufferGetPage(pageData->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST), page->image, BLCKSZ); BGP_NO_SNAPSHOT_TEST),
MarkBufferDirty(page->buffer); pageData->image,
BLCKSZ);
MarkBufferDirty(pageData->buffer);
} }
END_CRIT_SECTION(); END_CRIT_SECTION();
} }
...@@ -360,7 +378,9 @@ GenericXLogFinish(GenericXLogState *state) ...@@ -360,7 +378,9 @@ GenericXLogFinish(GenericXLogState *state)
} }
/* /*
* Abort generic xlog record. * Abort generic xlog record construction. No changes are applied to buffers.
*
* Note: caller is responsible for releasing locks/pins on buffers, if needed.
*/ */
void void
GenericXLogAbort(GenericXLogState *state) GenericXLogAbort(GenericXLogState *state)
...@@ -372,10 +392,10 @@ GenericXLogAbort(GenericXLogState *state) ...@@ -372,10 +392,10 @@ GenericXLogAbort(GenericXLogState *state)
* Apply delta to given page image. * Apply delta to given page image.
*/ */
static void static void
applyPageRedo(Page page, Pointer data, Size dataSize) applyPageRedo(Page page, const char *delta, Size deltaSize)
{ {
Pointer ptr = data, const char *ptr = delta;
end = data + dataSize; const char *end = delta + deltaSize;
while (ptr < end) while (ptr < end)
{ {
...@@ -399,10 +419,11 @@ applyPageRedo(Page page, Pointer data, Size dataSize) ...@@ -399,10 +419,11 @@ applyPageRedo(Page page, Pointer data, Size dataSize)
void void
generic_redo(XLogReaderState *record) generic_redo(XLogReaderState *record)
{ {
uint8 block_id;
Buffer buffers[MAX_GENERIC_XLOG_PAGES] = {InvalidBuffer};
XLogRecPtr lsn = record->EndRecPtr; XLogRecPtr lsn = record->EndRecPtr;
Buffer buffers[MAX_GENERIC_XLOG_PAGES];
uint8 block_id;
/* Protect limited size of buffers[] array */
Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES); Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
/* Iterate over blocks */ /* Iterate over blocks */
...@@ -411,20 +432,24 @@ generic_redo(XLogReaderState *record) ...@@ -411,20 +432,24 @@ generic_redo(XLogReaderState *record)
XLogRedoAction action; XLogRedoAction action;
if (!XLogRecHasBlockRef(record, block_id)) if (!XLogRecHasBlockRef(record, block_id))
{
buffers[block_id] = InvalidBuffer;
continue; continue;
}
action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]); action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
/* Apply redo to given block if needed */ /* Apply redo to given block if needed */
if (action == BLK_NEEDS_REDO) if (action == BLK_NEEDS_REDO)
{ {
Pointer blockData;
Size blockDataSize;
Page page; Page page;
char *blockDelta;
Size blockDeltaSize;
page = BufferGetPage(buffers[block_id], NULL, NULL, BGP_NO_SNAPSHOT_TEST); page = BufferGetPage(buffers[block_id], NULL, NULL,
blockData = XLogRecGetBlockData(record, block_id, &blockDataSize); BGP_NO_SNAPSHOT_TEST);
applyPageRedo(page, blockData, blockDataSize); blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
applyPageRedo(page, blockDelta, blockDeltaSize);
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
MarkBufferDirty(buffers[block_id]); MarkBufferDirty(buffers[block_id]);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment