Commit 67035927 authored by Paras Garg's avatar Paras Garg

readme

parent 0acbed60
LSM tree implemented using standard Postgres B-Tree indexes.
Two levels of index are L0 and L1.
L0 is used to store initial inserts, and L1 is used to store the entries merged from L0.
For the remaining points and the installation instructions,
see the report.
\ No newline at end of file
select tablename ,indexname, indexdef from pg_indexes where tablename='t';
2020-12-14 01:01:11.215 IST [53734] FATAL: lock file "postmaster.pid" already exists
2020-12-14 01:01:11.215 IST [53734] HINT: Is another postmaster (PID 2917) running in data directory "/home/paras/postgres13/install/data"?
2020-12-14 01:16:10.828 IST [54122] LOG: starting PostgreSQL 13.0 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2020-12-14 01:16:10.828 IST [54122] LOG: listening on IPv4 address "127.0.0.1", port 5432
2020-12-14 01:16:10.926 IST [54122] LOG: listening on Unix socket "/tmp/.s.PGSQL.5432"
2020-12-14 01:16:11.072 IST [54123] LOG: database system was shut down at 2020-12-14 01:16:07 IST
2020-12-14 01:16:11.152 IST [54122] LOG: database system is ready to accept connections
2020-12-14 01:18:04.122 IST [54131] ERROR: syntax error at or near "enalyse" at character 1
2020-12-14 01:18:04.122 IST [54131] STATEMENT: enalyse
;
2020-12-14 01:18:11.225 IST [54131] ERROR: syntax error at or near ";" at character 9
2020-12-14 01:18:11.225 IST [54131] STATEMENT: EXPLAIN ;
2020-12-14 01:19:36.421 IST [54122] LOG: server process (PID 54131) was terminated by signal 11: Segmentation fault
2020-12-14 01:19:36.421 IST [54122] DETAIL: Failed process was running: select * from t where k=3;
2020-12-14 01:19:36.421 IST [54122] LOG: terminating any other active server processes
2020-12-14 01:19:36.421 IST [54127] WARNING: terminating connection because of crash of another server process
2020-12-14 01:19:36.421 IST [54127] DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
2020-12-14 01:19:36.421 IST [54127] HINT: In a moment you should be able to reconnect to the database and repeat your command.
2020-12-14 01:19:36.423 IST [54162] FATAL: the database system is in recovery mode
2020-12-14 01:19:36.424 IST [54122] LOG: all server processes terminated; reinitializing
2020-12-14 01:19:36.534 IST [54163] LOG: database system was interrupted; last known up at 2020-12-14 01:16:11 IST
2020-12-14 01:19:36.806 IST [54163] LOG: database system was not properly shut down; automatic recovery in progress
2020-12-14 01:19:36.868 IST [54163] LOG: redo starts at 0/1C6CFDC0
2020-12-14 01:19:36.887 IST [54163] LOG: invalid record length at 0/1C6D1D90: wanted 24, got 0
2020-12-14 01:19:36.887 IST [54163] LOG: redo done at 0/1C6D1D58
2020-12-14 01:19:37.359 IST [54122] LOG: database system is ready to accept connections
2020-12-13 22:10:51.822 IST [2998] LOG: starting PostgreSQL 13.0 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2020-12-13 22:10:51.822 IST [2998] LOG: listening on IPv4 address "127.0.0.1", port 5432
2020-12-13 22:10:51.941 IST [2998] LOG: listening on Unix socket "/tmp/.s.PGSQL.5432"
2020-12-13 22:10:52.142 IST [3001] LOG: database system was interrupted; last known up at 2020-12-14 01:21:27 IST
2020-12-13 22:10:52.589 IST [3001] LOG: database system was not properly shut down; automatic recovery in progress
2020-12-13 22:10:52.667 IST [3001] LOG: redo starts at 0/1C6D1F68
2020-12-13 22:10:52.696 IST [3001] LOG: invalid record length at 0/1C6F5968: wanted 24, got 0
2020-12-13 22:10:52.696 IST [3001] LOG: redo done at 0/1C6F5930
2020-12-13 22:10:53.258 IST [2998] LOG: database system is ready to accept connections
2020-12-13 22:11:30.607 IST [3010] ERROR: type "k" does not exist at character 20
2020-12-13 22:11:30.607 IST [3010] STATEMENT: create table t(int k,int v);
2020-12-13 22:14:39.402 IST [3010] ERROR: relation "lsm_handler" does not exist at character 15
2020-12-13 22:14:39.402 IST [3010] STATEMENT: select * from lsm_handler;
2020-12-14 12:09:49.954 IST [2998] LOG: received fast shutdown request
2020-12-14 12:09:50.047 IST [2998] LOG: aborting any active transactions
2020-12-14 12:09:50.049 IST [2998] LOG: background worker "logical replication launcher" (PID 3007) exited with exit code 1
2020-12-14 12:09:50.050 IST [3002] LOG: shutting down
2020-12-14 12:09:50.266 IST [2998] LOG: database system is shut down
postgres: invalid argument: "log_min_messages=debug4"
Try "postgres --help" for more information.
2020-12-14 12:11:48.309 IST [6452] LOG: starting PostgreSQL 13.0 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2020-12-14 12:11:48.309 IST [6452] LOG: listening on IPv4 address "127.0.0.1", port 5432
2020-12-14 12:11:48.394 IST [6452] LOG: listening on Unix socket "/tmp/.s.PGSQL.5432"
2020-12-14 12:11:48.551 IST [6453] LOG: database system was shut down at 2020-12-14 12:09:50 IST
2020-12-14 12:11:48.631 IST [6452] LOG: database system is ready to accept connections
2020-12-14 12:11:55.745 IST [6462] ERROR: syntax error at or near "-" at character 1
2020-12-14 12:11:55.745 IST [6462] STATEMENT: -d
;
2020-12-14 12:12:01.861 IST [6462] ERROR: syntax error at or near "-" at character 1
2020-12-14 12:12:01.861 IST [6462] STATEMENT: -d
;
2020-12-14 12:12:06.535 IST [6462] ERROR: syntax error at or near "d" at character 1
2020-12-14 12:12:06.535 IST [6462] STATEMENT: d;
2020-12-14 12:12:13.264 IST [6462] ERROR: syntax error at or near "debug" at character 1
2020-12-14 12:12:13.264 IST [6462] STATEMENT: debug
;
2020-12-14 12:12:19.468 IST [6462] ERROR: syntax error at or near "debug" at character 1
2020-12-14 12:12:19.468 IST [6462] STATEMENT: debug-level;
2020-12-14 12:20:35.554 IST [6452] LOG: received fast shutdown request
2020-12-14 12:20:35.606 IST [6452] LOG: aborting any active transactions
2020-12-14 12:20:35.609 IST [6452] LOG: background worker "logical replication launcher" (PID 6459) exited with exit code 1
2020-12-14 12:20:35.609 IST [6454] LOG: shutting down
2020-12-14 12:20:36.171 IST [6452] LOG: database system is shut down
#include "postgres.h"
#include "access/attnum.h"
#include "utils/relcache.h"
#include "access/reloptions.h"
#include "access/nbtree.h"
#include "access/table.h"
#include "access/relation.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "utils/rel.h"
#include "nodes/makefuncs.h"
#include "catalog/dependency.h"
#include "catalog/pg_operator.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/storage.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "miscadmin.h"
#include "tcop/utility.h"
#include "postmaster/bgworker.h"
#include "pgstat.h"
#include "executor/executor.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "lsm.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1(lsm_handler);
PG_FUNCTION_INFO_V1(lsm_btree_wrapper);
PG_FUNCTION_INFO_V1(lsm_get_merge_count);
PG_FUNCTION_INFO_V1(lsm_start_merge);
PG_FUNCTION_INFO_V1(lsm_wait_merge_completion);
PG_FUNCTION_INFO_V1(lsm_top_index_size);
extern void _PG_init(void);
extern void _PG_fini(void);
extern void lsm_merger_main(Datum arg);
/* lsm dictionary (hashtable with control data for all indexes) */
static lsmDictEntry* lsmEntry;
static HTAB* lsmDict;
static LWLock* lsmDictLock;
static List* lsmReleasedLocks;
/* Kind of relation options for lsm index */
static relopt_kind lsmReloptKind;
/* lsm hooks */
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static shmem_startup_hook_type PreviousShmemStartupHook = NULL;
static ExecutorFinish_hook_type PreviousExecutorFinish = NULL;
/* lsm GUCs */
static int lsmMaxIndexes;
static int lsmTopIndexSize;
/* Background worker termination flag */
static volatile bool lsmCancel;
static void
lsm_shmem_startup(void)
{
HASHCTL info;
if (PreviousShmemStartupHook)
{
PreviousShmemStartupHook();
}
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(lsmDictEntry);
lsmDict = ShmemInitHash("lsm hash",
lsmMaxIndexes, lsmMaxIndexes,
&info,
HASH_ELEM | HASH_BLOBS);
lsmDictLock = &(GetNamedLWLockTranche("lsm"))->lock;
}
/*
 * Reset a dictionary entry to its initial state for the given index.
 *
 * Both top-index slots start out unknown (InvalidOid) with zero access
 * counters; slot 0 is the active one. The heap Oid is taken from the index,
 * the database and user ids from the current backend, and the per-index
 * top_index_size from the index reloptions (0 when no options are set).
 */
static void
lsm_init_entry(lsmDictEntry* entry, Relation index)
{
    SpinLockInit(&entry->spinlock);

    /* Merge state: nothing running, nothing requested, no worker yet. */
    entry->merger = NULL;
    entry->start_merge = false;
    entry->merge_in_progress = false;
    entry->n_merges = 0;
    entry->n_inserts = 0;

    /* Top-index slots. */
    entry->active_index = 0;
    for (int slot = 0; slot < 2; slot++)
    {
        entry->top[slot] = InvalidOid;
        entry->access_count[slot] = 0;
    }

    /* Identity of the indexed table and of the creating session. */
    entry->heap = index->rd_index->indrelid;
    entry->db_id = MyDatabaseId;
    entry->user_id = GetUserId();

    /* Per-index size limit, or 0 when the index has no reloptions. */
    if (index->rd_options)
        entry->top_index_size = ((lsmOptions*)index->rd_options)->top_index_size;
    else
        entry->top_index_size = 0;
}
/*
 * Return the size of the index identified by relid, in blocks.
 * The index is opened and closed with AccessShareLock.
 */
static BlockNumber
lsm_get_index_size(Oid relid)
{
    Relation    rel;
    BlockNumber nblocks;

    rel = index_open(relid, AccessShareLock);
    nblocks = RelationGetNumberOfBlocks(rel);
    index_close(rel, AccessShareLock);

    return nblocks;
}
/*
 * Look up the lsm control data for this index, creating it on first use.
 *
 * The dictionary is probed under a shared lock first; only when the entry is
 * missing is the lock upgraded (release + re-acquire) to exclusive so that a
 * new entry can be inserted. A freshly created entry is initialised, its two
 * top indexes ("<relname>_top0" / "<relname>_top1") are resolved by name in
 * the index's namespace, and the larger of the two becomes the active one.
 *
 * Raises ERROR if one of the top indexes cannot be found. In that case the
 * half-initialised entry is removed from the dictionary first, so that a
 * later lookup does not find an entry whose top[] Oids are still InvalidOid.
 */
static lsmDictEntry*
lsm_get_entry(Relation index)
{
    lsmDictEntry* entry;
    bool found = true;

    LWLockAcquire(lsmDictLock, LW_SHARED);
    entry = (lsmDictEntry*)hash_search(lsmDict, &RelationGetRelid(index), HASH_FIND, &found);
    if (entry == NULL)
    {
        /* We need exclusive lock to create new entry */
        LWLockRelease(lsmDictLock);
        LWLockAcquire(lsmDictLock, LW_EXCLUSIVE);
        entry = (lsmDictEntry*)hash_search(lsmDict, &RelationGetRelid(index), HASH_ENTER, &found);
    }
    if (!found)
    {
        /* Only reachable with the exclusive lock held (entry was just created). */
        char* relname = RelationGetRelationName(index);

        lsm_init_entry(entry, index);
        for (int i = 0; i < 2; i++)
        {
            char* topidxname = psprintf("%s_top%d", relname, i);

            entry->top[i] = get_relname_relid(topidxname, RelationGetNamespace(index));
            if (entry->top[i] == InvalidOid)
            {
                /* Do not leave a partially initialised entry behind. */
                hash_search(lsmDict, &RelationGetRelid(index), HASH_REMOVE, NULL);
                elog(ERROR, "lsm: failed to lookup %s index", topidxname);
            }
            pfree(topidxname);
        }
        /* The bigger top index is assumed to be the one currently receiving inserts. */
        entry->active_index = lsm_get_index_size(entry->top[0]) >= lsm_get_index_size(entry->top[1]) ? 0 : 1;
    }
    LWLockRelease(lsmDictLock);
    return entry;
}
/*
 * Launch the merger background worker for the given dictionary entry.
 *
 * Registers a dynamic background worker running lsm_merger_main() from the
 * "lsm" library, waits until the postmaster reports it as started, and then
 * polls the procarray until the worker's PGPROC is visible so that its latch
 * can be set through entry->merger.
 *
 * Raises ERROR if the worker cannot be registered, does not start, or does
 * not appear in the procarray within the polling budget
 * (100 attempts, 10 ms apart).
 */
static void
lsm_launch_bgworker(lsmDictEntry* entry)
{
    BackgroundWorker worker;
    BackgroundWorkerHandle *handle;
    pid_t bgw_pid;

    MemSet(&worker, 0, sizeof(worker));
    snprintf(worker.bgw_name, sizeof(worker.bgw_name), "lsm-merger-%d", entry->base);
    snprintf(worker.bgw_type, sizeof(worker.bgw_type), "lsm-merger-%d", entry->base);
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    strcpy(worker.bgw_function_name, "lsm_merger_main");
    strcpy(worker.bgw_library_name, "lsm");
    worker.bgw_main_arg = PointerGetDatum(entry);
    worker.bgw_notify_pid = MyProcPid;

    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
    {
        elog(ERROR, "lsm: failed to start background worker");
    }
    if (WaitForBackgroundWorkerStartup(handle, &bgw_pid) != BGWH_STARTED)
    {
        elog(ERROR, "lsm: startup of background worker is failed");
    }

    /*
     * Wait for the worker to be registered in the procarray. The loop must
     * stop as soon as the PGPROC is found OR the attempts are exhausted;
     * with "||" it always slept for the full 100 iterations and never
     * terminated when the worker failed to show up.
     */
    entry->merger = BackendPidGetProc(bgw_pid);
    for (int n_attempts = 0; entry->merger == NULL && n_attempts < 100; n_attempts++)
    {
        pg_usleep(10000); /* wait background worker to be registered in procarray */
        entry->merger = BackendPidGetProc(bgw_pid);
    }
    if (entry->merger == NULL)
    {
        elog(ERROR, "lsm: background worker %d is crashed", bgw_pid);
    }
}
/*
 * Signal handler for the merger background worker: request termination.
 *
 * Sets the cancel flag and wakes the worker's main loop through its latch.
 * errno is saved and restored because the handler may interrupt code that
 * is about to inspect errno.
 */
static void
lsm_merge_cancel(int sig)
{
    int save_errno = errno;

    lsmCancel = true;
    SetLatch(MyLatch);

    errno = save_errno;
}
/*
 * Empty a top index and rebuild it.
 *
 * The index is opened with AccessExclusiveLock, truncated to zero blocks and
 * then rebuilt through index_build(). The heap is opened only because
 * index_build() takes it as an argument.
 */
static void
lsm_truncate_index(Oid index_oid, Oid heap_oid)
{
    Relation    top_rel;
    Relation    heap_rel;
    IndexInfo*  dummy_info;

    top_rel = index_open(index_oid, AccessExclusiveLock);
    /* heap is actually not used, because we will not load data to top indexes */
    heap_rel = table_open(heap_oid, AccessShareLock);
    dummy_info = BuildDummyIndexInfo(top_rel);

    RelationTruncate(top_rel, 0);
    elog(LOG, "lsm: truncate index %s", RelationGetRelationName(top_rel));
    index_build(heap_rel, top_rel, dummy_info, true, false);

    index_close(top_rel, AccessExclusiveLock);
    table_close(heap_rel, AccessShareLock);
}
/*
 * Merge top index into base index.
 *
 * dst_oid  - Oid of the base index that receives the tuples
 * src_oid  - Oid of the top index that is scanned
 * heap_oid - Oid of the indexed table
 *
 * Every index tuple returned by a forward scan of the top index (using
 * SnapshotAny) is inserted into the base index with _bt_doinsert().
 * While the merge runs, the relcache copy of the base index's access method
 * Oid is temporarily replaced with BTREE_AM_OID and restored afterwards.
 */
static void
lsm_merge_indexes(Oid dst_oid, Oid src_oid, Oid heap_oid)
{
Relation top_index = index_open(src_oid, AccessShareLock);
Relation heap = table_open(heap_oid, AccessShareLock);
Relation base_index = index_open(dst_oid, RowExclusiveLock);
IndexScanDesc scan;
bool ok;
/* Remember the real access method so it can be put back after the merge */
Oid save_am = base_index->rd_rel->relam;
elog(LOG, "lsm: merge top index %s with size %d blocks", RelationGetRelationName(top_index), RelationGetNumberOfBlocks(top_index));
base_index->rd_rel->relam = BTREE_AM_OID;
/* Scan the whole top index: no scan keys, no order-by keys, index tuples requested */
scan = index_beginscan(heap, top_index, SnapshotAny, 0, 0);
scan->xs_want_itup = true;
btrescan(scan, NULL, 0, 0, 0);
for (ok = _bt_first(scan, ForwardScanDirection); ok; ok = _bt_next(scan, ForwardScanDirection))
{
IndexTuple itup = scan->xs_itup;
if (BTreeTupleIsPosting(itup))
{
/* Some dirty coding here related with handling of posting items (index deduplication).
* If index tuple is posting item, we need to transfer it to normal index tuple.
* Posting list is representing by index tuple with INDEX_ALT_TID_MASK bit set in t_info and
* BT_IS_POSTING bit in TID offset, following by array of TIDs.
* We need to store right TID (taken from xs_heaptid) and correct index tuple length
* (not including size of TIDs array), clearing INDEX_ALT_TID_MASK.
* For efficiency reasons let's do it in place, saving and restoring original values after insertion is done.
*/
ItemPointerData save_tid = itup->t_tid;
unsigned short save_info = itup->t_info;
itup->t_info = (save_info & ~(INDEX_SIZE_MASK | INDEX_ALT_TID_MASK)) + BTreeTupleGetPostingOffset(itup);
itup->t_tid = scan->xs_heaptid;
_bt_doinsert(base_index, itup, false, heap); /* lsm index is not unique, so there is no need to check for duplicates */
/* Undo the in-place patch so the tuple is intact again */
itup->t_tid = save_tid;
itup->t_info = save_info;
}
else
{
_bt_doinsert(base_index, itup, false, heap); /* lsm index is not unique, so there is no need to check for duplicates */
}
}
index_endscan(scan);
base_index->rd_rel->relam = save_am;
index_close(top_index, AccessShareLock);
index_close(base_index, RowExclusiveLock);
table_close(heap, AccessShareLock);
}
/*
 * Parse the reloptions of an lsm index.
 *
 * Accepts the B-Tree options (fillfactor, vacuum_cleanup_index_scale_factor,
 * deduplicate_items) plus the lsm-specific top_index_size and unique, and
 * returns them packed into an lsmOptions-sized structure.
 */
static bytea *
lsm_options(Datum reloptions, bool validate)
{
    /* Option name, value type and destination offset for each supported option. */
    static const relopt_parse_elt lsm_relopt_table[] = {
        {"fillfactor", RELOPT_TYPE_INT,
         offsetof(BTOptions, fillfactor)},
        {"vacuum_cleanup_index_scale_factor", RELOPT_TYPE_REAL,
         offsetof(BTOptions, vacuum_cleanup_index_scale_factor)},
        {"deduplicate_items", RELOPT_TYPE_BOOL,
         offsetof(BTOptions, deduplicate_items)},
        {"top_index_size", RELOPT_TYPE_INT,
         offsetof(lsmOptions, top_index_size)},
        {"unique", RELOPT_TYPE_BOOL,
         offsetof(lsmOptions, unique)}
    };
    void *parsed;

    parsed = build_reloptions(reloptions, validate, lsmReloptKind,
                              sizeof(lsmOptions),
                              lsm_relopt_table, lengthof(lsm_relopt_table));
    return (bytea *) parsed;
}
/*
 * Main function of the merger background worker.
 *
 * arg carries a pointer to the lsmDictEntry this worker serves. The worker
 * connects to the database recorded in the entry, then sleeps on its latch.
 * Each time it is woken with start_merge set, it merges the non-active top
 * index into the base index, truncates that top index, and clears
 * merge_in_progress. The loop ends when a cancel signal is received or the
 * wait reports postmaster death; entry->merger is reset on exit.
 */
void
lsm_merger_main(Datum arg)
{
lsmDictEntry* entry = (lsmDictEntry*)DatumGetPointer(arg);
char *appname;
/* SIGINT, SIGQUIT and SIGTERM all just request termination of the loop below */
pqsignal(SIGINT, lsm_merge_cancel);
pqsignal(SIGQUIT, lsm_merge_cancel);
pqsignal(SIGTERM, lsm_merge_cancel);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect as the user and to the database stored in the entry */
BackgroundWorkerInitializeConnectionByOid(entry->db_id, entry->user_id, 0);
appname = psprintf("lsm merger for %d", entry->base);
pgstat_report_appname(appname);
pfree(appname);
while (!lsmCancel)
{
int merge_index= -1; /* stays negative when no merge was requested */
int wr;
pgstat_report_activity(STATE_IDLE, "waiting");
/* Block without timeout until the latch is set */
wr = WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1L, PG_WAIT_EXTENSION);
if ((wr & WL_POSTMASTER_DEATH) || lsmCancel)
{
break;
}
ResetLatch(MyLatch);
/* Check if merge is requested under spinlock */
SpinLockAcquire(&entry->spinlock);
if (entry->start_merge)
{
merge_index = 1 - entry->active_index; /* at this moment active index should already be swapped */
entry->start_merge = false;
}
SpinLockRelease(&entry->spinlock);
if (merge_index >= 0)
{
/* Merge and truncate run inside a single transaction */
StartTransactionCommand();
{
pgstat_report_activity(STATE_RUNNING, "merging");
lsm_merge_indexes(entry->base, entry->top[merge_index], entry->heap);
pgstat_report_activity(STATE_RUNNING, "truncate");
lsm_truncate_index(entry->top[merge_index], entry->heap);
}
CommitTransactionCommand();
SpinLockAcquire(&entry->spinlock);
entry->merge_in_progress = false; /* mark merge as completed */
SpinLockRelease(&entry->spinlock);
}
}
/* Clearing merger lets lsm_insert lazily launch a new worker */
entry->merger = NULL;
}
/*
 * Build the comparator context used to order index tuples of this index.
 *
 * Returns a palloc'd array with one SortSupportData per key attribute,
 * prepared from the index's B-Tree operator classes. While the sort support
 * is prepared, the relcache copy of the index's access method Oid is
 * temporarily replaced with BTREE_AM_OID and restored afterwards.
 *
 * The temporary insertion scan key obtained from _bt_mkscankey() is only
 * needed to read collation, attribute number and ordering flags, so it is
 * freed before returning instead of being leaked.
 */
static SortSupport
lsm_build_sortkeys(Relation index)
{
    int keysz = IndexRelationGetNumberOfKeyAttributes(index);
    SortSupport sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
    BTScanInsert inskey = _bt_mkscankey(index, NULL);
    Oid save_am = index->rd_rel->relam;

    index->rd_rel->relam = BTREE_AM_OID;
    for (int i = 0; i < keysz; i++)
    {
        SortSupport sortKey = &sortKeys[i];
        ScanKey scanKey = &inskey->scankeys[i];
        int16 strategy;

        sortKey->ssup_cxt = CurrentMemoryContext;
        sortKey->ssup_collation = scanKey->sk_collation;
        sortKey->ssup_nulls_first =
            (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
        sortKey->ssup_attno = scanKey->sk_attno;
        /* Abbreviation is not supported here */
        sortKey->abbreviate = false;
        AssertState(sortKey->ssup_attno != 0);

        /* Descending columns sort with the "greater than" strategy */
        strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
            BTGreaterStrategyNumber : BTLessStrategyNumber;
        PrepareSortSupportFromIndexRel(index, strategy, sortKey);
    }
    index->rd_rel->relam = save_am;

    /* Everything needed was copied into sortKeys; release the scan key. */
    pfree(inskey);
    return sortKeys;
}
/*
 * Compare the current index tuples of two scans.
 *
 * Key attributes are compared left to right with the prepared sort support;
 * the first non-zero result is returned. When all key attributes are equal,
 * the heap TIDs of the two scans decide the order.
 */
static int
lsm_compare_index_tuples(IndexScanDesc scan1, IndexScanDesc scan2, SortSupport sortKeys)
{
    int nkeyatts = IndexRelationGetNumberOfKeyAttributes(scan1->indexRelation);
    int keyno = 0;

    while (keyno < nkeyatts)
    {
        int     attno = keyno + 1;  /* attribute numbers are 1-based */
        bool    left_null;
        bool    right_null;
        Datum   left_val;
        Datum   right_val;
        int     cmp;

        left_val = index_getattr(scan1->xs_itup, attno, scan1->xs_itupdesc, &left_null);
        right_val = index_getattr(scan2->xs_itup, attno, scan2->xs_itupdesc, &right_null);

        cmp = ApplySortComparator(left_val, left_null,
                                  right_val, right_null,
                                  &sortKeys[keyno]);
        if (cmp != 0)
            return cmp;

        keyno++;
    }

    /* All keys equal: fall back to the heap TID. */
    return ItemPointerCompare(&scan1->xs_heaptid, &scan2->xs_heaptid);
}
/*
 * lsm access methods implementation
 */

/*
 * ambuild callback: build the base index as a regular B-Tree.
 *
 * Takes the dictionary lock exclusively and leaves it held; it is released
 * by the utility hook. The dictionary entry for the index is created (and
 * initialised if new) and published through lsmEntry, which tells the
 * utility hook that an lsm index was built by the current statement.
 */
static IndexBuildResult *
lsm_build(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *build_result;
    Oid     original_am;
    bool    already_present;

    /* Obtain exclusive lock on dictionary: it will be released in utility hook */
    LWLockAcquire(lsmDictLock, LW_EXCLUSIVE);

    /* Setting lsmEntry indicates to utility hook that lsm index was created */
    lsmEntry = hash_search(lsmDict, &RelationGetRelid(index), HASH_ENTER, &already_present);
    if (!already_present)
        lsm_init_entry(lsmEntry, index);

    /* Run the B-Tree build with the access method Oid temporarily overridden. */
    original_am = index->rd_rel->relam;
    index->rd_rel->relam = BTREE_AM_OID;
    build_result = btbuild(heap, index, indexInfo);
    index->rd_rel->relam = original_am;

    return build_result;
}
/*
 * aminsert callback: insert into the active top index; on overflow swap the
 * active top indexes and initiate a merge into the base index.
 *
 * The tuple is always inserted into the currently active top index using
 * btinsert(). Every lsm_CHECK_TOP_INDEX_SIZE_PERIOD-th insert the size of
 * that index is compared with the limit (per-index top_index_size, or the
 * lsm.top_index_size GUC when the per-index value is 0). When the limit is
 * exceeded and nobody else started a merge in the meantime, the active index
 * is swapped and, once all in-flight inserts into the old active index have
 * finished, the merger worker is woken (and launched first if necessary).
 *
 * Always returns false.
 *
 * Compared with the previous version this fixes two defects:
 *  - the original access method Oid is now saved BEFORE it is overridden,
 *    so it is really restored after the insert (the old code saved the
 *    already overridden value, making the restore a no-op);
 *  - the relation is no longer touched after index_close(): both the relam
 *    restore and the size check happen while it is still open.
 */
static bool
lsm_insert(Relation rel, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique,
           IndexInfo *indexInfo)
{
    lsmDictEntry* entry = lsm_get_entry(rel);
    int active_index;
    uint64 n_merges; /* used to check if merge was initiated by somebody else */
    Relation index;
    Oid save_am;
    bool overflow;
    int top_index_size = entry->top_index_size ? entry->top_index_size : lsmTopIndexSize;

    /* Obtain current active index and increment access counter under spinlock */
    SpinLockAcquire(&entry->spinlock);
    active_index = entry->active_index;
    entry->access_count[active_index] += 1;
    n_merges = entry->n_merges;
    SpinLockRelease(&entry->spinlock);

    /* Do insert in top index */
    index = index_open(entry->top[active_index], RowExclusiveLock);
    save_am = index->rd_rel->relam;
    index->rd_rel->relam = BTREE_AM_OID;
    btinsert(index, values, isnull, ht_ctid, heapRel, checkUnique, indexInfo);
    index->rd_rel->relam = save_am;

    overflow = !entry->merge_in_progress /* do not check for overflow if merge was already initiated */
        && (entry->n_inserts % lsm_CHECK_TOP_INDEX_SIZE_PERIOD) == 0 /* perform check only each N-th insert */
        && RelationGetNumberOfBlocks(index)*(BLCKSZ/1024) > top_index_size;

    index_close(index, RowExclusiveLock);

    /*
     * NOTE(review): the section below calls into the lock manager, may
     * allocate list cells and may launch a background worker while the
     * spinlock is held. That is unchanged from the original code; consider
     * moving those calls outside the spinlock.
     */
    SpinLockAcquire(&entry->spinlock);
    /* If merge was not initiated before by somebody else, then do it */
    if (overflow && !entry->merge_in_progress && entry->n_merges == n_merges)
    {
        Assert(entry->active_index == active_index);
        entry->merge_in_progress = true;
        entry->active_index ^= 1; /* swap top indexes */
        entry->n_merges += 1;
    }
    Assert(entry->access_count[active_index] > 0);
    entry->access_count[active_index] -= 1;
    entry->n_inserts += 1;
    if (entry->merge_in_progress)
    {
        LOCKTAG tag;
        SET_LOCKTAG_RELATION(tag,
                             MyDatabaseId,
                             entry->top[1-active_index]);
        /* Holding lock on non-active index prevents merger bgworker from truncating this index */
        if (LockHeldByMe(&tag, RowExclusiveLock))
        {
            LockRelease(&tag, RowExclusiveLock, false);
            /* Remember the index so the lock can be re-acquired in the executor finish hook */
            lsmReleasedLocks = lappend_oid(lsmReleasedLocks, entry->top[1-active_index]);
        }
        /* If all inserts in previous active index are completed then we can start merge */
        if (entry->active_index != active_index && entry->access_count[active_index] == 0)
        {
            entry->start_merge = true;
            if (entry->merger == NULL) /* lazy start of bgworker */
            {
                lsm_launch_bgworker(entry);
            }
            SetLatch(&entry->merger->procLatch);
        }
    }
    SpinLockRelease(&entry->spinlock);
    return false;
}
/*
 * ambeginscan callback: start a scan over the lsm index.
 *
 * The returned scan descriptor carries an lsmScanOpaque holding three
 * B-Tree sub-scans: slots 0 and 1 scan the two top indexes (opened here with
 * AccessShareLock), slot 2 scans the base index itself. All sub-scans return
 * index tuples and are never parallel. Order-by operators are not supported.
 */
static IndexScanDesc
lsm_beginscan(Relation rel, int nkeys, int norderbys)
{
    IndexScanDesc   result;
    lsmScanOpaque*  state;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    /* get the scan */
    result = RelationGetIndexScan(rel, nkeys, norderbys);
    result->xs_itupdesc = RelationGetDescr(rel);

    state = (lsmScanOpaque*)palloc(sizeof(lsmScanOpaque));
    state->entry = lsm_get_entry(rel);
    state->sortKeys = lsm_build_sortkeys(rel);

    for (int slot = 0; slot < 3; slot++)
    {
        if (slot < 2)
        {
            /* top index: open it, then scan it */
            state->top_index[slot] = index_open(state->entry->top[slot], AccessShareLock);
            state->scan[slot] = btbeginscan(state->top_index[slot], nkeys, norderbys);
        }
        else
        {
            /* base index: the relation we were called with */
            state->scan[slot] = btbeginscan(rel, nkeys, norderbys);
        }
        state->eof[slot] = false;
        state->scan[slot]->xs_want_itup = true;
        state->scan[slot]->parallel_scan = NULL;
    }

    /* "unique" reloption, false when the index has no reloptions */
    if (rel->rd_options)
        state->unique = ((lsmOptions*)rel->rd_options)->unique;
    else
        state->unique = false;

    state->curr_index = -1;     /* no sub-scan has produced a tuple yet */
    result->opaque = state;
    return result;
}
/*
 * amrescan callback: restart all three sub-scans with the new scan keys and
 * forget which sub-scan produced the last tuple.
 */
static void
lsm_rescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
           ScanKey orderbys, int norderbys)
{
    lsmScanOpaque* state = (lsmScanOpaque*) scan->opaque;
    int slot = 0;

    while (slot < 3)
    {
        btrescan(state->scan[slot], scankey, nscankeys, orderbys, norderbys);
        state->eof[slot] = false;
        slot++;
    }
    state->curr_index = -1;
}
/*
 * amendscan callback: finish the scan and release everything that
 * lsm_beginscan() set up.
 *
 * For each of the three sub-scans the B-Tree scan state is released with
 * btendscan() and the scan descriptor itself (allocated by btbeginscan())
 * is freed with IndexScanEnd(); the two top indexes are closed. The sort
 * key array and the opaque state are freed as well. Previously the scan
 * descriptors and the sort keys were left allocated.
 */
static void
lsm_endscan(IndexScanDesc scan)
{
    lsmScanOpaque* so = (lsmScanOpaque*) scan->opaque;

    for (int i = 0; i < 3; i++)
    {
        btendscan(so->scan[i]);
        IndexScanEnd(so->scan[i]);  /* frees the descriptor and its key arrays */
        if (i < 2)
        {
            index_close(so->top_index[i], AccessShareLock);
        }
    }
    pfree(so->sortKeys);
    pfree(so);
}
/*
 * amgettuple callback: return the next tuple of the merged scan.
 *
 * The three sub-scans (two top indexes and the base index) are merged on the
 * fly: the sub-scan that produced the previous tuple is advanced lazily, each
 * not yet positioned sub-scan is positioned with _bt_first(), and the
 * smallest (for forward scans) or largest (for backward scans) current tuple
 * is returned. Tuples that compare equal including their heap TID are
 * treated as duplicates and one copy is skipped.
 *
 * For indexes marked "unique", when all index keys are specified, the search
 * stops at the first sub-scan that yields a tuple.
 *
 * Returns true and fills scan->xs_heaptid (and xs_itup if requested) when a
 * tuple is available, false when all sub-scans are exhausted.
 *
 * The active index number lives in shared memory and can be swapped by a
 * concurrent insert, so it is read exactly once here. Reading it twice could
 * produce a visit order such as {0, 0, 2}, which skips one top index and
 * makes a sub-scan compare against itself (dropping a tuple as a
 * "duplicate").
 */
static bool
lsm_gettuple(IndexScanDesc scan, ScanDirection dir)
{
    lsmScanOpaque* so = (lsmScanOpaque*) scan->opaque;
    int min = -1;
    int curr = so->curr_index;
    int active = so->entry->active_index; /* single read, see header comment */
    /* We start with active top index, then merging index and last of all: largest base index */
    int try_index_order[3] = {active, 1-active, 2};

    /* btree indexes are never lossy */
    scan->xs_recheck = false;

    if (curr >= 0) /* lazy advance of current index */
    {
        so->eof[curr] = !_bt_next(so->scan[curr], dir); /* move forward current index */
    }
    for (int j = 0; j < 3; j++)
    {
        int i = try_index_order[j];
        BTScanOpaque bto = (BTScanOpaque)so->scan[i]->opaque;

        so->scan[i]->xs_snapshot = scan->xs_snapshot;
        if (!so->eof[i] && !BTScanPosIsValid(bto->currPos))
        {
            /* Sub-scan not positioned yet: find its first matching tuple */
            so->eof[i] = !_bt_first(so->scan[i], dir);
            if (!so->eof[i] && so->unique && scan->numberOfKeys == scan->indexRelation->rd_index->indnkeyatts)
            {
                /* If index is marked as unique and we perform lookup using all index keys,
                 * then we can stop after locating first occurrence.
                 * It makes it possible to avoid lookups of all three indexes.
                 */
                elog(DEBUG1, "lsm: lookup %d indexes", j+1);
                while (++j < 3) /* prevent search of all remaining indexes */
                {
                    so->eof[try_index_order[j]] = true;
                }
                min = i;
                break;
            }
        }
        if (!so->eof[i])
        {
            if (min < 0)
            {
                min = i;
            }
            else
            {
                int result = lsm_compare_index_tuples(so->scan[i], so->scan[min], so->sortKeys);
                if (result == 0)
                {
                    /* Duplicate: it can happen during merge when same tid is both in top and base index */
                    so->eof[i] = !_bt_next(so->scan[i], dir); /* just skip one of entries */
                }
                else if ((result < 0) == ScanDirectionIsForward(dir))
                {
                    min = i;
                }
            }
        }
    }
    if (min < 0) /* all indexes are traversed */
    {
        return false;
    }
    else
    {
        scan->xs_heaptid = so->scan[min]->xs_heaptid; /* copy TID */
        if (scan->xs_want_itup) {
            scan->xs_itup = so->scan[min]->xs_itup;
        }
        so->curr_index = min; /* will be advanced at next call of gettuple */
        return true;
    }
}
/*
 * amgetbitmap callback: add the matching TIDs of all three sub-scans to the
 * bitmap and return the sum of the counts reported by btgetbitmap().
 */
static int64
lsm_getbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
    lsmScanOpaque* state = (lsmScanOpaque*)scan->opaque;
    int64 total = 0;
    int slot = 0;

    while (slot < 3)
    {
        IndexScanDesc sub = state->scan[slot];

        /* every sub-scan uses the snapshot of the outer scan */
        sub->xs_snapshot = scan->xs_snapshot;
        total += btgetbitmap(sub, tbm);
        slot++;
    }
    return total;
}
Datum
lsm_handler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = BTMaxStrategyNumber;
amroutine->amsupport = BTNProcs;
amroutine->amoptsprocnum = BTOPTIONS_PROC;
amroutine->amcanorder = true;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = true;
amroutine->amcanunique = false; /* We can't check that index is unique without accessing base index */
amroutine->amcanmulticol = true;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false; /* TODO: not sure if it will work correctly with merge */
amroutine->amsearchnulls = true;
amroutine->amstorage = false;
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false; /* TODO: parallel scac is not supported yet */
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amparallelvacuumoptions = 0;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = lsm_build;
amroutine->ambuildempty = btbuildempty;
amroutine->aminsert = lsm_insert;
amroutine->ambulkdelete = btbulkdelete;
amroutine->amvacuumcleanup = btvacuumcleanup;
amroutine->amcanreturn = btcanreturn;
amroutine->amcostestimate = btcostestimate;
amroutine->amoptions = lsm_options;
amroutine->amproperty = btproperty;
amroutine->ambuildphasename = btbuildphasename;
amroutine->amvalidate = btvalidate;
amroutine->ambeginscan = lsm_beginscan;
amroutine->amrescan = lsm_rescan;
amroutine->amgettuple = lsm_gettuple;
amroutine->amgetbitmap = lsm_getbitmap;
amroutine->amendscan = lsm_endscan;
amroutine->ammarkpos = NULL; /* When do we need index_markpos? Can we live without it? */
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
 * Access methods for B-Tree wrapper: actually we only want to disable inserts.
 */
/*
 * ambuild callback of the B-Tree wrapper.
 *
 * We do not need to load data into the top index: just initialize the index
 * metadata. A single B-Tree metapage is written to the main fork, WAL-logged
 * and synced; the heap and indexInfo arguments are not used. Returns a
 * zero-filled IndexBuildResult.
 */
static IndexBuildResult *
lsm_build_empty(Relation heap, Relation index, IndexInfo *indexInfo)
{
Page metapage;
/* Construct metapage. */
metapage = (Page) palloc(BLCKSZ);
/* NOTE(review): the second argument is presumably the root block number; btbuildempty passes P_NONE there - confirm BTREE_METAPAGE is intended */
_bt_initmetapage(metapage, BTREE_METAPAGE, 0, _bt_allequalimage(index, false));
RelationOpenSmgr(index);
/*
* Write the page and log it. It might seem that an immediate sync would
* be sufficient to guarantee that the file exists on disk, but recovery
* itself might remove it while replaying, for example, an
* XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record. Therefore, we need
* this even when wal_level=minimal.
*/
PageSetChecksumInplace(metapage, BTREE_METAPAGE);
smgrextend(index->rd_smgr, MAIN_FORKNUM, BTREE_METAPAGE,
(char *) metapage, true);
log_newpage(&index->rd_smgr->smgr_rnode.node, MAIN_FORKNUM,
BTREE_METAPAGE, metapage, true);
/*
* An immediate sync is required even if we xlog'd the page, because the
* write did not go through shared_buffers and therefore a concurrent
* checkpoint may have moved the redo pointer past our xlog record.
*/
smgrimmedsync(index->rd_smgr, MAIN_FORKNUM);
RelationCloseSmgr(index);
return (IndexBuildResult *) palloc0(sizeof(IndexBuildResult));
}
/*
 * aminsert callback of the B-Tree wrapper: deliberately does nothing.
 * All arguments are ignored and false is returned.
 */
static bool
lsm_dummy_insert(Relation rel, Datum *values, bool *isnull,
                 ItemPointer ht_ctid, Relation heapRel,
                 IndexUniqueCheck checkUnique,
                 IndexInfo *indexInfo)
{
    /* No-op: nothing is written to the index through this callback. */
    return false;
}
Datum
lsm_btree_wrapper(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = BTMaxStrategyNumber;
amroutine->amsupport = BTNProcs;
amroutine->amoptsprocnum = BTOPTIONS_PROC;
amroutine->amcanorder = true;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = true;
amroutine->amcanunique = false;
amroutine->amcanmulticol = true;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = true;
amroutine->amsearchnulls = true;
amroutine->amstorage = false;
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amparallelvacuumoptions = 0;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = lsm_build_empty;
amroutine->ambuildempty = btbuildempty;
amroutine->aminsert = lsm_dummy_insert;
amroutine->ambulkdelete = btbulkdelete;
amroutine->amvacuumcleanup = btvacuumcleanup;
amroutine->amcanreturn = btcanreturn;
amroutine->amcostestimate = btcostestimate;
amroutine->amoptions = lsm_options;
amroutine->amproperty = btproperty;
amroutine->ambuildphasename = btbuildphasename;
amroutine->amvalidate = btvalidate;
amroutine->ambeginscan = btbeginscan;
amroutine->amrescan = btrescan;
amroutine->amgettuple = btgettuple;
amroutine->amgetbitmap = btgetbitmap;
amroutine->amendscan = btendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
 * Utility hook handling creation and removal of lsm indexes.
 *
 * Before the statement runs:
 *   for DROP INDEX, every named lsm index is looked up and its two top
 *   indexes are collected so that they can be dropped afterwards.
 * The statement is then executed by the previous hook or the standard
 * implementation.
 * After the statement ran:
 *   - if lsm_build() was invoked (lsmEntry is set), the two top indexes are
 *     created (CREATE INDEX) or looked up by name (other statements), the
 *     transaction is committed and restarted, the top indexes are marked
 *     invalid so the planner ignores them, and the dictionary lock taken by
 *     lsm_build() is released;
 *   - otherwise, if lsm indexes were dropped, their top indexes are deleted
 *     and their dictionary entries removed.
 *
 * The DROP INDEX pre-scan is defensive: a name that does not exist under
 * IF EXISTS, or that refers to a relation without index access method data
 * (e.g. a table), is skipped so that the standard code can report it.
 * Previously such statements failed inside the hook or dereferenced a NULL
 * rd_indam pointer.
 */
static void
lsm_process_utility(PlannedStmt *plannedStmt,
                    const char *queryString,
                    ProcessUtilityContext context,
                    ParamListInfo paramListInfo,
                    QueryEnvironment *queryEnvironment,
                    DestReceiver *destReceiver,
#if PG_VERSION_NUM>=130000
                    QueryCompletion *completionTag
#else
                    char *completionTag
#endif
    )
{
    Node *parseTree = plannedStmt->utilityStmt;
    DropStmt* drop = NULL;
    ObjectAddresses *drop_objects = NULL;
    List* drop_oids = NULL;
    ListCell* cell;

    lsmEntry = NULL; /* Reset entry to check it after utility statement execution */
    if (IsA(parseTree, DropStmt))
    {
        drop = (DropStmt*)parseTree;
        if (drop->removeType == OBJECT_INDEX)
        {
            foreach (cell, drop->objects)
            {
                RangeVar* rv = makeRangeVarFromNameList((List *) lfirst(cell));
                Relation index = relation_openrv_extended(rv, ExclusiveLock, drop->missing_ok);

                if (index == NULL)
                {
                    /* DROP INDEX IF EXISTS on a missing name: nothing to collect */
                    continue;
                }
                /* rd_indam is only set for indexes; skip anything else */
                if (index->rd_indam != NULL && index->rd_indam->ambuild == lsm_build)
                {
                    lsmDictEntry* entry = lsm_get_entry(index);
                    if (drop_objects == NULL)
                    {
                        drop_objects = new_object_addresses();
                    }
                    for (int i = 0; i < 2; i++)
                    {
                        ObjectAddress obj;
                        obj.classId = RelationRelationId;
                        obj.objectId = entry->top[i];
                        obj.objectSubId = 0;
                        add_exact_object_address(&obj, drop_objects);
                    }
                    drop_oids = lappend_oid(drop_oids, RelationGetRelid(index));
                }
                relation_close(index, ExclusiveLock);
            }
        }
    }

    (PreviousProcessUtilityHook ? PreviousProcessUtilityHook : standard_ProcessUtility)
        (plannedStmt,
         queryString,
         context,
         paramListInfo,
         queryEnvironment,
         destReceiver,
         completionTag);

    if (lsmEntry)
    {
        if (IsA(parseTree, IndexStmt)) /* This is lsm creation statement */
        {
            IndexStmt* stmt = (IndexStmt*)parseTree;
            char* originIndexName = stmt->idxname;
            char* originAccessMethod = stmt->accessMethod;

            /* Reuse the statement to define both top indexes with the wrapper AM */
            for (int i = 0; i < 2; i++)
            {
                if (stmt->concurrent)
                {
                    PushActiveSnapshot(GetTransactionSnapshot());
                }
                stmt->accessMethod = "lsm_btree_wrapper";
                stmt->idxname = psprintf("%s_top%d", get_rel_name(lsmEntry->base), i);
                lsmEntry->top[i] = DefineIndex(lsmEntry->heap,
                                               stmt,
                                               InvalidOid,
                                               InvalidOid,
                                               InvalidOid,
                                               false,
                                               false,
                                               false,
                                               false,
                                               true).objectId;
            }
            /* Put the caller's statement back the way it was */
            stmt->accessMethod = originAccessMethod;
            stmt->idxname = originIndexName;
        }
        else
        {
            /* Some other statement rebuilt the index: resolve missing top indexes by name */
            for (int i = 0; i < 2; i++)
            {
                if (lsmEntry->top[i] == InvalidOid)
                {
                    char* topidxname = psprintf("%s_top%d", get_rel_name(lsmEntry->base), i);
                    lsmEntry->top[i] = get_relname_relid(topidxname, get_rel_namespace(lsmEntry->base));
                    if (lsmEntry->top[i] == InvalidOid)
                    {
                        elog(ERROR, "lsm: failed to lookup %s index", topidxname);
                    }
                }
            }
        }
        if (ActiveSnapshotSet())
        {
            PopActiveSnapshot();
        }
        CommitTransactionCommand();
        StartTransactionCommand();
        /* Mark top index as invalid to prevent planner from using it in queries */
        for (int i = 0; i < 2; i++)
        {
            index_set_state_flags(lsmEntry->top[i], INDEX_DROP_CLEAR_VALID);
        }
        LWLockRelease(lsmDictLock); /* Release lock set by lsm_build */
    }
    else if (drop_objects)
    {
        /* Drop the collected top indexes, then forget the dropped lsm indexes */
        performMultipleDeletions(drop_objects, drop->behavior, 0);
        LWLockAcquire(lsmDictLock, LW_EXCLUSIVE);
        foreach (cell, drop_oids)
        {
            hash_search(lsmDict, &lfirst_oid(cell), HASH_REMOVE, NULL);
        }
        LWLockRelease(lsmDictLock);
    }
}
/*
 * Executor finish hook to reclaim released locks on non-active top indexes
 * to avoid "you don't own a lock of type RowExclusiveLock" warning.
 *
 * Every index Oid recorded in lsmReleasedLocks is locked again with
 * RowExclusiveLock and the list is discarded; afterwards the previous hook
 * or the standard executor finish routine runs.
 */
static void
lsm_executor_finish(QueryDesc *queryDesc)
{
    if (lsmReleasedLocks != NULL)
    {
        ListCell* lc;

        foreach (lc, lsmReleasedLocks)
            LockRelationOid(lfirst_oid(lc), RowExclusiveLock);

        list_free(lsmReleasedLocks);
        lsmReleasedLocks = NULL;
    }

    if (PreviousExecutorFinish == NULL)
        standard_ExecutorFinish(queryDesc);
    else
        PreviousExecutorFinish(queryDesc);
}
/*
 * Module load callback.
 *
 * Refuses to load unless the library is being loaded through
 * shared_preload_libraries, then:
 *   - defines the lsm.top_index_size and lsm.max_indexes GUCs,
 *   - registers the lsm reloption kind and its options,
 *   - requests shared memory for the lsm dictionary and one named LWLock,
 *   - installs the shmem-startup, ProcessUtility and ExecutorFinish hooks,
 *     remembering the previous hook values so they can be chained.
 */
void
_PG_init(void)
{
    if (!process_shared_preload_libraries_in_progress)
    {
        elog(ERROR, "lsm: this extension should be loaded via shared_preload_libraries");
    }

    DefineCustomIntVariable("lsm.top_index_size",
                            "Size of top index B-Tree (kb)",
                            NULL,
                            &lsmTopIndexSize,
                            64*1024,
                            BLCKSZ/1024,
                            INT_MAX,
                            PGC_SIGHUP,
                            GUC_UNIT_KB,
                            NULL,
                            NULL,
                            NULL);

    /* PGC_POSTMASTER: the value sizes the shared-memory request made below */
    DefineCustomIntVariable("lsm.max_indexes",
                            "Maximal number of lsm indexes.",
                            NULL,
                            &lsmMaxIndexes,
                            1024,
                            1,
                            INT_MAX,
                            PGC_POSTMASTER,
                            0,
                            NULL,
                            NULL,
                            NULL);

    lsmReloptKind = add_reloption_kind();

    add_bool_reloption(lsmReloptKind, "unique",
                       "Index contains no duplicates",
                       false, AccessExclusiveLock);
    add_int_reloption(lsmReloptKind, "top_index_size",
                      "Size of top index (kb)",
                      0, 0, INT_MAX, AccessExclusiveLock);
    add_int_reloption(lsmReloptKind, "fillfactor",
                      "Packs btree index pages only to this percentage",
                      BTREE_DEFAULT_FILLFACTOR, BTREE_MIN_FILLFACTOR, 100, ShareUpdateExclusiveLock);
    /*
     * The description used to be a copy of the fillfactor text; it now
     * matches the wording of the core B-Tree option of the same name.
     */
    add_real_reloption(lsmReloptKind, "vacuum_cleanup_index_scale_factor",
                       "Number of tuple inserts prior to index cleanup as a fraction of reltuples.",
                       -1, 0.0, 1e10, ShareUpdateExclusiveLock);
    add_bool_reloption(lsmReloptKind, "deduplicate_items",
                       "Enables \"deduplicate items\" feature for this btree index",
                       true, AccessExclusiveLock);

    /* Reserve shared memory for the dictionary hash and one LWLock tranche */
    RequestAddinShmemSpace(hash_estimate_size(lsmMaxIndexes, sizeof(lsmDictEntry)));
    RequestNamedLWLockTranche("lsm", 1);

    /* Install hooks, saving the previous values for chaining */
    PreviousShmemStartupHook = shmem_startup_hook;
    shmem_startup_hook = lsm_shmem_startup;
    PreviousProcessUtilityHook = ProcessUtility_hook;
    ProcessUtility_hook = lsm_process_utility;
    PreviousExecutorFinish = ExecutorFinish_hook;
    ExecutorFinish_hook = lsm_executor_finish;
}
void _PG_fini(void)
{
ProcessUtility_hook = PreviousProcessUtilityHook;
shmem_startup_hook = PreviousShmemStartupHook;
}
/*
 * SQL-callable: return the n_merges counter of the lsm control entry that
 * belongs to the index whose Oid is passed as argument 0.
 * Returns SQL NULL when lsm_get_entry yields no entry for the index.
 */
Datum
lsm_get_merge_count(PG_FUNCTION_ARGS)
{
    Oid             indexOid = PG_GETARG_OID(0);
    Relation        rel;
    lsmDictEntry   *dictEntry;

    /* The relation is only needed long enough to look up the entry */
    rel = index_open(indexOid, AccessShareLock);
    dictEntry = lsm_get_entry(rel);
    index_close(rel, AccessShareLock);

    if (dictEntry != NULL)
        PG_RETURN_INT64(dictEntry->n_merges);

    PG_RETURN_NULL();
}
Datum
lsm_start_merge(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
Relation index = index_open(relid, AccessShareLock);
lsmDictEntry* entry = lsm_get_entry(index);
index_close(index, AccessShareLock);
SpinLockAcquire(&entry->spinlock);
if (!entry->merge_in_progress)
{
entry->merge_in_progress = true;
entry->active_index ^= 1;
entry->n_merges += 1;
if (entry->access_count[1-entry->active_index] == 0)
{
entry->start_merge = true;
if (entry->merger == NULL) /* lazy start of bgworker */
{
lsm_launch_bgworker(entry);
}
SetLatch(&entry->merger->procLatch);
}
}
SpinLockRelease(&entry->spinlock);
PG_RETURN_NULL();
}
Datum
lsm_wait_merge_completion(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
Relation index = index_open(relid, AccessShareLock);
lsmDictEntry* entry = lsm_get_entry(index);
index_close(index, AccessShareLock);
while (entry->merge_in_progress)
{
pg_usleep(1000000); /* one second */
}
PG_RETURN_NULL();
}
Datum
lsm_top_index_size(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
Relation index = index_open(relid, AccessShareLock);
lsmDictEntry* entry = lsm_get_entry(index);
index_close(index, AccessShareLock);
PG_RETURN_INT64((uint64)lsm_get_index_size(lsm_get_index_size(entry->top[entry->active_index]))*BLCKSZ);
}
/*
 * It is too expensive to check the index size at each insert because it requires traversing all index file segments and calling lseek for each.
 * But we do not need a precise size, so it is enough to do it at each n-th insert. The largest B-Tree key size is about 2kb,
 * so with N=64K the worst-case error will be less than 128Mb, and for a 32-bit key just 1Mb.
 */
#define LSM3_CHECK_TOP_INDEX_SIZE_PERIOD (64*1024) /* should be power of two */
/*
 * Control structure for an Lsm3 index, located in shared memory
 */
typedef struct
{
Oid base; /* Oid of base index */
Oid heap; /* Oid of indexed relation */
Oid top[2]; /* Oids of two top indexes */
int access_count[2]; /* Access counter for top indexes */
int active_index; /* Index used for insert */
uint64 n_merges; /* Number of performed merges since database open */
uint64 n_inserts; /* Number of performed inserts since database open */
volatile bool start_merge; /* Start merging of top index with base index */
volatile bool merge_in_progress; /* Overflow of top index initiates merge process */
PGPROC* merger; /* Merger background worker */
Oid db_id; /* database Id (for background worker) */
Oid user_id; /* user Id (for background worker) */
int top_index_size; /* Size of top index */
slock_t spinlock; /* Spinlock to synchronize access */
} Lsm3DictEntry;
/*
 * Opaque part of the index scan descriptor
 */
typedef struct
{
Lsm3DictEntry* entry; /* Lsm3 control structure */
Relation top_index[2]; /* Opened top index relations */
SortSupport sortKeys; /* Context for comparing index tuples */
IndexScanDesc scan[3]; /* Scan descriptors for two top indexes and base index */
bool eof[3]; /* Indicators that end of index was reached (one per scan descriptor) */
bool unique; /* Whether index is "unique" and we can stop scan after locating first occurrence */
int curr_index; /* Index from which last tuple was selected (or -1 if none) */
} Lsm3ScanOpaque;
/* Lsm3 index options */
typedef struct
{
BTOptions nbt_opts; /* Standard B-Tree options */
int top_index_size; /* Size of top index (overrides lsm3.top_index_size GUC) */
bool unique; /* Index may not contain duplicates. We prohibit a unique constraint for an Lsm3 index
* because it cannot be enforced. But the presence of this index option allows optimizing
* index lookup: if the key is found in the active top index, do not search the other two indexes.
*/
} Lsm3Options;
...@@ -85,7 +85,6 @@ lsm_truncate_index(Relation index, Oid heap_oid) ...@@ -85,7 +85,6 @@ lsm_truncate_index(Relation index, Oid heap_oid)
static void static void
lsm_merge_indexes(Oid dst_oid, Relation top_index, Oid heap_oid) lsm_merge_indexes(Oid dst_oid, Relation top_index, Oid heap_oid)
{ {
//Relation top_index = index_open(src_oid, AccessShareLock);
elog(NOTICE,"merge dest_oid %d,heap rel Oid %d",dst_oid,heap_oid); elog(NOTICE,"merge dest_oid %d,heap rel Oid %d",dst_oid,heap_oid);
Relation heap = table_open(heap_oid, AccessShareLock); Relation heap = table_open(heap_oid, AccessShareLock);
Relation base_index = index_open(dst_oid, RowExclusiveLock); Relation base_index = index_open(dst_oid, RowExclusiveLock);
...@@ -104,7 +103,7 @@ lsm_merge_indexes(Oid dst_oid, Relation top_index, Oid heap_oid) ...@@ -104,7 +103,7 @@ lsm_merge_indexes(Oid dst_oid, Relation top_index, Oid heap_oid)
IndexTuple itup = scan->xs_itup; IndexTuple itup = scan->xs_itup;
if (BTreeTupleIsPosting(itup)) if (BTreeTupleIsPosting(itup))
{ {
/* Some dirty coding here related with handling of posting items (index deduplication). /*
* If index tuple is posting item, we need to transfer it to normal index tuple. * If index tuple is posting item, we need to transfer it to normal index tuple.
* Posting list is representing by index tuple with INDEX_ALT_TID_MASK bit set in t_info and * Posting list is representing by index tuple with INDEX_ALT_TID_MASK bit set in t_info and
* BT_IS_POSTING bit in TID offset, following by array of TIDs. * BT_IS_POSTING bit in TID offset, following by array of TIDs.
...@@ -129,7 +128,6 @@ tes */ ...@@ -129,7 +128,6 @@ tes */
} }
index_endscan(scan); index_endscan(scan);
base_index->rd_rel->relam = save_am; base_index->rd_rel->relam = save_am;
// index_close(top_index, AccessShareLock);
index_close(base_index, RowExclusiveLock); index_close(base_index, RowExclusiveLock);
table_close(heap, AccessShareLock); table_close(heap, AccessShareLock);
} }
...@@ -214,14 +212,14 @@ lsm_create_l1_if_not_exits(Relation heap,Relation index,LsmMetaData* lsmMetaCopy ...@@ -214,14 +212,14 @@ lsm_create_l1_if_not_exits(Relation heap,Relation index,LsmMetaData* lsmMetaCopy
char* newName; char* newName;
char* l0name; char* l0name;
Relation l1; Relation l1;
l0name=index->rd_rel->relname.data;
newName=(char*)palloc(NAMEDATALEN+2);
newName[0]='L';
newName[1]='1';
for(int i=0;i<NAMEDATALEN;++i){
newName[i+2]=l0name[i];
}
if(lsmMetaCopy->l1==InvalidOid){ if(lsmMetaCopy->l1==InvalidOid){
l0name=index->rd_rel->relname.data;
newName=(char*)palloc(NAMEDATALEN+2);
newName[0]='L';
newName[1]='1';
for(int i=0;i<NAMEDATALEN;++i){
newName[i+2]=l0name[i];
}
lsmMetaCopy->l1= index_concurrently_create_copy( lsmMetaCopy->l1= index_concurrently_create_copy(
heap,/*heap relation*/ heap,/*heap relation*/
index->rd_id,/*old oid*/ index->rd_id,/*old oid*/
......
shared_preload_libraries = 'lsm'
lsm.top_index_size=1MB
...@@ -10,6 +10,10 @@ typedef struct ...@@ -10,6 +10,10 @@ typedef struct
int top_index_size; /* Size of top index */ int top_index_size; /* Size of top index */
} LsmMetaData; } LsmMetaData;
void
index_rebuild(Relation heapRelation,
Relation indexRelation,
IndexInfo *indexInfo);
void void
lsm_create_l1_if_not_exits( lsm_create_l1_if_not_exits(
......
No preview for this file type
No preview for this file type
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment