Commit d4663350 authored by Tom Lane's avatar Tom Lane

Don't be so trusting that shm_toc_lookup() will always succeed.

Given the possibility of race conditions and so on, it seems entirely
unsafe to just assume that shm_toc_lookup() always finds the key it's
looking for --- but that was exactly what all but one call site were
doing.  To fix, add a "bool noError" argument, similarly to what we
have in many other functions, and throw an error on an unexpected
lookup failure.  Remove now-redundant Asserts that a rather random
subset of call sites had.

I doubt this will throw any light on buildfarm member lorikeet's
recent failures, because if an unnoticed lookup failure were involved,
you'd kind of expect a null-pointer-dereference crash rather than the
observed symptom.  But you never know ... and this is better coding
practice even if it never catches anything.

Discussion: https://postgr.es/m/9697.1496675981@sss.pgh.pa.us
parent af51fea0
...@@ -392,12 +392,12 @@ ReinitializeParallelDSM(ParallelContext *pcxt) ...@@ -392,12 +392,12 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
} }
/* Reset a few bits of fixed parallel state to a clean state. */ /* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
fps->last_xlog_end = 0; fps->last_xlog_end = 0;
/* Recreate error queues. */ /* Recreate error queues. */
error_queue_space = error_queue_space =
shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE); shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
for (i = 0; i < pcxt->nworkers; ++i) for (i = 0; i < pcxt->nworkers; ++i)
{ {
char *start; char *start;
...@@ -536,7 +536,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) ...@@ -536,7 +536,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{ {
FixedParallelState *fps; FixedParallelState *fps;
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
if (fps->last_xlog_end > XactLastRecEnd) if (fps->last_xlog_end > XactLastRecEnd)
XactLastRecEnd = fps->last_xlog_end; XactLastRecEnd = fps->last_xlog_end;
} }
...@@ -973,8 +973,7 @@ ParallelWorkerMain(Datum main_arg) ...@@ -973,8 +973,7 @@ ParallelWorkerMain(Datum main_arg)
errmsg("invalid magic number in dynamic shared memory segment"))); errmsg("invalid magic number in dynamic shared memory segment")));
/* Look up fixed parallel state. */ /* Look up fixed parallel state. */
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
Assert(fps != NULL);
MyFixedParallelState = fps; MyFixedParallelState = fps;
/* /*
...@@ -983,7 +982,7 @@ ParallelWorkerMain(Datum main_arg) ...@@ -983,7 +982,7 @@ ParallelWorkerMain(Datum main_arg)
* errors that happen here will not be reported back to the process that * errors that happen here will not be reported back to the process that
* requested that this worker be launched. * requested that this worker be launched.
*/ */
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
mq = (shm_mq *) (error_queue_space + mq = (shm_mq *) (error_queue_space +
ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
shm_mq_set_sender(mq, MyProc); shm_mq_set_sender(mq, MyProc);
...@@ -1027,8 +1026,7 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1027,8 +1026,7 @@ ParallelWorkerMain(Datum main_arg)
* this before restoring GUCs, because the libraries might define custom * this before restoring GUCs, because the libraries might define custom
* variables. * variables.
*/ */
libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY); libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
Assert(libraryspace != NULL);
RestoreLibraryState(libraryspace); RestoreLibraryState(libraryspace);
/* /*
...@@ -1036,8 +1034,7 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1036,8 +1034,7 @@ ParallelWorkerMain(Datum main_arg)
* loading an additional library, though most likely the entry point is in * loading an additional library, though most likely the entry point is in
* the core backend or in a library we just loaded. * the core backend or in a library we just loaded.
*/ */
entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT); entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
Assert(entrypointstate != NULL);
library_name = entrypointstate; library_name = entrypointstate;
function_name = entrypointstate + strlen(library_name) + 1; function_name = entrypointstate + strlen(library_name) + 1;
...@@ -1054,30 +1051,26 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1054,30 +1051,26 @@ ParallelWorkerMain(Datum main_arg)
SetClientEncoding(GetDatabaseEncoding()); SetClientEncoding(GetDatabaseEncoding());
/* Restore GUC values from launching backend. */ /* Restore GUC values from launching backend. */
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
Assert(gucspace != NULL);
StartTransactionCommand(); StartTransactionCommand();
RestoreGUCState(gucspace); RestoreGUCState(gucspace);
CommitTransactionCommand(); CommitTransactionCommand();
/* Crank up a transaction state appropriate to a parallel worker. */ /* Crank up a transaction state appropriate to a parallel worker. */
tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE); tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
StartParallelWorkerTransaction(tstatespace); StartParallelWorkerTransaction(tstatespace);
/* Restore combo CID state. */ /* Restore combo CID state. */
combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID); combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
Assert(combocidspace != NULL);
RestoreComboCIDState(combocidspace); RestoreComboCIDState(combocidspace);
/* Restore transaction snapshot. */ /* Restore transaction snapshot. */
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
Assert(tsnapspace != NULL);
RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
fps->parallel_master_pgproc); fps->parallel_master_pgproc);
/* Restore active snapshot. */ /* Restore active snapshot. */
asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT); asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
Assert(asnapspace != NULL);
PushActiveSnapshot(RestoreSnapshot(asnapspace)); PushActiveSnapshot(RestoreSnapshot(asnapspace));
/* /*
......
...@@ -341,7 +341,7 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) ...@@ -341,7 +341,7 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
pcxt->nworkers)); pcxt->nworkers));
else else
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE); tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
/* Create the queues, and become the receiver for each. */ /* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i) for (i = 0; i < pcxt->nworkers; ++i)
...@@ -684,7 +684,7 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) ...@@ -684,7 +684,7 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
char *mqspace; char *mqspace;
shm_mq *mq; shm_mq *mq;
mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE); mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
mq = (shm_mq *) mqspace; mq = (shm_mq *) mqspace;
shm_mq_set_sender(mq, MyProc); shm_mq_set_sender(mq, MyProc);
...@@ -705,14 +705,14 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, ...@@ -705,14 +705,14 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
char *queryString; char *queryString;
/* Get the query string from shared memory */ /* Get the query string from shared memory */
queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT); queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
/* Reconstruct leader-supplied PlannedStmt. */ /* Reconstruct leader-supplied PlannedStmt. */
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT); pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
pstmt = (PlannedStmt *) stringToNode(pstmtspace); pstmt = (PlannedStmt *) stringToNode(pstmtspace);
/* Reconstruct ParamListInfo. */ /* Reconstruct ParamListInfo. */
paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS); paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
paramLI = RestoreParamList(&paramspace); paramLI = RestoreParamList(&paramspace);
/* /*
...@@ -843,7 +843,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ...@@ -843,7 +843,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc); receiver = ExecParallelGetReceiver(seg, toc);
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION); instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
if (instrumentation != NULL) if (instrumentation != NULL)
instrument_options = instrumentation->instrument_options; instrument_options = instrumentation->instrument_options;
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
...@@ -858,7 +858,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ...@@ -858,7 +858,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
InstrStartParallelQuery(); InstrStartParallelQuery();
/* Attach to the dynamic shared memory area. */ /* Attach to the dynamic shared memory area. */
area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA); area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
area = dsa_attach_in_place(area_space, seg); area = dsa_attach_in_place(area_space, seg);
/* Start up the executor */ /* Start up the executor */
...@@ -875,7 +875,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ...@@ -875,7 +875,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecutorFinish(queryDesc); ExecutorFinish(queryDesc);
/* Report buffer usage during parallel execution. */ /* Report buffer usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
/* Report instrumentation data if any instrumentation options are set. */ /* Report instrumentation data if any instrumentation options are set. */
......
...@@ -1005,7 +1005,7 @@ ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc) ...@@ -1005,7 +1005,7 @@ ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
ParallelBitmapHeapState *pstate; ParallelBitmapHeapState *pstate;
Snapshot snapshot; Snapshot snapshot;
pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
node->pstate = pstate; node->pstate = pstate;
snapshot = RestoreSnapshot(pstate->phs_snapshot_data); snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
......
...@@ -194,7 +194,7 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) ...@@ -194,7 +194,7 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
int plan_node_id = node->ss.ps.plan->plan_node_id; int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate; void *coordinate;
coordinate = shm_toc_lookup(toc, plan_node_id); coordinate = shm_toc_lookup(toc, plan_node_id, false);
methods->InitializeWorkerCustomScan(node, toc, coordinate); methods->InitializeWorkerCustomScan(node, toc, coordinate);
} }
} }
......
...@@ -344,7 +344,7 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) ...@@ -344,7 +344,7 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
int plan_node_id = node->ss.ps.plan->plan_node_id; int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate; void *coordinate;
coordinate = shm_toc_lookup(toc, plan_node_id); coordinate = shm_toc_lookup(toc, plan_node_id, false);
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
} }
} }
......
...@@ -676,7 +676,7 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc) ...@@ -676,7 +676,7 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc)
{ {
ParallelIndexScanDesc piscan; ParallelIndexScanDesc piscan;
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
node->ioss_ScanDesc = node->ioss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation, index_beginscan_parallel(node->ss.ss_currentRelation,
node->ioss_RelationDesc, node->ioss_RelationDesc,
......
...@@ -1714,7 +1714,7 @@ ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc) ...@@ -1714,7 +1714,7 @@ ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc)
{ {
ParallelIndexScanDesc piscan; ParallelIndexScanDesc piscan;
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
node->iss_ScanDesc = node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation, index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc, node->iss_RelationDesc,
......
...@@ -332,7 +332,7 @@ ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) ...@@ -332,7 +332,7 @@ ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
{ {
ParallelHeapScanDesc pscan; ParallelHeapScanDesc pscan;
pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc = node->ss.ss_currentScanDesc =
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
} }
...@@ -208,6 +208,9 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address) ...@@ -208,6 +208,9 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address)
/* /*
* Look up a TOC entry. * Look up a TOC entry.
* *
* If the key is not found, returns NULL if noError is true, otherwise
* throws elog(ERROR).
*
* Unlike the other functions in this file, this operation acquires no lock; * Unlike the other functions in this file, this operation acquires no lock;
* it uses only barriers. It probably wouldn't hurt concurrency very much even * it uses only barriers. It probably wouldn't hurt concurrency very much even
* if it did get a lock, but since it's reasonably likely that a group of * if it did get a lock, but since it's reasonably likely that a group of
...@@ -215,7 +218,7 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address) ...@@ -215,7 +218,7 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address)
* right around the same time, there seems to be some value in avoiding it. * right around the same time, there seems to be some value in avoiding it.
*/ */
void * void *
shm_toc_lookup(shm_toc *toc, uint64 key) shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
{ {
uint64 nentry; uint64 nentry;
uint64 i; uint64 i;
...@@ -226,10 +229,15 @@ shm_toc_lookup(shm_toc *toc, uint64 key) ...@@ -226,10 +229,15 @@ shm_toc_lookup(shm_toc *toc, uint64 key)
/* Now search for a matching entry. */ /* Now search for a matching entry. */
for (i = 0; i < nentry; ++i) for (i = 0; i < nentry; ++i)
{
if (toc->toc_entry[i].key == key) if (toc->toc_entry[i].key == key)
return ((char *) toc) + toc->toc_entry[i].offset; return ((char *) toc) + toc->toc_entry[i].offset;
}
/* No matching entry was found. */ /* No matching entry was found. */
if (!noError)
elog(ERROR, "could not find key " UINT64_FORMAT " in shm TOC at %p",
key, toc);
return NULL; return NULL;
} }
......
...@@ -32,7 +32,7 @@ extern shm_toc *shm_toc_attach(uint64 magic, void *address); ...@@ -32,7 +32,7 @@ extern shm_toc *shm_toc_attach(uint64 magic, void *address);
extern void *shm_toc_allocate(shm_toc *toc, Size nbytes); extern void *shm_toc_allocate(shm_toc *toc, Size nbytes);
extern Size shm_toc_freespace(shm_toc *toc); extern Size shm_toc_freespace(shm_toc *toc);
extern void shm_toc_insert(shm_toc *toc, uint64 key, void *address); extern void shm_toc_insert(shm_toc *toc, uint64 key, void *address);
extern void *shm_toc_lookup(shm_toc *toc, uint64 key); extern void *shm_toc_lookup(shm_toc *toc, uint64 key, bool noError);
/* /*
* Tools for estimating how large a chunk of shared memory will be needed * Tools for estimating how large a chunk of shared memory will be needed
......
...@@ -95,7 +95,7 @@ test_shm_mq_main(Datum main_arg) ...@@ -95,7 +95,7 @@ test_shm_mq_main(Datum main_arg)
* find it. Our worker number gives our identity: there may be just one * find it. Our worker number gives our identity: there may be just one
* worker involved in this parallel operation, or there may be many. * worker involved in this parallel operation, or there may be many.
*/ */
hdr = shm_toc_lookup(toc, 0); hdr = shm_toc_lookup(toc, 0, false);
SpinLockAcquire(&hdr->mutex); SpinLockAcquire(&hdr->mutex);
myworkernumber = ++hdr->workers_attached; myworkernumber = ++hdr->workers_attached;
SpinLockRelease(&hdr->mutex); SpinLockRelease(&hdr->mutex);
...@@ -158,10 +158,10 @@ attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber, ...@@ -158,10 +158,10 @@ attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
shm_mq *inq; shm_mq *inq;
shm_mq *outq; shm_mq *outq;
inq = shm_toc_lookup(toc, myworkernumber); inq = shm_toc_lookup(toc, myworkernumber, false);
shm_mq_set_receiver(inq, MyProc); shm_mq_set_receiver(inq, MyProc);
*inqhp = shm_mq_attach(inq, seg, NULL); *inqhp = shm_mq_attach(inq, seg, NULL);
outq = shm_toc_lookup(toc, myworkernumber + 1); outq = shm_toc_lookup(toc, myworkernumber + 1, false);
shm_mq_set_sender(outq, MyProc); shm_mq_set_sender(outq, MyProc);
*outqhp = shm_mq_attach(outq, seg, NULL); *outqhp = shm_mq_attach(outq, seg, NULL);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment