Commit 8ae4ef4f authored by Amit Kapila's avatar Amit Kapila

Remove incorrect assertion in reorderbuffer.c.

We start recording changes in ReorderBufferTXN even before we reach
SNAPBUILD_CONSISTENT state so that if the commit is encountered after
reaching that we should be able to send the changes of the entire transaction.
Now, while recording changes if the reorder buffer memory has exceeded
logical_decoding_work_mem then we can start streaming if it is allowed and
we haven't yet streamed that data. However, we must not allow streaming to
start unless the snapshot has reached SNAPBUILD_CONSISTENT state.

In passing, improve the comments atop ReorderBufferResetTXN to mention the
case when we need to continue streaming after getting an error.

Author: Amit Kapila
Reviewed-by: Dilip Kumar
Discussion: https://postgr.es/m/CAA4eK1KoOH0byboyYY40NBcC7Fe812trwTa+WY3jQF7WQWZbQg@mail.gmail.com
parent bd94a9c0
...@@ -1891,6 +1891,8 @@ ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -1891,6 +1891,8 @@ ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
* Helper function for ReorderBufferProcessTXN to handle the concurrent * Helper function for ReorderBufferProcessTXN to handle the concurrent
* abort of the streaming transaction. This resets the TXN such that it * abort of the streaming transaction. This resets the TXN such that it
* can be used to stream the remaining data of transaction being processed. * can be used to stream the remaining data of transaction being processed.
* This can happen when the subtransaction is aborted and we still want to
* continue processing the main or other subtransactions data.
*/ */
static void static void
ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
...@@ -3461,6 +3463,10 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb) ...@@ -3461,6 +3463,10 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
LogicalDecodingContext *ctx = rb->private_data; LogicalDecodingContext *ctx = rb->private_data;
SnapBuild *builder = ctx->snapshot_builder; SnapBuild *builder = ctx->snapshot_builder;
/* We can't start streaming unless a consistent state is reached. */
if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
return false;
/* /*
* We can't start streaming immediately even if the streaming is enabled * We can't start streaming immediately even if the streaming is enabled
* because we previously decoded this transaction and now just are * because we previously decoded this transaction and now just are
...@@ -3468,11 +3474,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb) ...@@ -3468,11 +3474,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
*/ */
if (ReorderBufferCanStream(rb) && if (ReorderBufferCanStream(rb) &&
!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr)) !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
{
/* We must have a consistent snapshot by this time */
Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT);
return true; return true;
}
return false; return false;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment