Commit 924bcf4f authored by Robert Haas's avatar Robert Haas

Create an infrastructure for parallel computation in PostgreSQL.

This does four basic things.  First, it provides convenience routines
to coordinate the startup and shutdown of parallel workers.  Second,
it synchronizes various pieces of state (e.g. GUCs, combo CID
mappings, transaction snapshot) from the parallel group leader to the
worker processes.  Third, it prohibits various operations that would
result in unsafe changes to that state while parallelism is active.
Finally, it propagates events that would result in an ErrorResponse,
NoticeResponse, or NotifyResponse message being sent to the client
from the parallel workers back to the master, from which they can then
be sent on to the client.

Robert Haas, Amit Kapila, Noah Misch, Rushabh Lathia, Jeevan Chalke.
Suggestions and review from Andres Freund, Heikki Linnakangas, Noah
Misch, Simon Riggs, Euler Taveira, and Jim Nasby.
parent 669c7d20
......@@ -546,6 +546,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
switch (event)
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
/* Commit all remote transactions during pre-commit */
do_sql_command(entry->conn, "COMMIT TRANSACTION");
......@@ -588,11 +589,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot prepare a transaction that modified remote tables")));
break;
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_COMMIT:
case XACT_EVENT_PREPARE:
/* Pre-commit should have closed the open transaction */
elog(ERROR, "missed cleaning up connection during pre-commit");
break;
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
......
......@@ -42,6 +42,7 @@
#include "access/heapam_xlog.h"
#include "access/hio.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/relscan.h"
#include "access/sysattr.h"
#include "access/transam.h"
......@@ -1051,7 +1052,13 @@ relation_open(Oid relationId, LOCKMODE lockmode)
/* Make note that we've accessed a temporary relation */
if (RelationUsesLocalBuffers(r))
{
if (IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot access temporary tables during a parallel operation")));
MyXactAccessedTempRel = true;
}
pgstat_initstats(r);
......@@ -1097,7 +1104,13 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
/* Make note that we've accessed a temporary relation */
if (RelationUsesLocalBuffers(r))
{
if (IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot access temporary tables during a parallel operation")));
MyXactAccessedTempRel = true;
}
pgstat_initstats(r);
......@@ -2237,6 +2250,17 @@ static HeapTuple
heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
CommandId cid, int options)
{
/*
* For now, parallel operations are required to be strictly read-only.
* Unlike heap_update() and heap_delete(), an insert should never create
* a combo CID, so it might be possible to relax this restriction, but
* not without more thought and testing.
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot insert tuples during a parallel operation")));
if (relation->rd_rel->relhasoids)
{
#ifdef NOT_USED
......@@ -2648,6 +2672,16 @@ heap_delete(Relation relation, ItemPointer tid,
Assert(ItemPointerIsValid(tid));
/*
* Forbid this during a parallel operation, lets it allocate a combocid.
* Other workers might need that combocid for visibility checks, and we
* have no provision for broadcasting it to them.
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot delete tuples during a parallel operation")));
block = ItemPointerGetBlockNumber(tid);
buffer = ReadBuffer(relation, block);
page = BufferGetPage(buffer);
......@@ -3099,6 +3133,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
Assert(ItemPointerIsValid(otid));
/*
* Forbid this during a parallel operation, lets it allocate a combocid.
* Other workers might need that combocid for visibility checks, and we
* have no provision for broadcasting it to them.
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot update tuples during a parallel operation")));
/*
* Fetch the list of attributes to be checked for HOT update. This is
* wasted effort if we fail to update or have to put the new tuple on a
......@@ -5400,6 +5444,17 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
uint32 oldlen;
uint32 newlen;
/*
* For now, parallel operations are required to be strictly read-only.
* Unlike a regular update, this should never create a combo CID, so it
* might be possible to relax this restriction, but not without more
* thought and testing. It's not clear that it would be useful, anyway.
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot update tuples during a parallel operation")));
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self)));
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
......
......@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
xact.o xlog.o xlogarchive.o xlogfuncs.o \
xloginsert.o xlogreader.o xlogutils.o
......
This diff is collapsed.
This diff is collapsed.
......@@ -49,6 +49,13 @@ GetNewTransactionId(bool isSubXact)
{
TransactionId xid;
/*
* Workers synchronize transaction state at the beginning of each parallel
* operation, so we can't account for new XIDs after that point.
*/
if (IsInParallelMode())
elog(ERROR, "cannot assign TransactionIds during a parallel operation");
/*
* During bootstrap initialization, we return the special bootstrap
* transaction id.
......
This diff is collapsed.
......@@ -292,6 +292,14 @@ static TimeLineID curFileTLI;
* end+1 of the last record, and is reset when we end a top-level transaction,
* or start a new one; so it can be used to tell if the current transaction has
* created any XLOG records.
*
* While in parallel mode, this may not be fully up to date. When committing,
* a transaction can assume this covers all xlog records written either by the
* user backend or by any parallel worker which was present at any point during
* the transaction. But when aborting, or when still in parallel mode, other
* parallel backends may have written WAL records at later LSNs than the value
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
......
......@@ -20,6 +20,7 @@
#include "postgres.h"
#include "access/htup_details.h"
#include "access/parallel.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/dependency.h"
......@@ -3646,6 +3647,12 @@ InitTempTableNamespace(void)
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("cannot create temporary tables during recovery")));
/* Parallel workers can't create temporary tables, either. */
if (IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("cannot create temporary tables in parallel mode")));
snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId);
namespaceId = get_namespace_oid(namespaceName, true);
......@@ -3709,7 +3716,7 @@ InitTempTableNamespace(void)
* End-of-transaction cleanup for namespaces.
*/
void
AtEOXact_Namespace(bool isCommit)
AtEOXact_Namespace(bool isCommit, bool parallel)
{
/*
* If we abort the transaction in which a temp namespace was selected,
......@@ -3719,7 +3726,7 @@ AtEOXact_Namespace(bool isCommit)
* at backend shutdown. (We only want to register the callback once per
* session, so this is a good place to do it.)
*/
if (myTempNamespaceSubID != InvalidSubTransactionId)
if (myTempNamespaceSubID != InvalidSubTransactionId && !parallel)
{
if (isCommit)
before_shmem_exit(RemoveTempRelationsCallback, 0);
......
......@@ -924,9 +924,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
{
Assert(rel);
/* check read-only transaction */
/* check read-only transaction and parallel mode */
if (XactReadOnly && !rel->rd_islocaltemp)
PreventCommandIfReadOnly("COPY FROM");
PreventCommandIfParallelMode("COPY FROM");
cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
stmt->attlist, stmt->options);
......
......@@ -565,6 +565,13 @@ nextval_internal(Oid relid)
if (!seqrel->rd_islocaltemp)
PreventCommandIfReadOnly("nextval()");
/*
* Forbid this during parallel operation because, to make it work,
* the cooperating backends would need to share the backend-local cached
* sequence information. Currently, we don't support that.
*/
PreventCommandIfParallelMode("nextval()");
if (elm->last != elm->cached) /* some numbers were cached */
{
Assert(elm->last_valid);
......@@ -862,6 +869,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
if (!seqrel->rd_islocaltemp)
PreventCommandIfReadOnly("setval()");
/*
* Forbid this during parallel operation because, to make it work,
* the cooperating backends would need to share the backend-local cached
* sequence information. Currently, we don't support that.
*/
PreventCommandIfParallelMode("setval()");
/* lock page' buffer and read tuple */
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
......
......@@ -147,8 +147,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
/*
* If the transaction is read-only, we need to check if any writes are
* planned to non-temporary tables. EXPLAIN is considered read-only.
*
* Don't allow writes in parallel mode. Supporting UPDATE and DELETE would
* require (a) storing the combocid hash in shared memory, rather than
* synchronizing it just once at the start of parallelism, and (b) an
* alternative to heap_update()'s reliance on xmax for mutual exclusion.
* INSERT may have no such troubles, but we forbid it to simplify the
* checks.
*
* We have lower-level defenses in CommandCounterIncrement and elsewhere
* against performing unsafe operations in parallel mode, but this gives
* a more user-friendly error message.
*/
if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
if ((XactReadOnly || IsInParallelMode()) &&
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
ExecCheckXactReadOnly(queryDesc->plannedstmt);
/*
......@@ -691,18 +703,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte)
}
/*
* Check that the query does not imply any writes to non-temp tables.
* Check that the query does not imply any writes to non-temp tables;
* unless we're in parallel mode, in which case don't even allow writes
* to temp tables.
*
* Note: in a Hot Standby slave this would need to reject writes to temp
* tables as well; but an HS slave can't have created any temp tables
* in the first place, so no need to check that.
* tables just as we do in parallel mode; but an HS slave can't have created
* any temp tables in the first place, so no need to check that.
*/
static void
ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
{
ListCell *l;
/* Fail if write permissions are requested on any non-temp table */
/*
* Fail if write permissions are requested in parallel mode for
* table (temp or non-temp), otherwise fail for any non-temp table.
*/
foreach(l, plannedstmt->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
......@@ -718,6 +735,9 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
PreventCommandIfReadOnly(CreateCommandTag((Node *) plannedstmt));
}
if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE)
PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt));
}
......
......@@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list,
errmsg("%s is not allowed in a non-volatile function",
CreateCommandTag(stmt))));
if (IsInParallelMode() && !CommandIsReadOnly(stmt))
PreventCommandIfParallelMode(CreateCommandTag(stmt));
/* OK, build the execution_state for this query */
newes = (execution_state *) palloc(sizeof(execution_state));
if (preves)
......
......@@ -23,6 +23,7 @@
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/spi_priv.h"
#include "miscadmin.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
......@@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
}
/*
* If told to be read-only, we'd better check for read-only queries. This
* can't be done earlier because we need to look at the finished, planned
* queries. (In particular, we don't want to do it between GetCachedPlan
* and PortalDefineQuery, because throwing an error between those steps
* would result in leaking our plancache refcount.)
* If told to be read-only, or in parallel mode, verify that this query
* is in fact read-only. This can't be done earlier because we need to
* look at the finished, planned queries. (In particular, we don't want
* to do it between GetCachedPlan and PortalDefineQuery, because throwing
* an error between those steps would result in leaking our plancache
* refcount.)
*/
if (read_only)
if (read_only || IsInParallelMode())
{
ListCell *lc;
......@@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
Node *pstmt = (Node *) lfirst(lc);
if (!CommandIsReadOnly(pstmt))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
/* translator: %s is a SQL statement name */
errmsg("%s is not allowed in a non-volatile function",
CreateCommandTag(pstmt))));
{
if (read_only)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
/* translator: %s is a SQL statement name */
errmsg("%s is not allowed in a non-volatile function",
CreateCommandTag(pstmt))));
else
PreventCommandIfParallelMode(CreateCommandTag(pstmt));
}
}
}
......@@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
errmsg("%s is not allowed in a non-volatile function",
CreateCommandTag(stmt))));
if (IsInParallelMode() && !CommandIsReadOnly(stmt))
PreventCommandIfParallelMode(CreateCommandTag(stmt));
/*
* If not read-only mode, advance the command counter before each
* command and update the snapshot.
......
......@@ -16,12 +16,15 @@
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
static shm_mq *pq_mq;
static shm_mq_handle *pq_mq_handle;
static bool pq_mq_busy = false;
static pid_t pq_mq_parallel_master_pid = 0;
static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
static void mq_comm_reset(void);
static int mq_flush(void);
......@@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
FrontendProtocol = PG_PROTOCOL_LATEST;
}
/*
* Arrange to SendProcSignal() to the parallel master each time we transmit
* message data via the shm_mq.
*/
void
pq_set_parallel_master(pid_t pid, BackendId backend_id)
{
Assert(PqCommMethods == &PqCommMqMethods);
pq_mq_parallel_master_pid = pid;
pq_mq_parallel_master_backend_id = backend_id;
}
static void
mq_comm_reset(void)
{
......@@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len)
iov[1].len = len;
Assert(pq_mq_handle != NULL);
result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
for (;;)
{
result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
if (pq_mq_parallel_master_pid != 0)
SendProcSignal(pq_mq_parallel_master_pid,
PROCSIG_PARALLEL_MESSAGE,
pq_mq_parallel_master_backend_id);
if (result != SHM_MQ_WOULD_BLOCK)
break;
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
CHECK_FOR_INTERRUPTS();
ResetLatch(&MyProc->procLatch);
}
pq_mq_busy = false;
......
......@@ -995,6 +995,56 @@ WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
return status;
}
/*
* Wait for a background worker to stop.
*
* If the worker hasn't yet started, or is running, we wait for it to stop
* and then return BGWH_STOPPED. However, if the postmaster has died, we give
* up and return BGWH_POSTMASTER_DIED, because it's the postmaster that
* notifies us when a worker's state changes.
*/
BgwHandleStatus
WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
{
BgwHandleStatus status;
int rc;
bool save_set_latch_on_sigusr1;
save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
set_latch_on_sigusr1 = true;
PG_TRY();
{
for (;;)
{
pid_t pid;
CHECK_FOR_INTERRUPTS();
status = GetBackgroundWorkerPid(handle, &pid);
if (status == BGWH_STOPPED)
return status;
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
if (rc & WL_POSTMASTER_DEATH)
return BGWH_POSTMASTER_DIED;
ResetLatch(&MyProc->procLatch);
}
}
PG_CATCH();
{
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
PG_RE_THROW();
}
PG_END_TRY();
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
return status;
}
/*
* Instruct the postmaster to terminate a background worker.
*
......
......@@ -1682,6 +1682,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
return result;
}
/*
* ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
*
* This is like ProcArrayInstallImportedXmin, but we have a pointer to the
* PGPROC of the transaction from which we imported the snapshot, rather than
* an XID.
*
* Returns TRUE if successful, FALSE if source xact is no longer running.
*/
bool
ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
{
bool result = false;
TransactionId xid;
volatile PGXACT *pgxact;
Assert(TransactionIdIsNormal(xmin));
Assert(proc != NULL);
/* Get lock so source xact can't end while we're doing this */
LWLockAcquire(ProcArrayLock, LW_SHARED);
pgxact = &allPgXact[proc->pgprocno];
/*
* Be certain that the referenced PGPROC has an advertised xmin which
* is no later than the one we're installing, so that the system-wide
* xmin can't go backwards. Also, make sure it's running in the same
* database, so that the per-database xmin cannot go backwards.
*/
xid = pgxact->xmin; /* fetch just once */
if (proc->databaseId == MyDatabaseId &&
TransactionIdIsNormal(xid) &&
TransactionIdPrecedesOrEquals(xid, xmin))
{
MyPgXact->xmin = TransactionXmin = xmin;
result = true;
}
LWLockRelease(ProcArrayLock);
return result;
}
/*
* GetRunningTransactionData -- returns information about running transactions.
*
......
......@@ -17,6 +17,7 @@
#include <signal.h>
#include <unistd.h>
#include "access/parallel.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "storage/latch.h"
......@@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
HandleNotifyInterrupt();
if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
HandleParallelMessageInterrupt();
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
......
......@@ -1653,6 +1653,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
Assert(!RecoveryInProgress());
/*
* Since all parts of a serializable transaction must use the same
* snapshot, it is too late to establish one after a parallel operation
* has begun.
*/
if (IsInParallelMode())
elog(ERROR, "cannot establish serializable snapshot during a parallel operation");
proc = MyProc;
Assert(proc != NULL);
GET_VXID_FROM_PGPROC(vxid, *proc);
......
......@@ -36,6 +36,7 @@
#include "rusagestub.h"
#endif
#include "access/parallel.h"
#include "access/printtup.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
......@@ -2988,7 +2989,8 @@ ProcessInterrupts(void)
}
}
/* If we get here, do nothing (probably, QueryCancelPending was reset) */
if (ParallelMessagePending)
HandleParallelMessages();
}
......
......@@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree)
static void
check_xact_readonly(Node *parsetree)
{
if (!XactReadOnly)
/* Only perform the check if we have a reason to do so. */
if (!XactReadOnly && !IsInParallelMode())
return;
/*
* Note: Commands that need to do more complicated checking are handled
* elsewhere, in particular COPY and plannable statements do their own
* checking. However they should all call PreventCommandIfReadOnly to
* actually throw the error.
* checking. However they should all call PreventCommandIfReadOnly
* or PreventCommandIfParallelMode to actually throw the error.
*/
switch (nodeTag(parsetree))
......@@ -208,6 +209,7 @@ check_xact_readonly(Node *parsetree)
case T_ImportForeignSchemaStmt:
case T_SecLabelStmt:
PreventCommandIfReadOnly(CreateCommandTag(parsetree));
PreventCommandIfParallelMode(CreateCommandTag(parsetree));
break;
default:
/* do nothing */
......@@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname)
cmdname)));
}
/*
* PreventCommandIfParallelMode: throw error if current (sub)transaction is
* in parallel mode.
*
* This is useful mainly to ensure consistency of the error message wording;
* most callers have checked IsInParallelMode() for themselves.
*/
void
PreventCommandIfParallelMode(const char *cmdname)
{
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
/* translator: %s is name of a SQL command, eg CREATE */
errmsg("cannot execute %s during a parallel operation",
cmdname)));
}
/*
* PreventCommandDuringRecovery: throw error if RecoveryInProgress
*
......@@ -618,6 +638,7 @@ standard_ProcessUtility(Node *parsetree,
case T_ClusterStmt:
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery("CLUSTER");
/* forbidden in parallel mode due to CommandIsReadOnly */
cluster((ClusterStmt *) parsetree, isTopLevel);
break;
......@@ -628,6 +649,7 @@ standard_ProcessUtility(Node *parsetree,
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
"VACUUM" : "ANALYZE");
/* forbidden in parallel mode due to CommandIsReadOnly */
ExecVacuum(stmt, isTopLevel);
}
break;
......@@ -704,6 +726,7 @@ standard_ProcessUtility(Node *parsetree,
* outside a transaction block is presumed to be user error.
*/
RequireTransactionChain(isTopLevel, "LOCK TABLE");
/* forbidden in parallel mode due to CommandIsReadOnly */
LockTableCommand((LockStmt *) parsetree);
break;
......@@ -735,6 +758,7 @@ standard_ProcessUtility(Node *parsetree,
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery("REINDEX");
/* forbidden in parallel mode due to CommandIsReadOnly */
switch (stmt->kind)
{
case REINDEX_OBJECT_INDEX:
......
......@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
......@@ -411,6 +412,15 @@ pg_lock_status(PG_FUNCTION_ARGS)
#define SET_LOCKTAG_INT32(tag, key1, key2) \
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2)
static void
PreventAdvisoryLocksInParallelMode(void)
{
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot use advisory locks during a parallel operation")));
}
/*
* pg_advisory_lock(int8) - acquire exclusive lock on an int8 key
*/
......@@ -420,6 +430,7 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ExclusiveLock, true, false);
......@@ -437,6 +448,7 @@ pg_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ExclusiveLock, false, false);
......@@ -453,6 +465,7 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ShareLock, true, false);
......@@ -470,6 +483,7 @@ pg_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ShareLock, false, false);
......@@ -489,6 +503,7 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ExclusiveLock, true, true);
......@@ -509,6 +524,7 @@ pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ExclusiveLock, false, true);
......@@ -528,6 +544,7 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ShareLock, true, true);
......@@ -548,6 +565,7 @@ pg_try_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ShareLock, false, true);
......@@ -567,6 +585,7 @@ pg_advisory_unlock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockRelease(&tag, ExclusiveLock, true);
......@@ -586,6 +605,7 @@ pg_advisory_unlock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockRelease(&tag, ShareLock, true);
......@@ -603,6 +623,7 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ExclusiveLock, true, false);
......@@ -621,6 +642,7 @@ pg_advisory_xact_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ExclusiveLock, false, false);
......@@ -638,6 +660,7 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ShareLock, true, false);
......@@ -656,6 +679,7 @@ pg_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ShareLock, false, false);
......@@ -676,6 +700,7 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ExclusiveLock, true, true);
......@@ -697,6 +722,7 @@ pg_try_advisory_xact_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ExclusiveLock, false, true);
......@@ -717,6 +743,7 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ShareLock, true, true);
......@@ -738,6 +765,7 @@ pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ShareLock, false, true);
......@@ -758,6 +786,7 @@ pg_advisory_unlock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockRelease(&tag, ExclusiveLock, true);
......@@ -778,6 +807,7 @@ pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockRelease(&tag, ShareLock, true);
......
......@@ -23,6 +23,7 @@
#endif
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/shmem.h"
#include "utils/dynamic_loader.h"
#include "utils/hsearch.h"
......@@ -692,3 +693,56 @@ find_rendezvous_variable(const char *varName)
return &hentry->varValue;
}
/*
* Estimate the amount of space needed to serialize the list of libraries
* we have loaded.
*/
Size
EstimateLibraryStateSpace(void)
{
DynamicFileList *file_scanner;
Size size = 1;
for (file_scanner = file_list;
file_scanner != NULL;
file_scanner = file_scanner->next)
size = add_size(size, strlen(file_scanner->filename) + 1);
return size;
}
/*
* Serialize the list of libraries we have loaded to a chunk of memory.
*/
void
SerializeLibraryState(Size maxsize, char *start_address)
{
DynamicFileList *file_scanner;
for (file_scanner = file_list;
file_scanner != NULL;
file_scanner = file_scanner->next)
{
Size len;
len = strlcpy(start_address, file_scanner->filename, maxsize) + 1;
Assert(len < maxsize);
maxsize -= len;
start_address += len;
}
start_address[0] = '\0';
}
/*
* Load every library the serializing backend had loaded.
*/
void
RestoreLibraryState(char *start_address)
{
while (*start_address != '\0')
{
internal_load_library(start_address);
start_address += strlen(start_address) + 1;
}
}
......@@ -5665,6 +5665,20 @@ set_config_option(const char *name, const char *value,
elevel = ERROR;
}
/*
* GUC_ACTION_SAVE changes are acceptable during a parallel operation,
* because the current worker will also pop the change. We're probably
* dealing with a function having a proconfig entry. Only the function's
* body should observe the change, and peer workers do not share in the
* execution of a function call started by this worker.
*
* Other changes might need to affect other workers, so forbid them.
*/
if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE)
ereport(elevel,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot set parameters during a parallel operation")));
record = find_option(name, true, elevel);
if (record == NULL)
{
......@@ -6969,6 +6983,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
{
GucAction action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET;
/*
* Workers synchronize these parameters at the start of the parallel
* operation; then, we block SET during the operation.
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot set parameters during a parallel operation")));
switch (stmt->kind)
{
case VAR_SET_VALUE:
......
......@@ -44,6 +44,7 @@
#include "miscadmin.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "storage/shmem.h"
#include "utils/combocid.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
......@@ -286,3 +287,76 @@ GetRealCmax(CommandId combocid)
Assert(combocid < usedComboCids);
return comboCids[combocid].cmax;
}
/*
* Estimate the amount of space required to serialize the current ComboCID
* state.
*/
Size
EstimateComboCIDStateSpace(void)
{
Size size;
/* Add space required for saving usedComboCids */
size = sizeof(int);
/* Add space required for saving the combocids key */
size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids));
return size;
}
/*
* Serialize the ComboCID state into the memory, beginning at start_address.
* maxsize should be at least as large as the value returned by
* EstimateComboCIDStateSpace.
*/
void
SerializeComboCIDState(Size maxsize, char *start_address)
{
char *endptr;
/* First, we store the number of currently-existing ComboCIDs. */
* (int *) start_address = usedComboCids;
/* If maxsize is too small, throw an error. */
endptr = start_address + sizeof(int) +
(sizeof(ComboCidKeyData) * usedComboCids);
if (endptr < start_address || endptr > start_address + maxsize)
elog(ERROR, "not enough space to serialize ComboCID state");
/* Now, copy the actual cmin/cmax pairs. */
memcpy(start_address + sizeof(int), comboCids,
(sizeof(ComboCidKeyData) * usedComboCids));
}
/*
* Read the ComboCID state at the specified address and initialize this
* backend with the same ComboCIDs. This is only valid in a backend that
* currently has no ComboCIDs (and only makes sense if the transaction state
* is serialized and restored as well).
*/
void
RestoreComboCIDState(char *comboCIDstate)
{
int num_elements;
ComboCidKeyData *keydata;
int i;
CommandId cid;
Assert(!comboCids && !comboHash);
/* First, we retrieve the number of ComboCIDs that were serialized. */
num_elements = * (int *) comboCIDstate;
keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
/* Use GetComboCommandId to restore each ComboCID. */
for (i = 0; i < num_elements; i++)
{
cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
/* Verify that we got the expected answer. */
if (cid != i)
elog(ERROR, "unexpected command ID while restoring combo CIDs");
}
}
......@@ -157,6 +157,22 @@ static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
/*
* Snapshot fields to be serialized.
*
* Only these fields need to be sent to the cooperating backend; the
* remaining ones can (and must) set by the receiver upon restore.
*/
typedef struct SerializedSnapshotData
{
TransactionId xmin;
TransactionId xmax;
uint32 xcnt;
int32 subxcnt;
bool suboverflowed;
bool takenDuringRecovery;
CommandId curcid;
} SerializedSnapshotData;
/*
* GetTransactionSnapshot
......@@ -188,6 +204,10 @@ GetTransactionSnapshot(void)
Assert(pairingheap_is_empty(&RegisteredSnapshots));
Assert(FirstXactSnapshot == NULL);
if (IsInParallelMode())
elog(ERROR,
"cannot take query snapshot during a parallel operation");
/*
* In transaction-snapshot mode, the first snapshot must live until
* end of xact regardless of what the caller does with it, so we must
......@@ -238,6 +258,14 @@ GetTransactionSnapshot(void)
Snapshot
GetLatestSnapshot(void)
{
/*
* We might be able to relax this, but nothing that could otherwise work
* needs it.
*/
if (IsInParallelMode())
elog(ERROR,
"cannot update SecondarySnapshot during a parallel operation");
/*
* So far there are no cases requiring support for GetLatestSnapshot()
* during logical decoding, but it wouldn't be hard to add if required.
......@@ -347,7 +375,8 @@ SnapshotSetCommandId(CommandId curcid)
* in GetTransactionSnapshot.
*/
static void
SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
PGPROC *sourceproc)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
......@@ -394,7 +423,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
* doesn't seem worth contorting the logic here to avoid two calls,
* especially since it's not clear that predicate.c *must* do this.
*/
if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
if (sourceproc != NULL)
{
if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
errdetail("The source transaction is not running anymore.")));
}
else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
......@@ -550,11 +587,24 @@ PushCopiedSnapshot(Snapshot snapshot)
void
UpdateActiveSnapshotCommandId(void)
{
CommandId save_curcid, curcid;
Assert(ActiveSnapshot != NULL);
Assert(ActiveSnapshot->as_snap->active_count == 1);
Assert(ActiveSnapshot->as_snap->regd_count == 0);
ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
/*
* Don't allow modification of the active snapshot during parallel
* operation. We share the snapshot to worker backends at beginning of
* parallel operation, so any change to snapshot can lead to
* inconsistencies. We have other defenses against
* CommandCounterIncrement, but there are a few places that call this
* directly, so we put an additional guard here.
*/
save_curcid = ActiveSnapshot->as_snap->curcid;
curcid = GetCurrentCommandId(false);
if (IsInParallelMode() && save_curcid != curcid)
elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
ActiveSnapshot->as_snap->curcid = curcid;
}
/*
......@@ -1289,7 +1339,7 @@ ImportSnapshot(const char *idstr)
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
SetTransactionSnapshot(&snapshot, src_xid);
SetTransactionSnapshot(&snapshot, src_xid, NULL);
}
/*
......@@ -1393,3 +1443,155 @@ HistoricSnapshotGetTupleCids(void)
Assert(HistoricSnapshotActive());
return tuplecid_data;
}
/*
* EstimateSnapshotSpace
* Returns the size need to store the given snapshot.
*
* We are exporting only required fields from the Snapshot, stored in
* SerializedSnapshotData.
*/
Size
EstimateSnapshotSpace(Snapshot snap)
{
Size size;
Assert(snap != InvalidSnapshot);
Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
/* We allocate any XID arrays needed in the same palloc block. */
size = add_size(sizeof(SerializedSnapshotData),
mul_size(snap->xcnt, sizeof(TransactionId)));
if (snap->subxcnt > 0 &&
(!snap->suboverflowed || snap->takenDuringRecovery))
size = add_size(size,
mul_size(snap->subxcnt, sizeof(TransactionId)));
return size;
}
/*
* SerializeSnapshot
* Dumps the serialized snapshot (extracted from given snapshot) onto the
* memory location at start_address.
*/
void
SerializeSnapshot(Snapshot snapshot, char *start_address)
{
SerializedSnapshotData *serialized_snapshot;
Assert(snapshot->xcnt >= 0);
Assert(snapshot->subxcnt >= 0);
serialized_snapshot = (SerializedSnapshotData *) start_address;
/* Copy all required fields */
serialized_snapshot->xmin = snapshot->xmin;
serialized_snapshot->xmax = snapshot->xmax;
serialized_snapshot->xcnt = snapshot->xcnt;
serialized_snapshot->subxcnt = snapshot->subxcnt;
serialized_snapshot->suboverflowed = snapshot->suboverflowed;
serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery;
serialized_snapshot->curcid = snapshot->curcid;
/*
* Ignore the SubXID array if it has overflowed, unless the snapshot
* was taken during recovey - in that case, top-level XIDs are in subxip
* as well, and we mustn't lose them.
*/
if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery)
serialized_snapshot->subxcnt = 0;
/* Copy XID array */
if (snapshot->xcnt > 0)
memcpy((TransactionId *) (serialized_snapshot + 1),
snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
/*
* Copy SubXID array. Don't bother to copy it if it had overflowed,
* though, because it's not used anywhere in that case. Except if it's a
* snapshot taken during recovery; all the top-level XIDs are in subxip as
* well in that case, so we mustn't lose them.
*/
if (snapshot->subxcnt > 0)
{
Size subxipoff = sizeof(SerializedSnapshotData) +
snapshot->xcnt * sizeof(TransactionId);
memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff),
snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
}
}
/*
* RestoreSnapshot
* Restore a serialized snapshot from the specified address.
*
* The copy is palloc'd in TopTransactionContext and has initial refcounts set
* to 0. The returned snapshot has the copied flag set.
*/
Snapshot
RestoreSnapshot(char *start_address)
{
SerializedSnapshotData *serialized_snapshot;
Size size;
Snapshot snapshot;
TransactionId *serialized_xids;
serialized_snapshot = (SerializedSnapshotData *) start_address;
serialized_xids = (TransactionId *)
(start_address + sizeof(SerializedSnapshotData));
/* We allocate any XID arrays needed in the same palloc block. */
size = sizeof(SnapshotData)
+ serialized_snapshot->xcnt * sizeof(TransactionId)
+ serialized_snapshot->subxcnt * sizeof(TransactionId);
/* Copy all required fields */
snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
snapshot->satisfies = HeapTupleSatisfiesMVCC;
snapshot->xmin = serialized_snapshot->xmin;
snapshot->xmax = serialized_snapshot->xmax;
snapshot->xip = NULL;
snapshot->xcnt = serialized_snapshot->xcnt;
snapshot->subxip = NULL;
snapshot->subxcnt = serialized_snapshot->subxcnt;
snapshot->suboverflowed = serialized_snapshot->suboverflowed;
snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery;
snapshot->curcid = serialized_snapshot->curcid;
/* Copy XIDs, if present. */
if (serialized_snapshot->xcnt > 0)
{
snapshot->xip = (TransactionId *) (snapshot + 1);
memcpy(snapshot->xip, serialized_xids,
serialized_snapshot->xcnt * sizeof(TransactionId));
}
/* Copy SubXIDs, if present. */
if (serialized_snapshot->subxcnt > 0)
{
snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt;
memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt,
serialized_snapshot->subxcnt * sizeof(TransactionId));
}
/* Set the copied flag so that the caller will set refcounts correctly. */
snapshot->regd_count = 0;
snapshot->active_count = 0;
snapshot->copied = true;
return snapshot;
}
/*
* Install a restored snapshot as the transaction snapshot.
*
* The second argument is of type void * so that snapmgr.h need not include
* the declaration for PGPROC.
*/
void
RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
{
SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
}
/*-------------------------------------------------------------------------
*
* parallel.h
* Infrastructure for launching parallel workers
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/access/parallel.h
*
*-------------------------------------------------------------------------
*/
#ifndef PARALLEL_H
#define PARALLEL_H
#include "access/xlogdefs.h"
#include "lib/ilist.h"
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "utils/elog.h"
typedef void (*parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc);
typedef struct ParallelWorkerInfo
{
BackgroundWorkerHandle *bgwhandle;
shm_mq_handle *error_mqh;
int32 pid;
} ParallelWorkerInfo;
typedef struct ParallelContext
{
dlist_node node;
SubTransactionId subid;
int nworkers;
parallel_worker_main_type entrypoint;
char *library_name;
char *function_name;
ErrorContextCallback *error_context_stack;
shm_toc_estimator estimator;
dsm_segment *seg;
void *private;
shm_toc *toc;
ParallelWorkerInfo *worker;
} ParallelContext;
extern bool ParallelMessagePending;
extern int ParallelWorkerNumber;
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
extern void InitializeParallelDSM(ParallelContext *);
extern void LaunchParallelWorkers(ParallelContext *);
extern void WaitForParallelWorkersToFinish(ParallelContext *);
extern void DestroyParallelContext(ParallelContext *);
extern bool ParallelContextActive(void);
extern void HandleParallelMessageInterrupt(void);
extern void HandleParallelMessages(void);
extern void AtEOXact_Parallel(bool isCommit);
extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
#endif /* PARALLEL_H */
......@@ -78,9 +78,12 @@ extern bool MyXactAccessedTempRel;
typedef enum
{
XACT_EVENT_COMMIT,
XACT_EVENT_PARALLEL_COMMIT,
XACT_EVENT_ABORT,
XACT_EVENT_PARALLEL_ABORT,
XACT_EVENT_PREPARE,
XACT_EVENT_PRE_COMMIT,
XACT_EVENT_PARALLEL_PRE_COMMIT,
XACT_EVENT_PRE_PREPARE
} XactEvent;
......@@ -332,6 +335,10 @@ extern void BeginInternalSubTransaction(char *name);
extern void ReleaseCurrentSubTransaction(void);
extern void RollbackAndReleaseCurrentSubTransaction(void);
extern bool IsSubTransaction(void);
extern Size EstimateTransactionStateSpace(void);
extern void SerializeTransactionState(Size maxsize, char *start_address);
extern void StartParallelWorkerTransaction(char *tstatespace);
extern void EndParallelWorkerTransaction(void);
extern bool IsTransactionBlock(void);
extern bool IsTransactionOrTransactionBlock(void);
extern char TransactionBlockStatusCode(void);
......@@ -368,4 +375,8 @@ extern const char *xact_identify(uint8 info);
extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
extern void EnterParallelMode(void);
extern void ExitParallelMode(void);
extern bool IsInParallelMode(void);
#endif /* XACT_H */
......@@ -140,7 +140,7 @@ extern Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding);
/* initialization & transaction cleanup code */
extern void InitializeSearchPath(void);
extern void AtEOXact_Namespace(bool isCommit);
extern void AtEOXact_Namespace(bool isCommit, bool parallel);
extern void AtEOSubXact_Namespace(bool isCommit, SubTransactionId mySubid,
SubTransactionId parentSubid);
......
......@@ -642,6 +642,9 @@ extern PGFunction load_external_function(char *filename, char *funcname,
extern PGFunction lookup_external_function(void *filehandle, char *funcname);
extern void load_file(const char *filename, bool restricted);
extern void **find_rendezvous_variable(const char *varName);
extern Size EstimateLibraryStateSpace(void);
extern void SerializeLibraryState(Size maxsize, char *start_address);
extern void RestoreLibraryState(char *start_address);
/*
* Support for aggregate functions
......
......@@ -17,6 +17,7 @@
#include "storage/shm_mq.h"
extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
......
......@@ -271,6 +271,7 @@ extern void check_stack_depth(void);
/* in tcop/utility.c */
extern void PreventCommandIfReadOnly(const char *cmdname);
extern void PreventCommandIfParallelMode(const char *cmdname);
extern void PreventCommandDuringRecovery(const char *cmdname);
/* in utils/misc/guc.c */
......
......@@ -112,6 +112,8 @@ extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle,
extern BgwHandleStatus
WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *
handle, pid_t *pid);
extern BgwHandleStatus
WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *);
/* Terminate a bgworker */
extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);
......
......@@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot);
extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
TransactionId sourcexid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
extern RunningTransactions GetRunningTransactionData(void);
......
......@@ -31,6 +31,7 @@ typedef enum
{
PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */
PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */
PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_DATABASE,
......
......@@ -21,5 +21,8 @@
*/
extern void AtEOXact_ComboCid(void);
extern void RestoreComboCIDState(char *comboCIDstate);
extern void SerializeComboCIDState(Size maxsize, char *start_address);
extern Size EstimateComboCIDStateSpace(void);
#endif /* COMBOCID_H */
......@@ -64,4 +64,9 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids)
extern void TeardownHistoricSnapshot(bool is_error);
extern bool HistoricSnapshotActive(void);
extern Size EstimateSnapshotSpace(Snapshot snapshot);
extern void SerializeSnapshot(Snapshot snapshot, char *start_address);
extern Snapshot RestoreSnapshot(char *start_address);
extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc);
#endif /* SNAPMGR_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment