Commit d2ddee63 authored by Tom Lane's avatar Tom Lane

Improve SP-GiST opclass API to better support unlabeled nodes.

Previously, the spgSplitTuple action could only create a new upper tuple
containing a single labeled node.  This made it useless for opclasses
that prefer to work with fixed sets of nodes (labeled or otherwise),
which meant that restrictive prefixes could not be used with such
node definitions.  Change the output field set for the choose() method
to allow it to specify any valid node set for the new upper tuple,
and to specify which of these nodes to place the modified lower tuple in.

In addition to its primary use for fixed node sets, this feature could
allow existing opclasses that use variable node sets to skip a separate
spgAddNode action when splitting a tuple, by setting up the node needed
for the incoming value as part of the spgSplitTuple action.  However, care
would have to be taken to add the extra node only when it would not make
the tuple bigger than before.  (spgAddNode can enlarge the tuple,
spgSplitTuple can't.)

This is a prerequisite for an upcoming SP-GiST inet opclass, but is
being committed separately to increase the visibility of the API change.

In passing, improve the documentation about the traverse-values feature
that was added by commit ccd6eb49.

Emre Hasegeli, with cosmetic adjustments and documentation rework by me

Discussion: <CAE2gYzxtth9qatW_OAqdOjykS0bxq7AYHLuyAQLPgT7H9ZU0Cw@mail.gmail.com>
parent 86f31695
This diff is collapsed.
...@@ -1705,17 +1705,40 @@ spgSplitNodeAction(Relation index, SpGistState *state, ...@@ -1705,17 +1705,40 @@ spgSplitNodeAction(Relation index, SpGistState *state,
/* Should not be applied to nulls */ /* Should not be applied to nulls */
Assert(!SpGistPageStoresNulls(current->page)); Assert(!SpGistPageStoresNulls(current->page));
/* Check opclass gave us sane values */
if (out->result.splitTuple.prefixNNodes <= 0 ||
out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
elog(ERROR, "invalid number of prefix nodes: %d",
out->result.splitTuple.prefixNNodes);
if (out->result.splitTuple.childNodeN < 0 ||
out->result.splitTuple.childNodeN >=
out->result.splitTuple.prefixNNodes)
elog(ERROR, "invalid child node number: %d",
out->result.splitTuple.childNodeN);
/* /*
* Construct new prefix tuple, containing a single node with the specified * Construct new prefix tuple with requested number of nodes. We'll fill
* label. (We'll update the node's downlink to point to the new postfix * in the childNodeN'th node's downlink below.
* tuple, below.)
*/ */
node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false); nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
out->result.splitTuple.prefixNNodes);
for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
{
Datum label = (Datum) 0;
bool labelisnull;
labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
if (!labelisnull)
label = out->result.splitTuple.prefixNodeLabels[i];
nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
prefixTuple = spgFormInnerTuple(state, prefixTuple = spgFormInnerTuple(state,
out->result.splitTuple.prefixHasPrefix, out->result.splitTuple.prefixHasPrefix,
out->result.splitTuple.prefixPrefixDatum, out->result.splitTuple.prefixPrefixDatum,
1, &node); out->result.splitTuple.prefixNNodes,
nodes);
/* it must fit in the space that innerTuple now occupies */ /* it must fit in the space that innerTuple now occupies */
if (prefixTuple->size > innerTuple->size) if (prefixTuple->size > innerTuple->size)
...@@ -1807,10 +1830,12 @@ spgSplitNodeAction(Relation index, SpGistState *state, ...@@ -1807,10 +1830,12 @@ spgSplitNodeAction(Relation index, SpGistState *state,
* the postfix tuple first.) We have to update the local copy of the * the postfix tuple first.) We have to update the local copy of the
* prefixTuple too, because that's what will be written to WAL. * prefixTuple too, because that's what will be written to WAL.
*/ */
spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset); spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
postfixBlkno, postfixOffset);
prefixTuple = (SpGistInnerTuple) PageGetItem(current->page, prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
PageGetItemId(current->page, current->offnum)); PageGetItemId(current->page, current->offnum));
spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset); spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
postfixBlkno, postfixOffset);
MarkBufferDirty(current->buffer); MarkBufferDirty(current->buffer);
......
...@@ -212,9 +212,14 @@ spg_text_choose(PG_FUNCTION_ARGS) ...@@ -212,9 +212,14 @@ spg_text_choose(PG_FUNCTION_ARGS)
out->result.splitTuple.prefixPrefixDatum = out->result.splitTuple.prefixPrefixDatum =
formTextDatum(prefixStr, commonLen); formTextDatum(prefixStr, commonLen);
} }
out->result.splitTuple.nodeLabel = out->result.splitTuple.prefixNNodes = 1;
out->result.splitTuple.prefixNodeLabels =
(Datum *) palloc(sizeof(Datum));
out->result.splitTuple.prefixNodeLabels[0] =
Int16GetDatum(*(unsigned char *) (prefixStr + commonLen)); Int16GetDatum(*(unsigned char *) (prefixStr + commonLen));
out->result.splitTuple.childNodeN = 0;
if (prefixSize - commonLen == 1) if (prefixSize - commonLen == 1)
{ {
out->result.splitTuple.postfixHasPrefix = false; out->result.splitTuple.postfixHasPrefix = false;
...@@ -280,7 +285,10 @@ spg_text_choose(PG_FUNCTION_ARGS) ...@@ -280,7 +285,10 @@ spg_text_choose(PG_FUNCTION_ARGS)
out->resultType = spgSplitTuple; out->resultType = spgSplitTuple;
out->result.splitTuple.prefixHasPrefix = in->hasPrefix; out->result.splitTuple.prefixHasPrefix = in->hasPrefix;
out->result.splitTuple.prefixPrefixDatum = in->prefixDatum; out->result.splitTuple.prefixPrefixDatum = in->prefixDatum;
out->result.splitTuple.nodeLabel = Int16GetDatum(-2); out->result.splitTuple.prefixNNodes = 1;
out->result.splitTuple.prefixNodeLabels = (Datum *) palloc(sizeof(Datum));
out->result.splitTuple.prefixNodeLabels[0] = Int16GetDatum(-2);
out->result.splitTuple.childNodeN = 0;
out->result.splitTuple.postfixHasPrefix = false; out->result.splitTuple.postfixHasPrefix = false;
} }
else else
......
...@@ -90,10 +90,13 @@ typedef struct spgChooseOut ...@@ -90,10 +90,13 @@ typedef struct spgChooseOut
} addNode; } addNode;
struct /* results for spgSplitTuple */ struct /* results for spgSplitTuple */
{ {
/* Info to form new inner tuple with one node */ /* Info to form new upper-level inner tuple with one child tuple */
bool prefixHasPrefix; /* tuple should have a prefix? */ bool prefixHasPrefix; /* tuple should have a prefix? */
Datum prefixPrefixDatum; /* if so, its value */ Datum prefixPrefixDatum; /* if so, its value */
Datum nodeLabel; /* node's label */ int prefixNNodes; /* number of nodes */
Datum *prefixNodeLabels; /* their labels (or NULL for
* no labels) */
int childNodeN; /* which node gets child tuple */
/* Info to form new lower-level inner tuple with all old nodes */ /* Info to form new lower-level inner tuple with all old nodes */
bool postfixHasPrefix; /* tuple should have a prefix? */ bool postfixHasPrefix; /* tuple should have a prefix? */
...@@ -134,7 +137,8 @@ typedef struct spgInnerConsistentIn ...@@ -134,7 +137,8 @@ typedef struct spgInnerConsistentIn
Datum reconstructedValue; /* value reconstructed at parent */ Datum reconstructedValue; /* value reconstructed at parent */
void *traversalValue; /* opclass-specific traverse value */ void *traversalValue; /* opclass-specific traverse value */
MemoryContext traversalMemoryContext; MemoryContext traversalMemoryContext; /* put new traverse values
* here */
int level; /* current level (counting from zero) */ int level; /* current level (counting from zero) */
bool returnData; /* original data must be returned? */ bool returnData; /* original data must be returned? */
...@@ -163,8 +167,8 @@ typedef struct spgLeafConsistentIn ...@@ -163,8 +167,8 @@ typedef struct spgLeafConsistentIn
ScanKey scankeys; /* array of operators and comparison values */ ScanKey scankeys; /* array of operators and comparison values */
int nkeys; /* length of array */ int nkeys; /* length of array */
void *traversalValue; /* opclass-specific traverse value */
Datum reconstructedValue; /* value reconstructed at parent */ Datum reconstructedValue; /* value reconstructed at parent */
void *traversalValue; /* opclass-specific traverse value */
int level; /* current level (counting from zero) */ int level; /* current level (counting from zero) */
bool returnData; /* original data must be returned? */ bool returnData; /* original data must be returned? */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment