Commit e7032610 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Use a pairing heap for the priority queue in kNN-GiST searches.

This performs slightly better, uses less memory, and needs slightly less
code in GiST, than the Red-Black tree previously used.

Reviewed by Peter Geoghegan
parent 699300a1
......@@ -18,6 +18,7 @@
#include "access/relscan.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "lib/pairingheap.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
......@@ -243,8 +244,6 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
GISTPageOpaque opaque;
OffsetNumber maxoff;
OffsetNumber i;
GISTSearchTreeItem *tmpItem = so->tmpTreeItem;
bool isNew;
MemoryContext oldcxt;
Assert(!GISTSearchItemIsHeap(*pageItem));
......@@ -275,18 +274,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
oldcxt = MemoryContextSwitchTo(so->queueCxt);
/* Create new GISTSearchItem for the right sibling index page */
item = palloc(sizeof(GISTSearchItem));
item->next = NULL;
item = palloc(SizeOfGISTSearchItem(scan->numberOfOrderBys));
item->blkno = opaque->rightlink;
item->data.parentlsn = pageItem->data.parentlsn;
/* Insert it into the queue using same distances as for this page */
tmpItem->head = item;
tmpItem->lastHeap = NULL;
memcpy(tmpItem->distances, myDistances,
memcpy(item->distances, myDistances,
sizeof(double) * scan->numberOfOrderBys);
(void) rb_insert(so->queue, (RBNode *) tmpItem, &isNew);
pairingheap_add(so->queue, &item->phNode);
MemoryContextSwitchTo(oldcxt);
}
......@@ -348,8 +344,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
oldcxt = MemoryContextSwitchTo(so->queueCxt);
/* Create new GISTSearchItem for this item */
item = palloc(sizeof(GISTSearchItem));
item->next = NULL;
item = palloc(SizeOfGISTSearchItem(scan->numberOfOrderBys));
if (GistPageIsLeaf(page))
{
......@@ -372,12 +367,10 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
}
/* Insert it into the queue using new distance data */
tmpItem->head = item;
tmpItem->lastHeap = GISTSearchItemIsHeap(*item) ? item : NULL;
memcpy(tmpItem->distances, so->distances,
memcpy(item->distances, so->distances,
sizeof(double) * scan->numberOfOrderBys);
(void) rb_insert(so->queue, (RBNode *) tmpItem, &isNew);
pairingheap_add(so->queue, &item->phNode);
MemoryContextSwitchTo(oldcxt);
}
......@@ -390,44 +383,24 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
* Extract next item (in order) from search queue
*
* Returns a GISTSearchItem or NULL. Caller must pfree item when done with it.
*
* NOTE: on successful return, so->curTreeItem is the GISTSearchTreeItem that
* contained the result item. Callers can use so->curTreeItem->distances as
* the distances value for the item.
*/
static GISTSearchItem *
getNextGISTSearchItem(GISTScanOpaque so)
{
for (;;)
{
GISTSearchItem *item;
/* Update curTreeItem if we don't have one */
if (so->curTreeItem == NULL)
{
so->curTreeItem = (GISTSearchTreeItem *) rb_leftmost(so->queue);
/* Done when tree is empty */
if (so->curTreeItem == NULL)
break;
}
GISTSearchItem *item;
item = so->curTreeItem->head;
if (item != NULL)
{
/* Delink item from chain */
so->curTreeItem->head = item->next;
if (item == so->curTreeItem->lastHeap)
so->curTreeItem->lastHeap = NULL;
/* Return item; caller is responsible to pfree it */
return item;
}
/* curTreeItem is exhausted, so remove it from rbtree */
rb_delete(so->queue, (RBNode *) so->curTreeItem);
so->curTreeItem = NULL;
if (!pairingheap_is_empty(so->queue))
{
item = (GISTSearchItem *) pairingheap_remove_first(so->queue);
}
else
{
/* Done when both heaps are empty */
item = NULL;
}
return NULL;
/* Return item; caller is responsible to pfree it */
return item;
}
/*
......@@ -458,7 +431,7 @@ getNextNearest(IndexScanDesc scan)
/* visit an index page, extract its items into queue */
CHECK_FOR_INTERRUPTS();
gistScanPage(scan, item, so->curTreeItem->distances, NULL, NULL);
gistScanPage(scan, item, item->distances, NULL, NULL);
}
pfree(item);
......@@ -491,7 +464,6 @@ gistgettuple(PG_FUNCTION_ARGS)
pgstat_count_index_scan(scan->indexRelation);
so->firstCall = false;
so->curTreeItem = NULL;
so->curPageData = so->nPageData = 0;
fakeItem.blkno = GIST_ROOT_BLKNO;
......@@ -534,7 +506,7 @@ gistgettuple(PG_FUNCTION_ARGS)
* this page, we fall out of the inner "do" and loop around to
* return them.
*/
gistScanPage(scan, item, so->curTreeItem->distances, NULL, NULL);
gistScanPage(scan, item, item->distances, NULL, NULL);
pfree(item);
} while (so->nPageData == 0);
......@@ -560,7 +532,6 @@ gistgetbitmap(PG_FUNCTION_ARGS)
pgstat_count_index_scan(scan->indexRelation);
/* Begin the scan by processing the root page */
so->curTreeItem = NULL;
so->curPageData = so->nPageData = 0;
fakeItem.blkno = GIST_ROOT_BLKNO;
......@@ -580,7 +551,7 @@ gistgetbitmap(PG_FUNCTION_ARGS)
CHECK_FOR_INTERRUPTS();
gistScanPage(scan, item, so->curTreeItem->distances, tbm, &ntids);
gistScanPage(scan, item, item->distances, tbm, &ntids);
pfree(item);
}
......
......@@ -22,14 +22,13 @@
/*
* RBTree support functions for the GISTSearchTreeItem queue
* Pairing heap comparison function for the GISTSearchItem queue
*/
static int
GISTSearchTreeItemComparator(const RBNode *a, const RBNode *b, void *arg)
pairingheap_GISTSearchItem_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
const GISTSearchTreeItem *sa = (const GISTSearchTreeItem *) a;
const GISTSearchTreeItem *sb = (const GISTSearchTreeItem *) b;
const GISTSearchItem *sa = (const GISTSearchItem *) a;
const GISTSearchItem *sb = (const GISTSearchItem *) b;
IndexScanDesc scan = (IndexScanDesc) arg;
int i;
......@@ -37,59 +36,16 @@ GISTSearchTreeItemComparator(const RBNode *a, const RBNode *b, void *arg)
for (i = 0; i < scan->numberOfOrderBys; i++)
{
if (sa->distances[i] != sb->distances[i])
return (sa->distances[i] > sb->distances[i]) ? 1 : -1;
}
return 0;
}
static void
GISTSearchTreeItemCombiner(RBNode *existing, const RBNode *newrb, void *arg)
{
GISTSearchTreeItem *scurrent = (GISTSearchTreeItem *) existing;
const GISTSearchTreeItem *snew = (const GISTSearchTreeItem *) newrb;
GISTSearchItem *newitem = snew->head;
/* snew should have just one item in its chain */
Assert(newitem && newitem->next == NULL);
/*
* If new item is heap tuple, it goes to front of chain; otherwise insert
* it before the first index-page item, so that index pages are visited in
* LIFO order, ensuring depth-first search of index pages. See comments
* in gist_private.h.
*/
if (GISTSearchItemIsHeap(*newitem))
{
newitem->next = scurrent->head;
scurrent->head = newitem;
if (scurrent->lastHeap == NULL)
scurrent->lastHeap = newitem;
return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
}
else if (scurrent->lastHeap == NULL)
{
newitem->next = scurrent->head;
scurrent->head = newitem;
}
else
{
newitem->next = scurrent->lastHeap->next;
scurrent->lastHeap->next = newitem;
}
}
static RBNode *
GISTSearchTreeItemAllocator(void *arg)
{
IndexScanDesc scan = (IndexScanDesc) arg;
return palloc(GSTIHDRSZ + sizeof(double) * scan->numberOfOrderBys);
}
/* Heap items go before inner pages, to ensure a depth-first search */
if (GISTSearchItemIsHeap(*sa) && !GISTSearchItemIsHeap(*sb))
return -1;
if (!GISTSearchItemIsHeap(*sa) && GISTSearchItemIsHeap(*sb))
return 1;
static void
GISTSearchTreeItemDeleter(RBNode *rb, void *arg)
{
pfree(rb);
return 0;
}
......@@ -127,7 +83,6 @@ gistbeginscan(PG_FUNCTION_ARGS)
so->queueCxt = giststate->scanCxt; /* see gistrescan */
/* workspaces with size dependent on numberOfOrderBys: */
so->tmpTreeItem = palloc(GSTIHDRSZ + sizeof(double) * scan->numberOfOrderBys);
so->distances = palloc(sizeof(double) * scan->numberOfOrderBys);
so->qual_ok = true; /* in case there are zero keys */
......@@ -188,15 +143,9 @@ gistrescan(PG_FUNCTION_ARGS)
/* create new, empty RBTree for search queue */
oldCxt = MemoryContextSwitchTo(so->queueCxt);
so->queue = rb_create(GSTIHDRSZ + sizeof(double) * scan->numberOfOrderBys,
GISTSearchTreeItemComparator,
GISTSearchTreeItemCombiner,
GISTSearchTreeItemAllocator,
GISTSearchTreeItemDeleter,
scan);
so->queue = pairingheap_allocate(pairingheap_GISTSearchItem_cmp, scan);
MemoryContextSwitchTo(oldCxt);
so->curTreeItem = NULL;
so->firstCall = true;
/* Update scan key, if a new one is given */
......
......@@ -12,6 +12,6 @@ subdir = src/backend/lib
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = ilist.o binaryheap.o stringinfo.o
OBJS = ilist.o binaryheap.o pairingheap.o stringinfo.o
include $(top_srcdir)/src/backend/common.mk
This directory contains a general purpose data structures, for use anywhere
in the backend:
binaryheap.c - a binary heap
pairingheap.c - a pairing heap
ilist.c - single and double-linked lists.
stringinfo.c - an extensible string type
Aside from the inherent characteristics of the data structures, there are a
few practical differences between the binary heap and the pairing heap. The
binary heap is fully allocated at creation, and cannot be expanded beyond the
allocated size. The pairing heap on the other hand has no inherent maximum
size, but the caller needs to allocate each element being stored in the heap,
while the binary heap works with plain Datums or pointers.
The linked-lists in ilist.c can be embedded directly into other structs, as
opposed to the List interface in nodes/pg_list.h.
In addition to these, there is an implementation of a Red-Black tree in
src/backend/utils/adt/rbtree.c.
/*-------------------------------------------------------------------------
*
* pairingheap.c
* A Pairing Heap implementation
*
* A pairing heap is a data structure that's useful for implementing
* priority queues. It is simple to implement, and provides amortized O(1)
* insert and find-min operations, and amortized O(log n) delete-min.
*
* The pairing heap was first described in this paper:
*
* Michael L. Fredman, Robert Sedgewick, Daniel D. Sleator, and Robert E.
* Tarjan. 1986.
* The pairing heap: a new form of self-adjusting heap.
* Algorithmica 1, 1 (January 1986), pages 111-129. DOI: 10.1007/BF01840439
*
* Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/lib/pairingheap.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "lib/pairingheap.h"
static pairingheap_node *merge(pairingheap *heap, pairingheap_node *a,
pairingheap_node *b);
static pairingheap_node *merge_children(pairingheap *heap,
pairingheap_node *children);
/*
* pairingheap_allocate
*
* Returns a pointer to a newly-allocated heap, with the heap property defined
* by the given comparator function, which will be invoked with the additional
* argument specified by 'arg'.
*/
pairingheap *
pairingheap_allocate(pairingheap_comparator compare, void *arg)
{
pairingheap *heap;
heap = (pairingheap *) palloc(sizeof(pairingheap));
heap->ph_compare = compare;
heap->ph_arg = arg;
heap->ph_root = NULL;
return heap;
}
/*
* pairingheap_free
*
* Releases memory used by the given pairingheap.
*
* Note: The nodes in the heap are not freed!
*/
void
pairingheap_free(pairingheap *heap)
{
pfree(heap);
}
/*
* A helper function to merge two subheaps into one.
*
* The subheap with smaller value is put as a child of the other one (assuming
* a max-heap).
*/
static pairingheap_node *
merge(pairingheap *heap, pairingheap_node *a, pairingheap_node *b)
{
if (a == NULL)
return b;
if (b == NULL)
return a;
/* swap 'a' and 'b' so that 'a' is the one with larger value */
if (heap->ph_compare(a, b, heap->ph_arg) < 0)
{
pairingheap_node *tmp;
tmp = a;
a = b;
b = tmp;
}
/* and put 'b' as a child of 'a' */
if (a->first_child)
a->first_child->prev_or_parent = b;
b->prev_or_parent = a;
b->next_sibling = a->first_child;
a->first_child = b;
return a;
}
/*
* pairingheap_add
*
* Adds the given node to the heap in O(1) time.
*/
void
pairingheap_add(pairingheap *heap, pairingheap_node *node)
{
node->first_child = NULL;
/* Link the new node as a new tree */
heap->ph_root = merge(heap, heap->ph_root, node);
}
/*
* pairingheap_first
*
* Returns a pointer to the first (root, topmost) node in the heap without
* modifying the heap. The caller must ensure that this routine is not used on
* an empty heap. Always O(1).
*/
pairingheap_node *
pairingheap_first(pairingheap *heap)
{
Assert(!pairingheap_is_empty(heap));
return heap->ph_root;
}
/*
* pairingheap_remove_first
*
* Removes the first (root, topmost) node in the heap and returns a pointer to
* it after rebalancing the heap. The caller must ensure that this routine is
* not used on an empty heap. O(log n) amortized.
*/
pairingheap_node *
pairingheap_remove_first(pairingheap *heap)
{
pairingheap_node *result;
pairingheap_node *children;
Assert(!pairingheap_is_empty(heap));
/* Remove the root, and form a new heap of its children. */
result = heap->ph_root;
children = result->first_child;
heap->ph_root = merge_children(heap, children);
return result;
}
/*
* Remove 'node' from the heap. O(log n) amortized.
*/
void
pairingheap_remove(pairingheap *heap, pairingheap_node *node)
{
pairingheap_node *children;
pairingheap_node *replacement;
pairingheap_node *next_sibling;
pairingheap_node **prev_ptr;
/*
* If the removed node happens to be the root node, do it with
* pairingheap_remove_first().
*/
if (node == heap->ph_root)
{
(void) pairingheap_remove_first(heap);
return;
}
/*
* Before we modify anything, remember the removed node's first_child and
* next_sibling pointers.
*/
children = node->first_child;
next_sibling = node->next_sibling;
/*
* Also find the pointer to the removed node in its previous sibling, or
* if this is the first child of its parent, in its parent.
*/
if (node->prev_or_parent->first_child == node)
prev_ptr = &node->prev_or_parent->first_child;
else
prev_ptr = &node->prev_or_parent->next_sibling;
Assert(*prev_ptr == node);
/*
* If this node has children, make a new subheap of the children and link
* the subheap in place of the removed node. Otherwise just unlink this
* node.
*/
if (children)
{
replacement = merge_children(heap, children);
replacement->prev_or_parent = node->prev_or_parent;
replacement->next_sibling = node->next_sibling;
*prev_ptr = replacement;
if (next_sibling)
next_sibling->prev_or_parent = replacement;
}
else
{
*prev_ptr = next_sibling;
if (next_sibling)
next_sibling->prev_or_parent = node->prev_or_parent;
}
}
/*
* Merge a list of subheaps into a single heap.
*
* This implements the basic two-pass merging strategy, first forming pairs
* from left to right, and then merging the pairs.
*/
static pairingheap_node *
merge_children(pairingheap *heap, pairingheap_node *children)
{
pairingheap_node *curr,
*next;
pairingheap_node *pairs;
pairingheap_node *newroot;
if (children == NULL || children->next_sibling == NULL)
return children;
/* Walk the subheaps from left to right, merging in pairs */
next = children;
pairs = NULL;
for (;;)
{
curr = next;
if (curr == NULL)
break;
if (curr->next_sibling == NULL)
{
/* last odd node at the end of list */
curr->next_sibling = pairs;
pairs = curr;
break;
}
next = curr->next_sibling->next_sibling;
/* merge this and the next subheap, and add to 'pairs' list. */
curr = merge(heap, curr, curr->next_sibling);
curr->next_sibling = pairs;
pairs = curr;
}
/*
* Merge all the pairs together to form a single heap.
*/
newroot = pairs;
next = pairs->next_sibling;
while (next)
{
curr = next;
next = curr->next_sibling;
newroot = merge(heap, newroot, curr);
}
return newroot;
}
......@@ -18,9 +18,9 @@
#include "access/itup.h"
#include "access/xlogreader.h"
#include "fmgr.h"
#include "lib/pairingheap.h"
#include "storage/bufmgr.h"
#include "storage/buffile.h"
#include "utils/rbtree.h"
#include "utils/hsearch.h"
/*
......@@ -123,7 +123,7 @@ typedef struct GISTSearchHeapItem
/* Unvisited item, either index page or heap tuple */
typedef struct GISTSearchItem
{
struct GISTSearchItem *next; /* list link */
pairingheap_node phNode;
BlockNumber blkno; /* index page number, or InvalidBlockNumber */
union
{
......@@ -131,24 +131,12 @@ typedef struct GISTSearchItem
/* we must store parentlsn to detect whether a split occurred */
GISTSearchHeapItem heap; /* heap info, if heap tuple */
} data;
double distances[1]; /* array with numberOfOrderBys entries */
} GISTSearchItem;
#define GISTSearchItemIsHeap(item) ((item).blkno == InvalidBlockNumber)
/*
* Within a GISTSearchTreeItem's chain, heap items always appear before
* index-page items, since we want to visit heap items first. lastHeap points
* to the last heap item in the chain, or is NULL if there are none.
*/
typedef struct GISTSearchTreeItem
{
RBNode rbnode; /* this is an RBTree item */
GISTSearchItem *head; /* first chain member */
GISTSearchItem *lastHeap; /* last heap-tuple member, if any */
double distances[1]; /* array with numberOfOrderBys entries */
} GISTSearchTreeItem;
#define GSTIHDRSZ offsetof(GISTSearchTreeItem, distances)
#define SizeOfGISTSearchItem(n_distances) (offsetof(GISTSearchItem, distances) + sizeof(double) * (n_distances))
/*
* GISTScanOpaqueData: private state for a scan of a GiST index
......@@ -156,15 +144,12 @@ typedef struct GISTSearchTreeItem
typedef struct GISTScanOpaqueData
{
GISTSTATE *giststate; /* index information, see above */
RBTree *queue; /* queue of unvisited items */
pairingheap *queue; /* queue of unvisited items */
MemoryContext queueCxt; /* context holding the queue */
bool qual_ok; /* false if qual can never be satisfied */
bool firstCall; /* true until first gistgettuple call */
GISTSearchTreeItem *curTreeItem; /* current queue item, if any */
/* pre-allocated workspace arrays */
GISTSearchTreeItem *tmpTreeItem; /* workspace to pass to rb_insert */
double *distances; /* output area for gistindex_keytest */
/* In a non-ordered search, returnable heap items are stored here: */
......
/*
* pairingheap.h
*
* A Pairing Heap implementation
*
* Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
*
* src/include/lib/pairingheap.h
*/
#ifndef PAIRINGHEAP_H
#define PAIRINGHEAP_H
/*
* This represents an element stored in the heap. Embed this in a larger
* struct containing the actual data you're storing.
*
* A node can have multiple children, which form a double-linked list.
* first_child points to the node's first child, and the subsequent children
* can be found by following the next_sibling pointers. The last child has
* next_sibling == NULL. The prev_or_parent pointer points to the node's
* previous sibling, or if the node is its parent's first child, to the
* parent.
*/
typedef struct pairingheap_node
{
struct pairingheap_node *first_child;
struct pairingheap_node *next_sibling;
struct pairingheap_node *prev_or_parent;
} pairingheap_node;
/*
* For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b,
* and >0 iff a > b. For a min-heap, the conditions are reversed.
*/
typedef int (*pairingheap_comparator) (const pairingheap_node *a,
const pairingheap_node *b,
void *arg);
/*
* A pairing heap.
*
* You can use pairingheap_allocate() to create a new palloc'd heap, or embed
* this in a larger struct, set ph_compare and ph_arg directly and initialize
* ph_root to NULL.
*/
typedef struct pairingheap
{
pairingheap_comparator ph_compare; /* comparison function */
void *ph_arg; /* opaque argument to ph_compare */
pairingheap_node *ph_root; /* current root of the heap */
} pairingheap;
extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
void *arg);
extern void pairingheap_free(pairingheap *heap);
extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
extern pairingheap_node *pairingheap_first(pairingheap *heap);
extern pairingheap_node *pairingheap_remove_first(pairingheap *heap);
extern void pairingheap_remove(pairingheap *heap, pairingheap_node *node);
/* Resets the heap to be empty. */
#define pairingheap_reset(h) ((h)->ph_root = NULL)
/* Is the heap empty? */
#define pairingheap_is_empty(h) ((h)->ph_root == NULL)
/* Is there exactly one node in the heap? */
#define pairingheap_is_singular(h) \
((h)->ph_root && (h)->ph_root->first_child == NULL)
#endif /* PAIRINGHEAP_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment