Commit 8bf74967 authored by Alvaro Herrera's avatar Alvaro Herrera

Reduce the number of pallocs() in BRIN

Instead of allocating memory in brin_deform_tuple and brin_copy_tuple
over and over during a scan, allow reuse of previously allocated memory.
This is said to make for a measurable performance improvement.

Author: Jinyu Zhang, Álvaro Herrera
Reviewed by: Tomas Vondra
Discussion: https://postgr.es/m/495deb78.4186.1500dacaa63.Coremail.beijing_pg@163.com
parent e8fdbd58
......@@ -226,7 +226,8 @@ brin_page_items(PG_FUNCTION_ARGS)
if (ItemIdIsUsed(itemId))
{
dtup = brin_deform_tuple(bdesc,
(BrinTuple *) PageGetItem(page, itemId));
(BrinTuple *) PageGetItem(page, itemId),
NULL);
attno = 1;
unusedItem = false;
}
......
......@@ -217,7 +217,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
MemoryContextSwitchTo(tupcxt);
}
dtup = brin_deform_tuple(bdesc, brtup);
dtup = brin_deform_tuple(bdesc, brtup, NULL);
/*
* Compare the key values of the new tuple to the stored index values;
......@@ -268,7 +268,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
* re-acquiring the lock.
*/
origsz = ItemIdGetLength(lp);
origtup = brin_copy_tuple(brtup, origsz);
origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
/*
* Before releasing the lock, check if we can attempt a same-page
......@@ -363,6 +363,9 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
FmgrInfo *consistentFn;
MemoryContext oldcxt;
MemoryContext perRangeCxt;
BrinMemTuple *dtup;
BrinTuple *btup = NULL;
Size btupsz = 0;
opaque = (BrinOpaque *) scan->opaque;
bdesc = opaque->bo_bdesc;
......@@ -384,6 +387,9 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
*/
consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
/* allocate an initial in-memory tuple, out of the per-range memcxt */
dtup = brin_new_memtuple(bdesc);
/*
* Setup and use a per-range memory context, which is reset every time we
* loop below. This avoids having to free the tuples within the loop.
......@@ -401,6 +407,7 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
{
bool addrange;
bool gottuple = false;
BrinTuple *tup;
OffsetNumber off;
Size size;
......@@ -414,7 +421,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
scan->xs_snapshot);
if (tup)
{
tup = brin_copy_tuple(tup, size);
gottuple = true;
btup = brin_copy_tuple(tup, size, btup, &btupsz);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
......@@ -422,15 +430,13 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
* For page ranges with no indexed tuple, we must return the whole
* range; otherwise, compare it to the scan keys.
*/
if (tup == NULL)
if (!gottuple)
{
addrange = true;
}
else
{
BrinMemTuple *dtup;
dtup = brin_deform_tuple(bdesc, tup);
dtup = brin_deform_tuple(bdesc, btup, dtup);
if (dtup->bt_placeholder)
{
/*
......@@ -1210,7 +1216,7 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
/* the placeholder tuple must exist */
if (phtup == NULL)
elog(ERROR, "missing placeholder tuple");
phtup = brin_copy_tuple(phtup, phsz);
phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
/* merge it into the tuple from the heap scan */
......@@ -1358,7 +1364,7 @@ union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
"brin union",
ALLOCSET_DEFAULT_SIZES);
oldcxt = MemoryContextSwitchTo(cxt);
db = brin_deform_tuple(bdesc, b);
db = brin_deform_tuple(bdesc, b, NULL);
MemoryContextSwitchTo(oldcxt);
for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
......
......@@ -548,6 +548,8 @@ brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
OffsetNumber off;
OffsetNumber maxoff;
Page page;
BrinTuple *btup = NULL;
Size btupsz = 0;
page = BufferGetPage(buf);
......@@ -567,7 +569,7 @@ brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
{
sz = ItemIdGetLength(lp);
tup = (BrinTuple *) PageGetItem(page, lp);
tup = brin_copy_tuple(tup, sz);
tup = brin_copy_tuple(tup, sz, btup, &btupsz);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
......
......@@ -311,17 +311,26 @@ brin_free_tuple(BrinTuple *tuple)
}
/*
* Create a palloc'd copy of a BrinTuple.
* Given a brin tuple of size len, create a copy of it. If 'dest' is not
* NULL, its size is destsz, and can be used as output buffer; if the tuple
* to be copied does not fit, it is enlarged by repalloc, and the size is
* updated to match. This avoids palloc/free cycles when many brin tuples
* are being processed in loops.
*/
BrinTuple *
brin_copy_tuple(BrinTuple *tuple, Size len)
brin_copy_tuple(BrinTuple *tuple, Size len, BrinTuple *dest, Size *destsz)
{
BrinTuple *newtup;
if (!destsz || *destsz == 0)
dest = palloc(len);
else if (len > *destsz)
{
dest = repalloc(dest, len);
*destsz = len;
}
newtup = palloc(len);
memcpy(newtup, tuple, len);
memcpy(dest, tuple, len);
return newtup;
return dest;
}
/*
......@@ -348,54 +357,69 @@ BrinMemTuple *
brin_new_memtuple(BrinDesc *brdesc)
{
BrinMemTuple *dtup;
char *currdatum;
long basesize;
int i;
basesize = MAXALIGN(sizeof(BrinMemTuple) +
sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
dtup = palloc0(basesize + sizeof(Datum) * brdesc->bd_totalstored);
currdatum = (char *) dtup + basesize;
for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
{
dtup->bt_columns[i].bv_attno = i + 1;
dtup->bt_columns[i].bv_allnulls = true;
dtup->bt_columns[i].bv_hasnulls = false;
dtup->bt_columns[i].bv_values = (Datum *) currdatum;
currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
}
dtup->bt_values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
dtup->bt_allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
dtup->bt_hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
dtup->bt_context = AllocSetContextCreate(CurrentMemoryContext,
"brin dtuple",
ALLOCSET_DEFAULT_SIZES);
brin_memtuple_initialize(dtup, brdesc);
return dtup;
}
/*
* Reset a BrinMemTuple to initial state
* Reset a BrinMemTuple to initial state. We return the same tuple, for
* notational convenience.
*/
void
BrinMemTuple *
brin_memtuple_initialize(BrinMemTuple *dtuple, BrinDesc *brdesc)
{
int i;
char *currdatum;
MemoryContextReset(dtuple->bt_context);
currdatum = (char *) dtuple +
MAXALIGN(sizeof(BrinMemTuple) +
sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
{
dtuple->bt_columns[i].bv_allnulls = true;
dtuple->bt_columns[i].bv_hasnulls = false;
dtuple->bt_columns[i].bv_attno = i + 1;
dtuple->bt_columns[i].bv_allnulls = true;
dtuple->bt_columns[i].bv_hasnulls = false;
dtuple->bt_columns[i].bv_values = (Datum *) currdatum;
currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
}
return dtuple;
}
/*
* Convert a BrinTuple back to a BrinMemTuple. This is the reverse of
* brin_form_tuple.
*
* As an optimization, the caller can pass a previously allocated 'dMemtuple'.
* This avoids having to allocate it here, which can be useful when this
* function is called many times in a loop. It is caller's responsibility
* that the given BrinMemTuple matches what we need here.
*
* Note we don't need the "on disk tupdesc" here; we rely on our own routine to
* deconstruct the tuple from the on-disk format.
*/
BrinMemTuple *
brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple, BrinMemTuple *dMemtuple)
{
BrinMemTuple *dtup;
Datum *values;
......@@ -407,15 +431,16 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
int valueno;
MemoryContext oldcxt;
dtup = brin_new_memtuple(brdesc);
dtup = dMemtuple ? brin_memtuple_initialize(dMemtuple, brdesc) :
brin_new_memtuple(brdesc);
if (BrinTupleIsPlaceholder(tuple))
dtup->bt_placeholder = true;
dtup->bt_blkno = tuple->bt_blkno;
values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
values = dtup->bt_values;
allnulls = dtup->bt_allnulls;
hasnulls = dtup->bt_hasnulls;
tp = (char *) tuple + BrinTupleDataOffset(tuple);
......@@ -458,10 +483,6 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
MemoryContextSwitchTo(oldcxt);
pfree(values);
pfree(allnulls);
pfree(hasnulls);
return dtup;
}
......
......@@ -38,6 +38,11 @@ typedef struct BrinMemTuple
bool bt_placeholder; /* this is a placeholder tuple */
BlockNumber bt_blkno; /* heap blkno that the tuple is for */
MemoryContext bt_context; /* memcxt holding the bt_columns values */
/* output arrays for brin_deform_tuple: */
Datum *bt_values; /* values array */
bool *bt_allnulls; /* allnulls array */
bool *bt_hasnulls; /* hasnulls array */
/* not an output array, but must be last */
BrinValues bt_columns[FLEXIBLE_ARRAY_MEMBER];
} BrinMemTuple;
......@@ -83,14 +88,15 @@ extern BrinTuple *brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno,
extern BrinTuple *brin_form_placeholder_tuple(BrinDesc *brdesc,
BlockNumber blkno, Size *size);
extern void brin_free_tuple(BrinTuple *tuple);
extern BrinTuple *brin_copy_tuple(BrinTuple *tuple, Size len);
extern BrinTuple *brin_copy_tuple(BrinTuple *tuple, Size len,
BrinTuple *dest, Size *destsz);
extern bool brin_tuples_equal(const BrinTuple *a, Size alen,
const BrinTuple *b, Size blen);
extern BrinMemTuple *brin_new_memtuple(BrinDesc *brdesc);
extern void brin_memtuple_initialize(BrinMemTuple *dtuple,
extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
BrinDesc *brdesc);
extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
BrinTuple *tuple);
BrinTuple *tuple, BrinMemTuple *dMemtuple);
#endif /* BRIN_TUPLE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment