Commit 1cbc9480 authored by Andres Freund's avatar Andres Freund

Check interrupts during logical decoding more frequently.

When reading large amounts of preexisting WAL during logical decoding
using the SQL interface we possibly could fail to check interrupts in
due time. Similarly the same could happen on systems with a very high
WAL volume while creating a new logical replication slot, independent
of the used interface.

Previously these checks where only performed in xlogreader's read_page
callbacks, while waiting for new WAL to be produced. That's not
sufficient though, if there's never a need to wait.  Walsender's send
loop already contains a interrupt check.

Backpatch to 9.4 where the logical decoding feature was introduced.
parent 1c6821be
...@@ -451,11 +451,6 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) ...@@ -451,11 +451,6 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
XLogRecord *record; XLogRecord *record;
char *err = NULL; char *err = NULL;
/*
* If the caller requires that interrupts be checked, the read_page
* callback should do so, as those will often wait.
*/
/* the read_page callback waits for new WAL */ /* the read_page callback waits for new WAL */
record = XLogReadRecord(ctx->reader, startptr, &err); record = XLogReadRecord(ctx->reader, startptr, &err);
if (err) if (err)
...@@ -470,6 +465,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) ...@@ -470,6 +465,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
/* only continue till we found a consistent spot */ /* only continue till we found a consistent spot */
if (DecodingContextReady(ctx)) if (DecodingContextReady(ctx))
break; break;
CHECK_FOR_INTERRUPTS();
} }
ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
......
...@@ -438,6 +438,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ...@@ -438,6 +438,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
if (upto_nchanges != 0 && if (upto_nchanges != 0 &&
upto_nchanges <= p->returned_rows) upto_nchanges <= p->returned_rows)
break; break;
CHECK_FOR_INTERRUPTS();
} }
} }
PG_CATCH(); PG_CATCH();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment