Commit 0acbed60 authored by Paras Garg's avatar Paras Garg

insertion and merging

parent 94f282b1
......@@ -4,7 +4,8 @@
"name": "Linux",
"includePath": [
"${workspaceFolder}/**",
"/home/paras/git/postgres/install/include/server"
"/home/paras/postgres13/install/include/server",
"/home/paras/postgres13/src/"
],
"defines": [],
"compilerPath": "/usr/bin/gcc",
......
2020-12-14 01:01:11.215 IST [53734] FATAL: lock file "postmaster.pid" already exists
2020-12-14 01:01:11.215 IST [53734] HINT: Is another postmaster (PID 2917) running in data directory "/home/paras/postgres13/install/data"?
2020-12-14 01:16:10.828 IST [54122] LOG: starting PostgreSQL 13.0 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2020-12-14 01:16:10.828 IST [54122] LOG: listening on IPv4 address "127.0.0.1", port 5432
2020-12-14 01:16:10.926 IST [54122] LOG: listening on Unix socket "/tmp/.s.PGSQL.5432"
2020-12-14 01:16:11.072 IST [54123] LOG: database system was shut down at 2020-12-14 01:16:07 IST
2020-12-14 01:16:11.152 IST [54122] LOG: database system is ready to accept connections
2020-12-14 01:18:04.122 IST [54131] ERROR: syntax error at or near "enalyse" at character 1
2020-12-14 01:18:04.122 IST [54131] STATEMENT: enalyse
;
2020-12-14 01:18:11.225 IST [54131] ERROR: syntax error at or near ";" at character 9
2020-12-14 01:18:11.225 IST [54131] STATEMENT: EXPLAIN ;
2020-12-14 01:19:36.421 IST [54122] LOG: server process (PID 54131) was terminated by signal 11: Segmentation fault
2020-12-14 01:19:36.421 IST [54122] DETAIL: Failed process was running: select * from t where k=3;
2020-12-14 01:19:36.421 IST [54122] LOG: terminating any other active server processes
2020-12-14 01:19:36.421 IST [54127] WARNING: terminating connection because of crash of another server process
2020-12-14 01:19:36.421 IST [54127] DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
2020-12-14 01:19:36.421 IST [54127] HINT: In a moment you should be able to reconnect to the database and repeat your command.
2020-12-14 01:19:36.423 IST [54162] FATAL: the database system is in recovery mode
2020-12-14 01:19:36.424 IST [54122] LOG: all server processes terminated; reinitializing
2020-12-14 01:19:36.534 IST [54163] LOG: database system was interrupted; last known up at 2020-12-14 01:16:11 IST
2020-12-14 01:19:36.806 IST [54163] LOG: database system was not properly shut down; automatic recovery in progress
2020-12-14 01:19:36.868 IST [54163] LOG: redo starts at 0/1C6CFDC0
2020-12-14 01:19:36.887 IST [54163] LOG: invalid record length at 0/1C6D1D90: wanted 24, got 0
2020-12-14 01:19:36.887 IST [54163] LOG: redo done at 0/1C6D1D58
2020-12-14 01:19:37.359 IST [54122] LOG: database system is ready to accept connections
2020-12-13 22:10:51.822 IST [2998] LOG: starting PostgreSQL 13.0 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2020-12-13 22:10:51.822 IST [2998] LOG: listening on IPv4 address "127.0.0.1", port 5432
2020-12-13 22:10:51.941 IST [2998] LOG: listening on Unix socket "/tmp/.s.PGSQL.5432"
2020-12-13 22:10:52.142 IST [3001] LOG: database system was interrupted; last known up at 2020-12-14 01:21:27 IST
2020-12-13 22:10:52.589 IST [3001] LOG: database system was not properly shut down; automatic recovery in progress
2020-12-13 22:10:52.667 IST [3001] LOG: redo starts at 0/1C6D1F68
2020-12-13 22:10:52.696 IST [3001] LOG: invalid record length at 0/1C6F5968: wanted 24, got 0
2020-12-13 22:10:52.696 IST [3001] LOG: redo done at 0/1C6F5930
2020-12-13 22:10:53.258 IST [2998] LOG: database system is ready to accept connections
2020-12-13 22:11:30.607 IST [3010] ERROR: type "k" does not exist at character 20
2020-12-13 22:11:30.607 IST [3010] STATEMENT: create table t(int k,int v);
2020-12-13 22:14:39.402 IST [3010] ERROR: relation "lsm_handler" does not exist at character 15
2020-12-13 22:14:39.402 IST [3010] STATEMENT: select * from lsm_handler;
2020-12-14 12:09:49.954 IST [2998] LOG: received fast shutdown request
2020-12-14 12:09:50.047 IST [2998] LOG: aborting any active transactions
2020-12-14 12:09:50.049 IST [2998] LOG: background worker "logical replication launcher" (PID 3007) exited with exit code 1
2020-12-14 12:09:50.050 IST [3002] LOG: shutting down
2020-12-14 12:09:50.266 IST [2998] LOG: database system is shut down
postgres: invalid argument: "log_min_messages=debug4"
Try "postgres --help" for more information.
2020-12-14 12:11:48.309 IST [6452] LOG: starting PostgreSQL 13.0 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2020-12-14 12:11:48.309 IST [6452] LOG: listening on IPv4 address "127.0.0.1", port 5432
2020-12-14 12:11:48.394 IST [6452] LOG: listening on Unix socket "/tmp/.s.PGSQL.5432"
2020-12-14 12:11:48.551 IST [6453] LOG: database system was shut down at 2020-12-14 12:09:50 IST
2020-12-14 12:11:48.631 IST [6452] LOG: database system is ready to accept connections
2020-12-14 12:11:55.745 IST [6462] ERROR: syntax error at or near "-" at character 1
2020-12-14 12:11:55.745 IST [6462] STATEMENT: -d
;
2020-12-14 12:12:01.861 IST [6462] ERROR: syntax error at or near "-" at character 1
2020-12-14 12:12:01.861 IST [6462] STATEMENT: -d
;
2020-12-14 12:12:06.535 IST [6462] ERROR: syntax error at or near "d" at character 1
2020-12-14 12:12:06.535 IST [6462] STATEMENT: d;
2020-12-14 12:12:13.264 IST [6462] ERROR: syntax error at or near "debug" at character 1
2020-12-14 12:12:13.264 IST [6462] STATEMENT: debug
;
2020-12-14 12:12:19.468 IST [6462] ERROR: syntax error at or near "debug" at character 1
2020-12-14 12:12:19.468 IST [6462] STATEMENT: debug-level;
2020-12-14 12:20:35.554 IST [6452] LOG: received fast shutdown request
2020-12-14 12:20:35.606 IST [6452] LOG: aborting any active transactions
2020-12-14 12:20:35.609 IST [6452] LOG: background worker "logical replication launcher" (PID 6459) exited with exit code 1
2020-12-14 12:20:35.609 IST [6454] LOG: shutting down
2020-12-14 12:20:36.171 IST [6452] LOG: database system is shut down
#include "postgres.h"
#include "access/attnum.h"
#include "utils/relcache.h"
#include "access/reloptions.h"
#include "access/nbtree.h"
#include "access/table.h"
#include "access/relation.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "utils/rel.h"
#include "nodes/makefuncs.h"
#include "catalog/dependency.h"
#include "catalog/pg_operator.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/storage.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "miscadmin.h"
#include "tcop/utility.h"
#include "postmaster/bgworker.h"
#include "pgstat.h"
#include "executor/executor.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "lsm.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1(lsm_handler);
PG_FUNCTION_INFO_V1(lsm_btree_wrapper);
PG_FUNCTION_INFO_V1(lsm_get_merge_count);
PG_FUNCTION_INFO_V1(lsm_start_merge);
PG_FUNCTION_INFO_V1(lsm_wait_merge_completion);
PG_FUNCTION_INFO_V1(lsm_top_index_size);
extern void _PG_init(void);
extern void _PG_fini(void);
extern void lsm_merger_main(Datum arg);
/* lsm dictionary (hashtable with control data for all indexes) */
static lsmDictEntry* lsmEntry;
static HTAB* lsmDict;
static LWLock* lsmDictLock;
static List* lsmReleasedLocks;
/* Kind of relation optioms for lsm index */
static relopt_kind lsmReloptKind;
/* lsm kooks */
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static shmem_startup_hook_type PreviousShmemStartupHook = NULL;
static ExecutorFinish_hook_type PreviousExecutorFinish = NULL;
/* lsm GUCs */
static int lsmMaxIndexes;
static int lsmTopIndexSize;
/* Background worker termination flag */
static volatile bool lsmCancel;
static void
lsm_shmem_startup(void)
{
HASHCTL info;
if (PreviousShmemStartupHook)
{
PreviousShmemStartupHook();
}
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(lsmDictEntry);
lsmDict = ShmemInitHash("lsm hash",
lsmMaxIndexes, lsmMaxIndexes,
&info,
HASH_ELEM | HASH_BLOBS);
lsmDictLock = &(GetNamedLWLockTranche("lsm"))->lock;
}
/* Initialize lsm control data entry */
static void
lsm_init_entry(lsmDictEntry* entry, Relation index)
{
SpinLockInit(&entry->spinlock);
entry->active_index = 0;
entry->merger = NULL;
entry->merge_in_progress = false;
entry->start_merge = false;
entry->n_merges = 0;
entry->n_inserts = 0;
entry->top[0] = entry->top[1] = InvalidOid;
entry->access_count[0] = entry->access_count[1] = 0;
entry->heap = index->rd_index->indrelid;
entry->db_id = MyDatabaseId;
entry->user_id = GetUserId();
entry->top_index_size = index->rd_options ? ((lsmOptions*)index->rd_options)->top_index_size : 0;
}
/* Get B-Tree index size (number of blocks) */
static BlockNumber
lsm_get_index_size(Oid relid)
{
Relation index = index_open(relid, AccessShareLock);
BlockNumber size = RelationGetNumberOfBlocks(index);
index_close(index, AccessShareLock);
return size;
}
/* Lookup or create lsm control data for this index */
static lsmDictEntry*
lsm_get_entry(Relation index)
{
lsmDictEntry* entry;
bool found = true;
LWLockAcquire(lsmDictLock, LW_SHARED);
entry = (lsmDictEntry*)hash_search(lsmDict, &RelationGetRelid(index), HASH_FIND, &found);
if (entry == NULL)
{
/* We need exclusive lock to create new entry */
LWLockRelease(lsmDictLock);
LWLockAcquire(lsmDictLock, LW_EXCLUSIVE);
entry = (lsmDictEntry*)hash_search(lsmDict, &RelationGetRelid(index), HASH_ENTER, &found);
}
if (!found)
{
char* relname = RelationGetRelationName(index);
lsm_init_entry(entry, index);
for (int i = 0; i < 2; i++)
{
char* topidxname = psprintf("%s_top%d", relname, i);
entry->top[i] = get_relname_relid(topidxname, RelationGetNamespace(index));
if (entry->top[i] == InvalidOid)
{
elog(ERROR, "lsm: failed to lookup %s index", topidxname);
}
}
entry->active_index = lsm_get_index_size(entry->top[0]) >= lsm_get_index_size(entry->top[1]) ? 0 : 1;
}
LWLockRelease(lsmDictLock);
return entry;
}
/* Launch merger bgworker */
static void
lsm_launch_bgworker(lsmDictEntry* entry)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
pid_t bgw_pid;
MemSet(&worker, 0, sizeof(worker));
snprintf(worker.bgw_name, sizeof(worker.bgw_name), "lsm-merger-%d", entry->base);
snprintf(worker.bgw_type, sizeof(worker.bgw_type), "lsm-merger-%d", entry->base);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy(worker.bgw_function_name, "lsm_merger_main");
strcpy(worker.bgw_library_name, "lsm");
worker.bgw_main_arg = PointerGetDatum(entry);
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
elog(ERROR, "lsm: failed to start background worker");
}
if (WaitForBackgroundWorkerStartup(handle, &bgw_pid) != BGWH_STARTED)
{
elog(ERROR, "lsm: startup of background worker is failed");
}
entry->merger = BackendPidGetProc(bgw_pid);
for (int n_attempts = 0; entry->merger == NULL || n_attempts < 100; n_attempts++)
{
pg_usleep(10000); /* wait background worker to be registered in procarray */
entry->merger = BackendPidGetProc(bgw_pid);
}
if (entry->merger == NULL)
{
elog(ERROR, "lsm: background worker %d is crashed", bgw_pid);
}
}
/* Cancel merger bgwroker */
static void
lsm_merge_cancel(int sig)
{
lsmCancel = true;
SetLatch(MyLatch);
}
/* Truncate top index */
static void
lsm_truncate_index(Oid index_oid, Oid heap_oid)
{
Relation index = index_open(index_oid, AccessExclusiveLock);
Relation heap = table_open(heap_oid, AccessShareLock); /* heap is actually not used, because we will not load data to top indexes */
IndexInfo* indexInfo = BuildDummyIndexInfo(index);
RelationTruncate(index, 0);
elog(LOG, "lsm: truncate index %s", RelationGetRelationName(index));
index_build(heap, index, indexInfo, true, false);
index_close(index, AccessExclusiveLock);
table_close(heap, AccessShareLock);
}
/* Merge top index into base index */
static void
lsm_merge_indexes(Oid dst_oid, Oid src_oid, Oid heap_oid)
{
Relation top_index = index_open(src_oid, AccessShareLock);
Relation heap = table_open(heap_oid, AccessShareLock);
Relation base_index = index_open(dst_oid, RowExclusiveLock);
IndexScanDesc scan;
bool ok;
Oid save_am = base_index->rd_rel->relam;
elog(LOG, "lsm: merge top index %s with size %d blocks", RelationGetRelationName(top_index), RelationGetNumberOfBlocks(top_index));
base_index->rd_rel->relam = BTREE_AM_OID;
scan = index_beginscan(heap, top_index, SnapshotAny, 0, 0);
scan->xs_want_itup = true;
btrescan(scan, NULL, 0, 0, 0);
for (ok = _bt_first(scan, ForwardScanDirection); ok; ok = _bt_next(scan, ForwardScanDirection))
{
IndexTuple itup = scan->xs_itup;
if (BTreeTupleIsPosting(itup))
{
/* Some dirty coding here related with handling of posting items (index deduplication).
* If index tuple is posting item, we need to transfer it to normal index tuple.
* Posting list is representing by index tuple with INDEX_ALT_TID_MASK bit set in t_info and
* BT_IS_POSTING bit in TID offset, following by array of TIDs.
* We need to store right TID (taken from xs_heaptid) and correct index tuple length
* (not including size of TIDs array), clearing INDEX_ALT_TID_MASK.
* For efficiency reasons let's do it in place, saving and restoring original values after insertion is done.
*/
ItemPointerData save_tid = itup->t_tid;
unsigned short save_info = itup->t_info;
itup->t_info = (save_info & ~(INDEX_SIZE_MASK | INDEX_ALT_TID_MASK)) + BTreeTupleGetPostingOffset(itup);
itup->t_tid = scan->xs_heaptid;
_bt_doinsert(base_index, itup, false, heap); /* lsm index is not unique so need not to heck for duplica
tes */
itup->t_tid = save_tid;
itup->t_info = save_info;
}
else
{
_bt_doinsert(base_index, itup, false, heap); /* lsm index is not unique so need not to heck for duplica
tes */
}
}
index_endscan(scan);
base_index->rd_rel->relam = save_am;
index_close(top_index, AccessShareLock);
index_close(base_index, RowExclusiveLock);
table_close(heap, AccessShareLock);
}
/* lsm index options.
*/
static bytea *
lsm_options(Datum reloptions, bool validate)
{
static const relopt_parse_elt tab[] = {
{"fillfactor", RELOPT_TYPE_INT, offsetof(BTOptions, fillfactor)},
{"vacuum_cleanup_index_scale_factor", RELOPT_TYPE_REAL,
offsetof(BTOptions, vacuum_cleanup_index_scale_factor)},
{"deduplicate_items", RELOPT_TYPE_BOOL,
offsetof(BTOptions, deduplicate_items)},
{"top_index_size", RELOPT_TYPE_INT, offsetof(lsmOptions, top_index_size)},
{"unique", RELOPT_TYPE_BOOL, offsetof(lsmOptions, unique)}
};
return (bytea *) build_reloptions(reloptions, validate, lsmReloptKind,
sizeof(lsmOptions), tab, lengthof(tab));
}
/* Main function of merger bgwroker */
void
lsm_merger_main(Datum arg)
{
lsmDictEntry* entry = (lsmDictEntry*)DatumGetPointer(arg);
char *appname;
pqsignal(SIGINT, lsm_merge_cancel);
pqsignal(SIGQUIT, lsm_merge_cancel);
pqsignal(SIGTERM, lsm_merge_cancel);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
BackgroundWorkerInitializeConnectionByOid(entry->db_id, entry->user_id, 0);
appname = psprintf("lsm merger for %d", entry->base);
pgstat_report_appname(appname);
pfree(appname);
while (!lsmCancel)
{
int merge_index= -1;
int wr;
pgstat_report_activity(STATE_IDLE, "waiting");
wr = WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1L, PG_WAIT_EXTENSION);
if ((wr & WL_POSTMASTER_DEATH) || lsmCancel)
{
break;
}
ResetLatch(MyLatch);
/* Check if merge is requested under spinlock */
SpinLockAcquire(&entry->spinlock);
if (entry->start_merge)
{
merge_index = 1 - entry->active_index; /* at this moment active index should already by swapped */
entry->start_merge = false;
}
SpinLockRelease(&entry->spinlock);
if (merge_index >= 0)
{
StartTransactionCommand();
{
pgstat_report_activity(STATE_RUNNING, "merging");
lsm_merge_indexes(entry->base, entry->top[merge_index], entry->heap);
pgstat_report_activity(STATE_RUNNING, "truncate");
lsm_truncate_index(entry->top[merge_index], entry->heap);
}
CommitTransactionCommand();
SpinLockAcquire(&entry->spinlock);
entry->merge_in_progress = false; /* mark merge as completed */
SpinLockRelease(&entry->spinlock);
}
}
entry->merger = NULL;
}
/* Build index tuple comparator context */
static SortSupport
lsm_build_sortkeys(Relation index)
{
int keysz = IndexRelationGetNumberOfKeyAttributes(index);
SortSupport sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
BTScanInsert inskey = _bt_mkscankey(index, NULL);
Oid save_am = index->rd_rel->relam;
index->rd_rel->relam = BTREE_AM_OID;
for (int i = 0; i < keysz; i++)
{
SortSupport sortKey = &sortKeys[i];
ScanKey scanKey = &inskey->scankeys[i];
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
/* Abbreviation is not supported here */
sortKey->abbreviate = false;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(index, strategy, sortKey);
}
index->rd_rel->relam = save_am;
return sortKeys;
}
/* Compare index tuples */
static int
lsm_compare_index_tuples(IndexScanDesc scan1, IndexScanDesc scan2, SortSupport sortKeys)
{
int n_keys = IndexRelationGetNumberOfKeyAttributes(scan1->indexRelation);
for (int i = 1; i <= n_keys; i++)
{
Datum datum[2];
bool isNull[2];
int result;
datum[0] = index_getattr(scan1->xs_itup, i, scan1->xs_itupdesc, &isNull[0]);
datum[1] = index_getattr(scan2->xs_itup, i, scan2->xs_itupdesc, &isNull[1]);
result = ApplySortComparator(datum[0], isNull[0],
datum[1], isNull[1],
&sortKeys[i - 1]);
if (result != 0)
{
return result;
}
}
return ItemPointerCompare(&scan1->xs_heaptid, &scan2->xs_heaptid);
}
/*
* lsm access methods implementation
*/
static IndexBuildResult *
lsm_build(Relation heap, Relation index, IndexInfo *indexInfo)
{
Oid save_am = index->rd_rel->relam;
IndexBuildResult * result;
bool found;
LWLockAcquire(lsmDictLock, LW_EXCLUSIVE); /* Obtain exclusive lock on dictionary: it will be released in utility hook */
lsmEntry = hash_search(lsmDict, &RelationGetRelid(index), HASH_ENTER, &found); /* Setting lsmEntry indicates to utility hook that lsm index was created */
if (!found)
{
lsm_init_entry(lsmEntry, index);
}
index->rd_rel->relam = BTREE_AM_OID;
result = btbuild(heap, index, indexInfo);
index->rd_rel->relam = save_am;
return result;
}
/* Insert in active top index, on overflow swap active indexes and initiate merge to base index */
static bool
lsm_insert(Relation rel, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique,
IndexInfo *indexInfo)
{
lsmDictEntry* entry = lsm_get_entry(rel);
int active_index;
uint64 n_merges; /* used to check if merge was initiated by somebody else */
Relation index;
Oid save_am;
bool overflow;
int top_index_size = entry->top_index_size ? entry->top_index_size : lsmTopIndexSize;
/* Obtain current active index and increment access counter under spinlock */
SpinLockAcquire(&entry->spinlock);
active_index = entry->active_index;
entry->access_count[active_index] += 1;
n_merges = entry->n_merges;
SpinLockRelease(&entry->spinlock);
/* Do insert in top index */
index = index_open(entry->top[active_index], RowExclusiveLock);
index->rd_rel->relam = BTREE_AM_OID;
save_am = index->rd_rel->relam;
btinsert(index, values, isnull, ht_ctid, heapRel, checkUnique, indexInfo);
index_close(index, RowExclusiveLock);
index->rd_rel->relam = save_am;
overflow = !entry->merge_in_progress /* do not check for overflow if merge was already initiated */
&& (entry->n_inserts % lsm_CHECK_TOP_INDEX_SIZE_PERIOD) == 0 /* perform check only each N-th insert */
&& RelationGetNumberOfBlocks(index)*(BLCKSZ/1024) > top_index_size;
SpinLockAcquire(&entry->spinlock);
/* If merge was not initiated before by somebody else, then do it */
if (overflow && !entry->merge_in_progress && entry->n_merges == n_merges)
{
Assert(entry->active_index == active_index);
entry->merge_in_progress = true;
entry->active_index ^= 1; /* swap top indexes */
entry->n_merges += 1;
}
Assert(entry->access_count[active_index] > 0);
entry->access_count[active_index] -= 1;
entry->n_inserts += 1;
if (entry->merge_in_progress)
{
LOCKTAG tag;
SET_LOCKTAG_RELATION(tag,
MyDatabaseId,
entry->top[1-active_index]);
/* Holding lock on non-ative index prevent merger bgworker from truncation this index */
if (LockHeldByMe(&tag, RowExclusiveLock))
{
LockRelease(&tag, RowExclusiveLock, false);
lsmReleasedLocks = lappend_oid(lsmReleasedLocks, entry->top[1-active_index]);
}
/* If all inserts in previous active index are completed then we can start merge */
if (entry->active_index != active_index && entry->access_count[active_index] == 0)
{
entry->start_merge = true;
if (entry->merger == NULL) /* lazy start of bgworker */
{
lsm_launch_bgworker(entry);
}
SetLatch(&entry->merger->procLatch);
}
}
SpinLockRelease(&entry->spinlock);
return false;
}
static IndexScanDesc
lsm_beginscan(Relation rel, int nkeys, int norderbys)
{
IndexScanDesc scan;
lsmScanOpaque* so;
int i;
/* no order by operators allowed */
Assert(norderbys == 0);
/* get the scan */
scan = RelationGetIndexScan(rel, nkeys, norderbys);
scan->xs_itupdesc = RelationGetDescr(rel);
so = (lsmScanOpaque*)palloc(sizeof(lsmScanOpaque));
so->entry = lsm_get_entry(rel);
so->sortKeys = lsm_build_sortkeys(rel);
for (i = 0; i < 2; i++)
{
so->top_index[i] = index_open(so->entry->top[i], AccessShareLock);
so->scan[i] = btbeginscan(so->top_index[i], nkeys, norderbys);
}
so->scan[2] = btbeginscan(rel, nkeys, norderbys);
for (i = 0; i < 3; i++)
{
so->eof[i] = false;
so->scan[i]->xs_want_itup = true;
so->scan[i]->parallel_scan = NULL;
}
so->unique = rel->rd_options ? ((lsmOptions*)rel->rd_options)->unique : false;
so->curr_index = -1;
scan->opaque = so;
return scan;
}
static void
lsm_rescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys)
{
lsmScanOpaque* so = (lsmScanOpaque*) scan->opaque;
so->curr_index = -1;
for (int i = 0; i < 3; i++)
{
btrescan(so->scan[i], scankey, nscankeys, orderbys, norderbys);
so->eof[i] = false;
}
}
static void
lsm_endscan(IndexScanDesc scan)
{
lsmScanOpaque* so = (lsmScanOpaque*) scan->opaque;
for (int i = 0; i < 3; i++)
{
btendscan(so->scan[i]);
if (i < 2)
{
index_close(so->top_index[i], AccessShareLock);
}
}
pfree(so);
}
static bool
lsm_gettuple(IndexScanDesc scan, ScanDirection dir)
{
lsmScanOpaque* so = (lsmScanOpaque*) scan->opaque;
int min = -1;
int curr = so->curr_index;
/* We start with active top index, then merging index and last of all: largest base index */
int try_index_order[3] = {so->entry->active_index, 1-so->entry->active_index, 2};
/* btree indexes are never lossy */
scan->xs_recheck = false;
if (curr >= 0) /* lazy advance of current index */
{
so->eof[curr] = !_bt_next(so->scan[curr], dir); /* move forward current index */
}
for (int j = 0; j < 3; j++)
{
int i = try_index_order[j];
BTScanOpaque bto = (BTScanOpaque)so->scan[i]->opaque;
so->scan[i]->xs_snapshot = scan->xs_snapshot;
if (!so->eof[i] && !BTScanPosIsValid(bto->currPos))
{
so->eof[i] = !_bt_first(so->scan[i], dir);
if (!so->eof[i] && so->unique && scan->numberOfKeys == scan->indexRelation->rd_index->indnkeyatts)
{
/* If index is marked as unique and we perform lookup using all index keys,
* then we can stop after locating first occurrence.
* If make it possible to avoid lookups of all three indexes.
*/
elog(DEBUG1, "lsm: lookup %d indexes", j+1);
while (++j < 3) /* prevent search of all remanining indexes */
{
so->eof[try_index_order[j]] = true;
}
min = i;
break;
}
}
if (!so->eof[i])
{
if (min < 0)
{
min = i;
}
else
{
int result = lsm_compare_index_tuples(so->scan[i], so->scan[min], so->sortKeys);
if (result == 0)
{
/* Duplicate: it can happen during merge when same tid is both in top and base index */
so->eof[i] = !_bt_next(so->scan[i], dir); /* just skip one of entries */
}
else if ((result < 0) == ScanDirectionIsForward(dir))
{
min = i;
}
}
}
}
if (min < 0) /* all indexes are traversed */
{
return false;
}
else
{
scan->xs_heaptid = so->scan[min]->xs_heaptid; /* copy TID */
if (scan->xs_want_itup) {
scan->xs_itup = so->scan[min]->xs_itup;
}
so->curr_index = min; /*will be advance at next call of gettuple */
return true;
}
}
static int64
lsm_getbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
lsmScanOpaque* so = (lsmScanOpaque*)scan->opaque;
int64 ntids = 0;
for (int i = 0; i < 3; i++)
{
so->scan[i]->xs_snapshot = scan->xs_snapshot;
ntids += btgetbitmap(so->scan[i], tbm);
}