Commit 07ee62ce authored by Tom Lane's avatar Tom Lane

Propagate xactStartTimestamp and stmtStartTimestamp to parallel workers.

Previously, a worker process would establish values for these based on
its own start time.  In v10 and up, this can trivially be shown to cause
misbehavior of transaction_timestamp(), timestamp_in(), and related
functions which are (perhaps unwisely?) marked parallel-safe.  It seems
likely that other behaviors might diverge from what happens in the parent
as well.

It's not as trivial to demonstrate problems in 9.6 or 9.5, but I'm sure
it's still possible, so back-patch to all branches containing parallel
worker infrastructure.

In HEAD only, mark now() and statement_timestamp() as parallel-safe
(other affected functions already were).  While in theory we could
still squeeze that change into v11, it doesn't seem important enough
to force a last-minute catversion bump.

Konstantin Knizhnik, whacked around a bit by me

Discussion: https://postgr.es/m/6406dbd2-5d37-4cb6-6eb2-9c44172c7e7c@postgrespro.ru
parent e954a727
...@@ -87,6 +87,8 @@ typedef struct FixedParallelState ...@@ -87,6 +87,8 @@ typedef struct FixedParallelState
PGPROC *parallel_master_pgproc; PGPROC *parallel_master_pgproc;
pid_t parallel_master_pid; pid_t parallel_master_pid;
BackendId parallel_master_backend_id; BackendId parallel_master_backend_id;
TimestampTz xact_ts;
TimestampTz stmt_ts;
/* Mutex protects remaining fields. */ /* Mutex protects remaining fields. */
slock_t mutex; slock_t mutex;
...@@ -318,6 +320,8 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -318,6 +320,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_pgproc = MyProc; fps->parallel_master_pgproc = MyProc;
fps->parallel_master_pid = MyProcPid; fps->parallel_master_pid = MyProcPid;
fps->parallel_master_backend_id = MyBackendId; fps->parallel_master_backend_id = MyBackendId;
fps->xact_ts = GetCurrentTransactionStartTimestamp();
fps->stmt_ts = GetCurrentStatementStartTimestamp();
SpinLockInit(&fps->mutex); SpinLockInit(&fps->mutex);
fps->last_xlog_end = 0; fps->last_xlog_end = 0;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
...@@ -1311,6 +1315,13 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1311,6 +1315,13 @@ ParallelWorkerMain(Datum main_arg)
fps->parallel_master_pid)) fps->parallel_master_pid))
return; return;
/*
* Restore transaction and statement start-time timestamps. This must
* happen before anything that would start a transaction, else asserts in
* xact.c will fire.
*/
SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
/* /*
* Identify the entry point to be called. In theory this could result in * Identify the entry point to be called. In theory this could result in
* loading an additional library, though most likely the entry point is in * loading an additional library, though most likely the entry point is in
......
...@@ -674,6 +674,22 @@ GetCurrentCommandId(bool used) ...@@ -674,6 +674,22 @@ GetCurrentCommandId(bool used)
return currentCommandId; return currentCommandId;
} }
/*
* SetParallelStartTimestamps
*
* In a parallel worker, we should inherit the parent transaction's
* timestamps rather than setting our own. The parallel worker
* infrastructure must call this to provide those values before
* calling StartTransaction() or SetCurrentStatementStartTimestamp().
*/
void
SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
{
Assert(IsParallelWorker());
xactStartTimestamp = xact_ts;
stmtStartTimestamp = stmt_ts;
}
/* /*
* GetCurrentTransactionStartTimestamp * GetCurrentTransactionStartTimestamp
*/ */
...@@ -708,11 +724,17 @@ GetCurrentTransactionStopTimestamp(void) ...@@ -708,11 +724,17 @@ GetCurrentTransactionStopTimestamp(void)
/* /*
* SetCurrentStatementStartTimestamp * SetCurrentStatementStartTimestamp
*
* In a parallel worker, this should already have been provided by a call
* to SetParallelStartTimestamps().
*/ */
void void
SetCurrentStatementStartTimestamp(void) SetCurrentStatementStartTimestamp(void)
{ {
if (!IsParallelWorker())
stmtStartTimestamp = GetCurrentTimestamp(); stmtStartTimestamp = GetCurrentTimestamp();
else
Assert(stmtStartTimestamp != 0);
} }
/* /*
...@@ -1867,10 +1889,16 @@ StartTransaction(void) ...@@ -1867,10 +1889,16 @@ StartTransaction(void)
/* /*
* set transaction_timestamp() (a/k/a now()). We want this to be the same * set transaction_timestamp() (a/k/a now()). We want this to be the same
* as the first command's statement_timestamp(), so don't do a fresh * as the first command's statement_timestamp(), so don't do a fresh
* GetCurrentTimestamp() call (which'd be expensive anyway). Also, mark * GetCurrentTimestamp() call (which'd be expensive anyway). In a
* xactStopTimestamp as unset. * parallel worker, this should already have been provided by a call to
* SetParallelStartTimestamps().
*
* Also, mark xactStopTimestamp as unset.
*/ */
if (!IsParallelWorker())
xactStartTimestamp = stmtStartTimestamp; xactStartTimestamp = stmtStartTimestamp;
else
Assert(xactStartTimestamp != 0);
xactStopTimestamp = 0; xactStopTimestamp = 0;
pgstat_report_xact_timestamp(xactStartTimestamp); pgstat_report_xact_timestamp(xactStartTimestamp);
......
...@@ -359,6 +359,7 @@ extern SubTransactionId GetCurrentSubTransactionId(void); ...@@ -359,6 +359,7 @@ extern SubTransactionId GetCurrentSubTransactionId(void);
extern void MarkCurrentTransactionIdLoggedIfAny(void); extern void MarkCurrentTransactionIdLoggedIfAny(void);
extern bool SubTransactionIsActive(SubTransactionId subxid); extern bool SubTransactionIsActive(SubTransactionId subxid);
extern CommandId GetCurrentCommandId(bool used); extern CommandId GetCurrentCommandId(bool used);
extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts);
extern TimestampTz GetCurrentTransactionStartTimestamp(void); extern TimestampTz GetCurrentTransactionStartTimestamp(void);
extern TimestampTz GetCurrentStatementStartTimestamp(void); extern TimestampTz GetCurrentStatementStartTimestamp(void);
extern TimestampTz GetCurrentTransactionStopTimestamp(void); extern TimestampTz GetCurrentTransactionStopTimestamp(void);
......
...@@ -53,6 +53,6 @@ ...@@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201810051 #define CATALOG_VERSION_NO 201810061
#endif #endif
...@@ -2073,7 +2073,8 @@ ...@@ -2073,7 +2073,8 @@
{ oid => '1365', descr => 'make ACL item', { oid => '1365', descr => 'make ACL item',
proname => 'makeaclitem', prorettype => 'aclitem', proname => 'makeaclitem', prorettype => 'aclitem',
proargtypes => 'oid oid text bool', prosrc => 'makeaclitem' }, proargtypes => 'oid oid text bool', prosrc => 'makeaclitem' },
{ oid => '3943', descr => 'show hardwired default privileges, primarily for use by the information schema', { oid => '3943',
descr => 'show hardwired default privileges, primarily for use by the information schema',
proname => 'acldefault', prorettype => '_aclitem', proargtypes => 'char oid', proname => 'acldefault', prorettype => '_aclitem', proargtypes => 'char oid',
prosrc => 'acldefault_sql' }, prosrc => 'acldefault_sql' },
{ oid => '1689', { oid => '1689',
...@@ -2595,13 +2596,13 @@ ...@@ -2595,13 +2596,13 @@
proname => 'timetzdate_pl', prolang => '14', prorettype => 'timestamptz', proname => 'timetzdate_pl', prolang => '14', prorettype => 'timestamptz',
proargtypes => 'timetz date', prosrc => 'select ($2 + $1)' }, proargtypes => 'timetz date', prosrc => 'select ($2 + $1)' },
{ oid => '1299', descr => 'current transaction time', { oid => '1299', descr => 'current transaction time',
proname => 'now', provolatile => 's', proparallel => 'r', proname => 'now', provolatile => 's', prorettype => 'timestamptz',
prorettype => 'timestamptz', proargtypes => '', prosrc => 'now' }, proargtypes => '', prosrc => 'now' },
{ oid => '2647', descr => 'current transaction time', { oid => '2647', descr => 'current transaction time',
proname => 'transaction_timestamp', provolatile => 's', proname => 'transaction_timestamp', provolatile => 's',
prorettype => 'timestamptz', proargtypes => '', prosrc => 'now' }, prorettype => 'timestamptz', proargtypes => '', prosrc => 'now' },
{ oid => '2648', descr => 'current statement time', { oid => '2648', descr => 'current statement time',
proname => 'statement_timestamp', provolatile => 's', proparallel => 'r', proname => 'statement_timestamp', provolatile => 's',
prorettype => 'timestamptz', proargtypes => '', prorettype => 'timestamptz', proargtypes => '',
prosrc => 'statement_timestamp' }, prosrc => 'statement_timestamp' },
{ oid => '2649', descr => 'current clock time', { oid => '2649', descr => 'current clock time',
...@@ -10208,7 +10209,8 @@ ...@@ -10208,7 +10209,8 @@
proname => 'pg_ls_tmpdir', procost => '10', prorows => '20', proretset => 't', proname => 'pg_ls_tmpdir', procost => '10', prorows => '20', proretset => 't',
provolatile => 'v', prorettype => 'record', proargtypes => 'oid', provolatile => 'v', prorettype => 'record', proargtypes => 'oid',
proallargtypes => '{oid,text,int8,timestamptz}', proargmodes => '{i,o,o,o}', proallargtypes => '{oid,text,int8,timestamptz}', proargmodes => '{i,o,o,o}',
proargnames => '{tablespace,name,size,modification}', prosrc => 'pg_ls_tmpdir_1arg' }, proargnames => '{tablespace,name,size,modification}',
prosrc => 'pg_ls_tmpdir_1arg' },
# hash partitioning constraint function # hash partitioning constraint function
{ oid => '5028', descr => 'hash partition CHECK constraint', { oid => '5028', descr => 'hash partition CHECK constraint',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment