Commit 9c808f89 authored by Heikki Linnakangas

Refactor XLogInsert a bit. The rdata entries for backup blocks are now
constructed before acquiring WALInsertLock, which slightly reduces the time
the lock is held. Although I could not measure any benefit in benchmarks,
the code is more readable this way.
parent 26e89e7f
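
The commit message describes moving the backup-block rdata construction (and the CRC pass over the chain) out from under WALInsertLock, with rdt_lastnormal remembering the end of the caller's chain so the extra entries can be unlinked if the insert has to loop back. The stand-alone sketch below illustrates that append / checksum / unlink-on-retry pattern; the struct, names, and the toy checksum standing in for PostgreSQL's CRC32 macros are all invented for illustration, not the real xlog.c definitions.

```c
/*
 * Toy model of the refactored flow (not PostgreSQL code): extend the chain
 * with a stand-in "backup block" entry and checksum everything before any
 * lock would be taken; on a simulated retry, restore last_normal->next and
 * recompute.  All names and the checksum are illustrative only.
 */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef struct RData
{
    const char   *data;
    size_t        len;
    struct RData *next;
} RData;

/* Stand-in for CRC32: a trivial running checksum, just for the demo. */
static uint32_t
chain_checksum(const RData *rdt)
{
    uint32_t sum = 0;

    for (; rdt != NULL; rdt = rdt->next)
        for (size_t i = 0; i < rdt->len; i++)
            sum = sum * 31 + (unsigned char) rdt->data[i];
    return sum;
}

int
main(void)
{
    RData   record = {"tuple data", 10, NULL};
    RData   backup = {"fake full-page image", 20, NULL};
    RData  *last_normal = &record;  /* end of the caller-supplied chain */

    /* Append the backup-block entry before "acquiring the lock". */
    last_normal->next = &backup;
    printf("with backup block: %08x\n", chain_checksum(&record));

    /* Pretend a race forced a retry: unlink the extra entry and redo. */
    last_normal->next = NULL;
    printf("after unlinking:   %08x\n", chain_checksum(&record));
    return 0;
}
```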
@@ -694,6 +694,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
     uint32        freespace;
     int           curridx;
     XLogRecData *rdt;
+    XLogRecData *rdt_lastnormal;
     Buffer        dtbuf[XLR_MAX_BKP_BLOCKS];
     bool          dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
     BkpBlock      dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
@@ -708,6 +709,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
     bool          updrqst;
     bool          doPageWrites;
     bool          isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+    uint8         info_orig = info;
 
     /* cross-check on whether we should be here or not */
     if (!XLogInsertAllowed())
@@ -731,23 +733,18 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
     }
 
     /*
-     * Here we scan the rdata chain, determine which buffers must be backed
-     * up, and compute the CRC values for the data.  Note that the record
-     * header isn't added into the CRC initially since we don't know the final
-     * length or info bits quite yet.  Thus, the CRC will represent the CRC of
-     * the whole record in the order "rdata, then backup blocks, then record
-     * header".
+     * Here we scan the rdata chain, to determine which buffers must be backed
+     * up.
      *
      * We may have to loop back to here if a race condition is detected below.
      * We could prevent the race by doing all this work while holding the
      * insert lock, but it seems better to avoid doing CRC calculations while
-     * holding the lock.  This means we have to be careful about modifying the
-     * rdata chain until we know we aren't going to loop back again.  The only
-     * change we allow ourselves to make earlier is to set rdt->data = NULL in
-     * chain items we have decided we will have to back up the whole buffer
-     * for.  This is OK because we will certainly decide the same thing again
-     * for those items if we do it over; doing it here saves an extra pass
-     * over the chain later.
+     * holding the lock.
+     *
+     * We add entries for backup blocks to the chain, so that they don't
+     * need any special treatment in the critical section where the chunks are
+     * copied into the WAL buffers. Those entries have to be unlinked from the
+     * chain if we have to loop back here.
      */
 begin:;
     for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
@@ -764,7 +761,6 @@ begin:;
      */
     doPageWrites = fullPageWrites || Insert->forcePageWrites;
 
-    INIT_CRC32(rdata_crc);
     len = 0;
     for (rdt = rdata;;)
     {
@@ -772,7 +768,6 @@ begin:;
         {
             /* Simple data, just include it */
             len += rdt->len;
-            COMP_CRC32(rdata_crc, rdt->data, rdt->len);
         }
         else
         {
@@ -783,12 +778,12 @@ begin:;
             {
                 /* Buffer already referenced by earlier chain item */
                 if (dtbuf_bkp[i])
+                {
                     rdt->data = NULL;
+                    rdt->len = 0;
+                }
                 else if (rdt->data)
-                {
                     len += rdt->len;
-                    COMP_CRC32(rdata_crc, rdt->data, rdt->len);
-                }
                 break;
             }
             if (dtbuf[i] == InvalidBuffer)
@@ -800,12 +795,10 @@ begin:;
                 {
                     dtbuf_bkp[i] = true;
                     rdt->data = NULL;
+                    rdt->len = 0;
                 }
                 else if (rdt->data)
-                {
                     len += rdt->len;
-                    COMP_CRC32(rdata_crc, rdt->data, rdt->len);
-                }
                 break;
             }
         }
@@ -820,47 +813,88 @@ begin:;
         }
     }
 
     /*
-     * Now add the backup block headers and data into the CRC
-     */
-    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-    {
-        if (dtbuf_bkp[i])
-        {
-            BkpBlock   *bkpb = &(dtbuf_xlg[i]);
-            char       *page;
-
-            COMP_CRC32(rdata_crc,
-                       (char *) bkpb,
-                       sizeof(BkpBlock));
-            page = (char *) BufferGetBlock(dtbuf[i]);
-            if (bkpb->hole_length == 0)
-            {
-                COMP_CRC32(rdata_crc,
-                           page,
-                           BLCKSZ);
-            }
-            else
-            {
-                /* must skip the hole */
-                COMP_CRC32(rdata_crc,
-                           page,
-                           bkpb->hole_offset);
-                COMP_CRC32(rdata_crc,
-                           page + (bkpb->hole_offset + bkpb->hole_length),
-                           BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
-            }
-        }
-    }
-
-    /*
      * NOTE: We disallow len == 0 because it provides a useful bit of extra
      * error checking in ReadRecord.  This means that all callers of
      * XLogInsert must supply at least some not-in-a-buffer data.  However, we
      * make an exception for XLOG SWITCH records because we don't want them to
      * ever cross a segment boundary.
      */
     if (len == 0 && !isLogSwitch)
         elog(PANIC, "invalid xlog record length %u", len);
 
+    /*
+     * Make additional rdata chain entries for the backup blocks, so that we
+     * don't need to special-case them in the write loop.  This modifies the
+     * original rdata chain, but we keep a pointer to the last regular entry,
+     * rdt_lastnormal, so that we can undo this if we have to loop back to the
+     * beginning.
+     *
+     * At the exit of this loop, write_len includes the backup block data.
+     *
+     * Also set the appropriate info bits to show which buffers were backed
+     * up.  The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
+     * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
+     */
+    rdt_lastnormal = rdt;
+    write_len = len;
+    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+    {
+        BkpBlock   *bkpb;
+        char       *page;
+
+        if (!dtbuf_bkp[i])
+            continue;
+
+        info |= XLR_SET_BKP_BLOCK(i);
+
+        bkpb = &(dtbuf_xlg[i]);
+        page = (char *) BufferGetBlock(dtbuf[i]);
+
+        rdt->next = &(dtbuf_rdt1[i]);
+        rdt = rdt->next;
+
+        rdt->data = (char *) bkpb;
+        rdt->len = sizeof(BkpBlock);
+        write_len += sizeof(BkpBlock);
+
+        rdt->next = &(dtbuf_rdt2[i]);
+        rdt = rdt->next;
+        if (bkpb->hole_length == 0)
+        {
+            rdt->data = page;
+            rdt->len = BLCKSZ;
+            write_len += BLCKSZ;
+            rdt->next = NULL;
+        }
+        else
+        {
+            /* must skip the hole */
+            rdt->data = page;
+            rdt->len = bkpb->hole_offset;
+            write_len += bkpb->hole_offset;
+
+            rdt->next = &(dtbuf_rdt3[i]);
+            rdt = rdt->next;
+
+            rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
+            rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
+            write_len += rdt->len;
+            rdt->next = NULL;
+        }
+    }
+
+    /*
+     * Calculate CRC of the data, including all the backup blocks
+     *
+     * Note that the record header isn't added into the CRC initially since
+     * we don't know the prev-link yet.  Thus, the CRC will represent the CRC
+     * of the whole record in the order: rdata, then backup blocks, then
+     * record header.
+     */
+    INIT_CRC32(rdata_crc);
+    for (rdt = rdata; rdt != NULL; rdt = rdt->next)
+        COMP_CRC32(rdata_crc, rdt->data, rdt->len);
+
     START_CRIT_SECTION();
@@ -896,6 +930,8 @@ begin:;
              */
             LWLockRelease(WALInsertLock);
             END_CRIT_SECTION();
+            rdt_lastnormal->next = NULL;
+            info = info_orig;
             goto begin;
         }
     }
@@ -910,70 +946,14 @@ begin:;
      */
     if (Insert->forcePageWrites && !doPageWrites)
     {
-        /* Oops, must redo it with full-page data */
+        /* Oops, must redo it with full-page data. */
         LWLockRelease(WALInsertLock);
         END_CRIT_SECTION();
+        rdt_lastnormal->next = NULL;
+        info = info_orig;
         goto begin;
     }
 
-    /*
-     * Make additional rdata chain entries for the backup blocks, so that we
-     * don't need to special-case them in the write loop.  Note that we have
-     * now irrevocably changed the input rdata chain.  At the exit of this
-     * loop, write_len includes the backup block data.
-     *
-     * Also set the appropriate info bits to show which buffers were backed
-     * up.  The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
-     * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
-     */
-    write_len = len;
-    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-    {
-        BkpBlock   *bkpb;
-        char       *page;
-
-        if (!dtbuf_bkp[i])
-            continue;
-
-        info |= XLR_SET_BKP_BLOCK(i);
-
-        bkpb = &(dtbuf_xlg[i]);
-        page = (char *) BufferGetBlock(dtbuf[i]);
-
-        rdt->next = &(dtbuf_rdt1[i]);
-        rdt = rdt->next;
-
-        rdt->data = (char *) bkpb;
-        rdt->len = sizeof(BkpBlock);
-        write_len += sizeof(BkpBlock);
-
-        rdt->next = &(dtbuf_rdt2[i]);
-        rdt = rdt->next;
-        if (bkpb->hole_length == 0)
-        {
-            rdt->data = page;
-            rdt->len = BLCKSZ;
-            write_len += BLCKSZ;
-            rdt->next = NULL;
-        }
-        else
-        {
-            /* must skip the hole */
-            rdt->data = page;
-            rdt->len = bkpb->hole_offset;
-            write_len += bkpb->hole_offset;
-
-            rdt->next = &(dtbuf_rdt3[i]);
-            rdt = rdt->next;
-
-            rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
-            rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
-            write_len += rdt->len;
-            rdt->next = NULL;
-        }
-    }
-
     /*
      * If there isn't enough space on the current XLOG page for a record
      * header, advance to the next page (leaving the unused space as zeroes).
...
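
One detail worth calling out from the relocated loop: a backup block with a "hole" (the unused space in the middle of the page) is emitted as two chain entries, one for the bytes before the hole and one for the bytes after it, so the hole itself is neither written nor checksummed. The sketch below mirrors that hole_length == 0 / != 0 split using invented types and names rather than the real BkpBlock/XLogRecData definitions.

```c
/*
 * Sketch only: split a block-sized buffer into the segments that surround a
 * "hole", as the dtbuf_rdt2/dtbuf_rdt3 entries above do.  Types and names are
 * invented for illustration.
 */
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

#define BLCKSZ 8192

typedef struct Segment
{
    const char *data;
    size_t      len;
} Segment;

/* Returns the number of segments written (1 if there is no hole, else 2). */
static int
split_around_hole(const char *page, size_t hole_offset, size_t hole_length,
                  Segment out[2])
{
    assert(hole_offset + hole_length <= BLCKSZ);

    if (hole_length == 0)
    {
        out[0].data = page;
        out[0].len = BLCKSZ;
        return 1;
    }
    out[0].data = page;                               /* bytes before the hole */
    out[0].len = hole_offset;
    out[1].data = page + hole_offset + hole_length;   /* bytes after the hole */
    out[1].len = BLCKSZ - (hole_offset + hole_length);
    return 2;
}

int
main(void)
{
    static char page[BLCKSZ];
    Segment     seg[2];
    int         n = split_around_hole(page, 100, 4000, seg);

    for (int i = 0; i < n; i++)
        printf("segment %d: offset %td, %zu bytes\n",
               i, seg[i].data - page, seg[i].len);
    return 0;
}
```

The two segments together cover exactly BLCKSZ minus hole_length bytes, which is why write_len in the patched code grows by hole_offset plus the post-hole remainder rather than by a full BLCKSZ.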