Commit 423ec087 authored by Robert Haas's avatar Robert Haas

Transfer current command counter ID to parallel workers.

Commit 924bcf4f correctly forbade
parallel workers to modify the command counter while in parallel mode,
but it inexplicably neglected to actually transfer the current command
counter from leader to workers.  This can result in the workers seeing
a different set of tuples from the leader, which is bad.  Repair.
parent 2ad5c27b
...@@ -4786,8 +4786,8 @@ Size ...@@ -4786,8 +4786,8 @@ Size
EstimateTransactionStateSpace(void) EstimateTransactionStateSpace(void)
{ {
TransactionState s; TransactionState s;
Size nxids = 5; /* iso level, deferrable, top & current XID, Size nxids = 6; /* iso level, deferrable, top & current XID,
* XID count */ * command counter, XID count */
for (s = CurrentTransactionState; s != NULL; s = s->parent) for (s = CurrentTransactionState; s != NULL; s = s->parent)
{ {
...@@ -4807,12 +4807,13 @@ EstimateTransactionStateSpace(void) ...@@ -4807,12 +4807,13 @@ EstimateTransactionStateSpace(void)
* *
* We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs
* associated with this transaction. The first eight bytes of the result * associated with this transaction. The first eight bytes of the result
* contain XactDeferrable and XactIsoLevel; the next eight bytes contain the * contain XactDeferrable and XactIsoLevel; the next twelve bytes contain the
* XID of the top-level transaction and the XID of the current transaction * XID of the top-level transaction, the XID of the current transaction
* (or, in each case, InvalidTransactionId if none). After that, the next 4 * (or, in each case, InvalidTransactionId if none), and the current command
* bytes contain a count of how many additional XIDs follow; this is followed * counter. After that, the next 4 bytes contain a count of how many
* by all of those XIDs one after another. We emit the XIDs in sorted order * additional XIDs follow; this is followed by all of those XIDs one after
* for the convenience of the receiving process. * another. We emit the XIDs in sorted order for the convenience of the
* receiving process.
*/ */
void void
SerializeTransactionState(Size maxsize, char *start_address) SerializeTransactionState(Size maxsize, char *start_address)
...@@ -4820,14 +4821,16 @@ SerializeTransactionState(Size maxsize, char *start_address) ...@@ -4820,14 +4821,16 @@ SerializeTransactionState(Size maxsize, char *start_address)
TransactionState s; TransactionState s;
Size nxids = 0; Size nxids = 0;
Size i = 0; Size i = 0;
Size c = 0;
TransactionId *workspace; TransactionId *workspace;
TransactionId *result = (TransactionId *) start_address; TransactionId *result = (TransactionId *) start_address;
Assert(maxsize >= 5 * sizeof(TransactionId)); result[c++] = (TransactionId) XactIsoLevel;
result[0] = (TransactionId) XactIsoLevel; result[c++] = (TransactionId) XactDeferrable;
result[1] = (TransactionId) XactDeferrable; result[c++] = XactTopTransactionId;
result[2] = XactTopTransactionId; result[c++] = CurrentTransactionState->transactionId;
result[3] = CurrentTransactionState->transactionId; result[c++] = (TransactionId) currentCommandId;
Assert(maxsize >= c * sizeof(TransactionId));
/* /*
* If we're running in a parallel worker and launching a parallel worker * If we're running in a parallel worker and launching a parallel worker
...@@ -4836,9 +4839,9 @@ SerializeTransactionState(Size maxsize, char *start_address) ...@@ -4836,9 +4839,9 @@ SerializeTransactionState(Size maxsize, char *start_address)
*/ */
if (nParallelCurrentXids > 0) if (nParallelCurrentXids > 0)
{ {
Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId)); result[c++] = nParallelCurrentXids;
result[4] = nParallelCurrentXids; Assert(maxsize >= (nParallelCurrentXids + c) * sizeof(TransactionId));
memcpy(&result[5], ParallelCurrentXids, memcpy(&result[c], ParallelCurrentXids,
nParallelCurrentXids * sizeof(TransactionId)); nParallelCurrentXids * sizeof(TransactionId));
return; return;
} }
...@@ -4853,7 +4856,7 @@ SerializeTransactionState(Size maxsize, char *start_address) ...@@ -4853,7 +4856,7 @@ SerializeTransactionState(Size maxsize, char *start_address)
nxids = add_size(nxids, 1); nxids = add_size(nxids, 1);
nxids = add_size(nxids, s->nChildXids); nxids = add_size(nxids, s->nChildXids);
} }
Assert(nxids * sizeof(TransactionId) < maxsize); Assert((c + 1 + nxids) * sizeof(TransactionId) <= maxsize);
/* Copy them to our scratch space. */ /* Copy them to our scratch space. */
workspace = palloc(nxids * sizeof(TransactionId)); workspace = palloc(nxids * sizeof(TransactionId));
...@@ -4871,8 +4874,8 @@ SerializeTransactionState(Size maxsize, char *start_address) ...@@ -4871,8 +4874,8 @@ SerializeTransactionState(Size maxsize, char *start_address)
qsort(workspace, nxids, sizeof(TransactionId), xidComparator); qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
/* Copy data into output area. */ /* Copy data into output area. */
result[4] = (TransactionId) nxids; result[c++] = (TransactionId) nxids;
memcpy(&result[5], workspace, nxids * sizeof(TransactionId)); memcpy(&result[c], workspace, nxids * sizeof(TransactionId));
} }
/* /*
...@@ -4892,8 +4895,9 @@ StartParallelWorkerTransaction(char *tstatespace) ...@@ -4892,8 +4895,9 @@ StartParallelWorkerTransaction(char *tstatespace)
XactDeferrable = (bool) tstate[1]; XactDeferrable = (bool) tstate[1];
XactTopTransactionId = tstate[2]; XactTopTransactionId = tstate[2];
CurrentTransactionState->transactionId = tstate[3]; CurrentTransactionState->transactionId = tstate[3];
nParallelCurrentXids = (int) tstate[4]; currentCommandId = tstate[4];
ParallelCurrentXids = &tstate[5]; nParallelCurrentXids = (int) tstate[5];
ParallelCurrentXids = &tstate[6];
CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment