Commit 94d626ff authored by Alvaro Herrera's avatar Alvaro Herrera

Use materialize SRF mode in brin_page_items

This function was using the single-value-per-call mechanism, but the
code relied on a relcache entry that wasn't kept open across calls.
This manifested as weird errors in buildfarm during the short time that
the "brin-1" isolation test lived.

Backpatch to 9.5, where it was introduced.
parent 36e863bb
...@@ -37,18 +37,6 @@ typedef struct brin_column_state ...@@ -37,18 +37,6 @@ typedef struct brin_column_state
FmgrInfo outputFn[FLEXIBLE_ARRAY_MEMBER]; FmgrInfo outputFn[FLEXIBLE_ARRAY_MEMBER];
} brin_column_state; } brin_column_state;
typedef struct brin_page_state
{
BrinDesc *bdesc;
Page page;
OffsetNumber offset;
bool unusedItem;
bool done;
AttrNumber attno;
BrinMemTuple *dtup;
brin_column_state *columns[FLEXIBLE_ARRAY_MEMBER];
} brin_page_state;
static Page verify_brin_page(bytea *raw_page, uint16 type, static Page verify_brin_page(bytea *raw_page, uint16 type,
const char *strtype); const char *strtype);
...@@ -119,54 +107,63 @@ verify_brin_page(bytea *raw_page, uint16 type, const char *strtype) ...@@ -119,54 +107,63 @@ verify_brin_page(bytea *raw_page, uint16 type, const char *strtype)
Datum Datum
brin_page_items(PG_FUNCTION_ARGS) brin_page_items(PG_FUNCTION_ARGS)
{ {
brin_page_state *state;
FuncCallContext *fctx;
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to use raw page functions"))));
if (SRF_IS_FIRSTCALL())
{
bytea *raw_page = PG_GETARG_BYTEA_P(0); bytea *raw_page = PG_GETARG_BYTEA_P(0);
Oid indexRelid = PG_GETARG_OID(1); Oid indexRelid = PG_GETARG_OID(1);
Page page; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc; TupleDesc tupdesc;
MemoryContext mctx; MemoryContext oldcontext;
Tuplestorestate *tupstore;
Relation indexRel; Relation indexRel;
brin_column_state **columns;
BrinDesc *bdesc;
BrinMemTuple *dtup;
Page page;
OffsetNumber offset;
AttrNumber attno; AttrNumber attno;
bool unusedItem;
/* minimally verify the page we got */ if (!superuser())
page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular"); ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
/* create a function context for cross-call persistence */ (errmsg("must be superuser to use raw page functions"))));
fctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */ /* check to see if caller supports us returning a tuplestore */
mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx); if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize) ||
rsinfo->expectedDesc == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not allowed in this context")));
/* Build a tuple descriptor for our result type */ /* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type"); elog(ERROR, "return type must be a row type");
indexRel = index_open(indexRelid, AccessShareLock); /* Build tuplestore to hold the result rows */
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
state = palloc(offsetof(brin_page_state, columns) + MemoryContextSwitchTo(oldcontext);
sizeof(brin_column_state) * RelationGetDescr(indexRel)->natts);
state->bdesc = brin_build_desc(indexRel); indexRel = index_open(indexRelid, AccessShareLock);
state->page = page; bdesc = brin_build_desc(indexRel);
state->offset = FirstOffsetNumber;
state->unusedItem = false; /* minimally verify the page we got */
state->done = false; page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular");
state->dtup = NULL;
/* /*
* Initialize output functions for all indexed datatypes; simplifies * Initialize output functions for all indexed datatypes; simplifies
* calling them later. * calling them later.
*/ */
for (attno = 1; attno <= state->bdesc->bd_tupdesc->natts; attno++) columns = palloc(sizeof(brin_column_state *) * RelationGetDescr(indexRel)->natts);
for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
{ {
Oid output; Oid output;
bool isVarlena; bool isVarlena;
...@@ -174,7 +171,7 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -174,7 +171,7 @@ brin_page_items(PG_FUNCTION_ARGS)
int i; int i;
brin_column_state *column; brin_column_state *column;
opcinfo = state->bdesc->bd_info[attno - 1]; opcinfo = bdesc->bd_info[attno - 1];
column = palloc(offsetof(brin_column_state, outputFn) + column = palloc(offsetof(brin_column_state, outputFn) +
sizeof(FmgrInfo) * opcinfo->oi_nstored); sizeof(FmgrInfo) * opcinfo->oi_nstored);
...@@ -185,23 +182,14 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -185,23 +182,14 @@ brin_page_items(PG_FUNCTION_ARGS)
fmgr_info(output, &column->outputFn[i]); fmgr_info(output, &column->outputFn[i]);
} }
state->columns[attno - 1] = column; columns[attno - 1] = column;
} }
index_close(indexRel, AccessShareLock); offset = FirstOffsetNumber;
unusedItem = false;
fctx->user_fctx = state; dtup = NULL;
fctx->tuple_desc = BlessTupleDesc(tupdesc); for (;;)
MemoryContextSwitchTo(mctx);
}
fctx = SRF_PERCALL_SETUP();
state = fctx->user_fctx;
if (!state->done)
{ {
HeapTuple result;
Datum values[7]; Datum values[7];
bool nulls[7]; bool nulls[7];
...@@ -211,39 +199,30 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -211,39 +199,30 @@ brin_page_items(PG_FUNCTION_ARGS)
* signal for obtaining and decoding the next one. If that's not the * signal for obtaining and decoding the next one. If that's not the
* case, we output the next attribute. * case, we output the next attribute.
*/ */
if (state->dtup == NULL) if (dtup == NULL)
{ {
BrinTuple *tup;
MemoryContext mctx;
ItemId itemId; ItemId itemId;
/* deformed tuple must live across calls */
mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
/* verify item status: if there's no data, we can't decode */ /* verify item status: if there's no data, we can't decode */
itemId = PageGetItemId(state->page, state->offset); itemId = PageGetItemId(page, offset);
if (ItemIdIsUsed(itemId)) if (ItemIdIsUsed(itemId))
{ {
tup = (BrinTuple *) PageGetItem(state->page, dtup = brin_deform_tuple(bdesc,
PageGetItemId(state->page, (BrinTuple *) PageGetItem(page, itemId));
state->offset)); attno = 1;
state->dtup = brin_deform_tuple(state->bdesc, tup); unusedItem = false;
state->attno = 1;
state->unusedItem = false;
} }
else else
state->unusedItem = true; unusedItem = true;
MemoryContextSwitchTo(mctx);
} }
else else
state->attno++; attno++;
MemSet(nulls, 0, sizeof(nulls)); MemSet(nulls, 0, sizeof(nulls));
if (state->unusedItem) if (unusedItem)
{ {
values[0] = UInt16GetDatum(state->offset); values[0] = UInt16GetDatum(offset);
nulls[1] = true; nulls[1] = true;
nulls[2] = true; nulls[2] = true;
nulls[3] = true; nulls[3] = true;
...@@ -253,17 +232,17 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -253,17 +232,17 @@ brin_page_items(PG_FUNCTION_ARGS)
} }
else else
{ {
int att = state->attno - 1; int att = attno - 1;
values[0] = UInt16GetDatum(state->offset); values[0] = UInt16GetDatum(offset);
values[1] = UInt32GetDatum(state->dtup->bt_blkno); values[1] = UInt32GetDatum(dtup->bt_blkno);
values[2] = UInt16GetDatum(state->attno); values[2] = UInt16GetDatum(attno);
values[3] = BoolGetDatum(state->dtup->bt_columns[att].bv_allnulls); values[3] = BoolGetDatum(dtup->bt_columns[att].bv_allnulls);
values[4] = BoolGetDatum(state->dtup->bt_columns[att].bv_hasnulls); values[4] = BoolGetDatum(dtup->bt_columns[att].bv_hasnulls);
values[5] = BoolGetDatum(state->dtup->bt_placeholder); values[5] = BoolGetDatum(dtup->bt_placeholder);
if (!state->dtup->bt_columns[att].bv_allnulls) if (!dtup->bt_columns[att].bv_allnulls)
{ {
BrinValues *bvalues = &state->dtup->bt_columns[att]; BrinValues *bvalues = &dtup->bt_columns[att];
StringInfoData s; StringInfoData s;
bool first; bool first;
int i; int i;
...@@ -272,14 +251,14 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -272,14 +251,14 @@ brin_page_items(PG_FUNCTION_ARGS)
appendStringInfoChar(&s, '{'); appendStringInfoChar(&s, '{');
first = true; first = true;
for (i = 0; i < state->columns[att]->nstored; i++) for (i = 0; i < columns[att]->nstored; i++)
{ {
char *val; char *val;
if (!first) if (!first)
appendStringInfoString(&s, " .. "); appendStringInfoString(&s, " .. ");
first = false; first = false;
val = OutputFunctionCall(&state->columns[att]->outputFn[i], val = OutputFunctionCall(&columns[att]->outputFn[i],
bvalues->bv_values[i]); bvalues->bv_values[i]);
appendStringInfoString(&s, val); appendStringInfoString(&s, val);
pfree(val); pfree(val);
...@@ -295,35 +274,35 @@ brin_page_items(PG_FUNCTION_ARGS) ...@@ -295,35 +274,35 @@ brin_page_items(PG_FUNCTION_ARGS)
} }
} }
result = heap_form_tuple(fctx->tuple_desc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
/* /*
* If the item was unused, jump straight to the next one; otherwise, * If the item was unused, jump straight to the next one; otherwise,
* the only cleanup needed here is to set our signal to go to the next * the only cleanup needed here is to set our signal to go to the next
* tuple in the following iteration, by freeing the current one. * tuple in the following iteration, by freeing the current one.
*/ */
if (state->unusedItem) if (unusedItem)
state->offset = OffsetNumberNext(state->offset); offset = OffsetNumberNext(offset);
else if (state->attno >= state->bdesc->bd_tupdesc->natts) else if (attno >= bdesc->bd_tupdesc->natts)
{ {
pfree(state->dtup); pfree(dtup);
state->dtup = NULL; dtup = NULL;
state->offset = OffsetNumberNext(state->offset); offset = OffsetNumberNext(offset);
} }
/* /*
* If we're beyond the end of the page, set flag to end the function * If we're beyond the end of the page, we're done.
* in the following iteration.
*/ */
if (state->offset > PageGetMaxOffsetNumber(state->page)) if (offset > PageGetMaxOffsetNumber(page))
state->done = true; break;
SRF_RETURN_NEXT(fctx, HeapTupleGetDatum(result));
} }
brin_free_desc(state->bdesc); /* clean up and return the tuplestore */
brin_free_desc(bdesc);
tuplestore_donestoring(tupstore);
index_close(indexRel, AccessShareLock);
SRF_RETURN_DONE(fctx); return (Datum) 0;
} }
Datum Datum
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment