Commit 29d58fd3 authored by Robert Haas's avatar Robert Haas

Transfer state pertaining to pending REINDEX operations to workers.

This will allow the pending patch for parallel CREATE INDEX to work
on system catalogs, and to provide the same level of protection
against use of user indexes while they are being rebuilt that we
have for non-parallel CREATE INDEX.

Patch by me, reviewed by Peter Geoghegan.

Discussion: http://postgr.es/m/CA+TgmoYN-YQU9JsGQcqFLovZ-C+Xgp1_xhJQad=cunGG-_p5gg@mail.gmail.com
Discussion: http://postgr.es/m/CAH2-Wzkv4UNkXYhqQRqk-u9rS7h5c-4cCW+EqQ8K_WSeS43aZg@mail.gmail.com
parent 4e54dd2e
...@@ -122,6 +122,9 @@ worker. This includes: ...@@ -122,6 +122,9 @@ worker. This includes:
values are restored, this incidentally sets SessionUserId and OuterUserId values are restored, this incidentally sets SessionUserId and OuterUserId
to the correct values. This final step restores CurrentUserId. to the correct values. This final step restores CurrentUserId.
- State related to pending REINDEX operations, which prevents access to
an index that is currently being rebuilt.
To prevent undetected or unprincipled deadlocks when running in parallel mode, To prevent undetected or unprincipled deadlocks when running in parallel mode,
this could should eventually handle heavyweight locks in some way. This is this could should eventually handle heavyweight locks in some way. This is
not implemented yet. not implemented yet.
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "access/session.h" #include "access/session.h"
#include "access/xact.h" #include "access/xact.h"
#include "access/xlog.h" #include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/async.h" #include "commands/async.h"
#include "executor/execParallel.h" #include "executor/execParallel.h"
...@@ -67,6 +68,7 @@ ...@@ -67,6 +68,7 @@
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
/* Fixed-size parallel state. */ /* Fixed-size parallel state. */
typedef struct FixedParallelState typedef struct FixedParallelState
...@@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size tsnaplen = 0; Size tsnaplen = 0;
Size asnaplen = 0; Size asnaplen = 0;
Size tstatelen = 0; Size tstatelen = 0;
Size reindexlen = 0;
Size segsize = 0; Size segsize = 0;
int i; int i;
FixedParallelState *fps; FixedParallelState *fps;
...@@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
tstatelen = EstimateTransactionStateSpace(); tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
reindexlen = EstimateReindexStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
/* If you add more chunks here, you probably need to add keys. */ /* If you add more chunks here, you probably need to add keys. */
shm_toc_estimate_keys(&pcxt->estimator, 7); shm_toc_estimate_keys(&pcxt->estimator, 8);
/* Estimate space need for error queues. */ /* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
...@@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *tsnapspace; char *tsnapspace;
char *asnapspace; char *asnapspace;
char *tstatespace; char *tstatespace;
char *reindexspace;
char *error_queue_space; char *error_queue_space;
char *session_dsm_handle_space; char *session_dsm_handle_space;
char *entrypointstate; char *entrypointstate;
...@@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeTransactionState(tstatelen, tstatespace); SerializeTransactionState(tstatelen, tstatespace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
/* Serialize reindex state. */
reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
SerializeReindexState(reindexlen, reindexspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
/* Allocate space for worker information. */ /* Allocate space for worker information. */
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
...@@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg) ...@@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg)
char *tsnapspace; char *tsnapspace;
char *asnapspace; char *asnapspace;
char *tstatespace; char *tstatespace;
char *reindexspace;
StringInfoData msgbuf; StringInfoData msgbuf;
char *session_dsm_handle_space; char *session_dsm_handle_space;
...@@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg)
/* Set ParallelMasterBackendId so we know how to address temp relations. */ /* Set ParallelMasterBackendId so we know how to address temp relations. */
ParallelMasterBackendId = fps->parallel_master_backend_id; ParallelMasterBackendId = fps->parallel_master_backend_id;
/* Restore reindex state. */
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace);
/* /*
* We've initialized all of our state now; nothing should change * We've initialized all of our state now; nothing should change
* hereafter. * hereafter.
......
...@@ -86,6 +86,18 @@ typedef struct ...@@ -86,6 +86,18 @@ typedef struct
tups_inserted; tups_inserted;
} v_i_state; } v_i_state;
/*
* Pointer-free representation of variables used when reindexing system
* catalogs; we use this to propagate those values to parallel workers.
*/
typedef struct
{
Oid currentlyReindexedHeap;
Oid currentlyReindexedIndex;
int numPendingReindexedIndexes;
Oid pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
} SerializedReindexState;
/* non-export function prototypes */ /* non-export function prototypes */
static bool relationHasPrimaryKey(Relation rel); static bool relationHasPrimaryKey(Relation rel);
static TupleDesc ConstructTupleDescriptor(Relation heapRelation, static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
...@@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options) ...@@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options)
* When we are busy reindexing a system index, this code provides support * When we are busy reindexing a system index, this code provides support
* for preventing catalog lookups from using that index. We also make use * for preventing catalog lookups from using that index. We also make use
* of this to catch attempted uses of user indexes during reindexing of * of this to catch attempted uses of user indexes during reindexing of
* those indexes. * those indexes. This information is propagated to parallel workers;
* attempting to change it during a parallel operation is not permitted.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
...@@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid) ...@@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid)
static void static void
ResetReindexProcessing(void) ResetReindexProcessing(void)
{ {
if (IsInParallelMode())
elog(ERROR, "cannot modify reindex state during a parallel operation");
currentlyReindexedHeap = InvalidOid; currentlyReindexedHeap = InvalidOid;
currentlyReindexedIndex = InvalidOid; currentlyReindexedIndex = InvalidOid;
} }
...@@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes) ...@@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes)
/* Reindexing is not re-entrant. */ /* Reindexing is not re-entrant. */
if (pendingReindexedIndexes) if (pendingReindexedIndexes)
elog(ERROR, "cannot reindex while reindexing"); elog(ERROR, "cannot reindex while reindexing");
if (IsInParallelMode())
elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = list_copy(indexes); pendingReindexedIndexes = list_copy(indexes);
} }
...@@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes) ...@@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes)
static void static void
RemoveReindexPending(Oid indexOid) RemoveReindexPending(Oid indexOid)
{ {
if (IsInParallelMode())
elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes, pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
indexOid); indexOid);
} }
...@@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid) ...@@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid)
static void static void
ResetReindexPending(void) ResetReindexPending(void)
{ {
if (IsInParallelMode())
elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = NIL; pendingReindexedIndexes = NIL;
} }
/*
* EstimateReindexStateSpace
* Estimate space needed to pass reindex state to parallel workers.
*/
extern Size
EstimateReindexStateSpace(void)
{
return offsetof(SerializedReindexState, pendingReindexedIndexes)
+ mul_size(sizeof(Oid), list_length(pendingReindexedIndexes));
}
/*
* SerializeReindexState
* Serialize reindex state for parallel workers.
*/
extern void
SerializeReindexState(Size maxsize, char *start_address)
{
SerializedReindexState *sistate = (SerializedReindexState *) start_address;
int c = 0;
ListCell *lc;
sistate->currentlyReindexedHeap = currentlyReindexedHeap;
sistate->currentlyReindexedIndex = currentlyReindexedIndex;
sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes);
foreach(lc, pendingReindexedIndexes)
sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc);
}
/*
* RestoreReindexState
* Restore reindex state in a parallel worker.
*/
extern void
RestoreReindexState(void *reindexstate)
{
SerializedReindexState *sistate = (SerializedReindexState *) reindexstate;
int c = 0;
MemoryContext oldcontext;
currentlyReindexedHeap = sistate->currentlyReindexedHeap;
currentlyReindexedIndex = sistate->currentlyReindexedIndex;
Assert(pendingReindexedIndexes == NIL);
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
for (c = 0; c < sistate->numPendingReindexedIndexes; ++c)
pendingReindexedIndexes =
lappend_oid(pendingReindexedIndexes,
sistate->pendingReindexedIndexes[c]);
MemoryContextSwitchTo(oldcontext);
}
...@@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid); ...@@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid); extern bool ReindexIsProcessingIndex(Oid indexOid);
extern Oid IndexGetRelation(Oid indexId, bool missing_ok); extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
extern Size EstimateReindexStateSpace(void);
extern void SerializeReindexState(Size maxsize, char *start_address);
extern void RestoreReindexState(void *reindexstate);
#endif /* INDEX_H */ #endif /* INDEX_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment