Commit 66b84fa8 authored by Peter Eisentraut's avatar Peter Eisentraut

Receive invalidation messages correctly in tablesync worker

We didn't accept any invalidation messages until the whole sync process
had finished (because it flattens all the remote transactions in the
single one).  So the sync worker didn't learn about subscription
changes/drop until it has finished.  This could lead to "orphaned" sync
workers.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: default avatarMasahiko Sawada <sawada.mshk@gmail.com>
parent 3c9bc215
...@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); ...@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn); static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void); static void maybe_reread_subscription(void);
/* Flags set by signal handlers */ /* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGHUP = false;
...@@ -165,8 +165,7 @@ ensure_transaction(void) ...@@ -165,8 +165,7 @@ ensure_transaction(void)
StartTransactionCommand(); StartTransactionCommand();
if (!MySubscriptionValid) maybe_reread_subscription();
reread_subscription();
MemoryContextSwitchTo(ApplyMessageContext); MemoryContextSwitchTo(ApplyMessageContext);
return true; return true;
...@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s) ...@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
store_flush_position(commit_data.end_lsn); store_flush_position(commit_data.end_lsn);
} }
else
{
/* Process any invalidation messages that might have accumulated. */
AcceptInvalidationMessages();
maybe_reread_subscription();
}
in_remote_transaction = false; in_remote_transaction = false;
...@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* now. * now.
*/ */
AcceptInvalidationMessages(); AcceptInvalidationMessages();
if (!MySubscriptionValid) maybe_reread_subscription();
reread_subscription();
/* Process any table synchronization changes. */ /* Process any table synchronization changes. */
process_syncing_tables(last_received); process_syncing_tables(last_received);
...@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) ...@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
last_flushpos = flushpos; last_flushpos = flushpos;
} }
/* /*
* Reread subscription info and exit on change. * Reread subscription info if needed. Most changes will be exit.
*/ */
static void static void
reread_subscription(void) maybe_reread_subscription(void)
{ {
MemoryContext oldctx; MemoryContext oldctx;
Subscription *newsub; Subscription *newsub;
bool started_tx = false; bool started_tx = false;
/* When cache state is valid there is nothing to do here. */
if (MySubscriptionValid)
return;
/* This function might be called inside or outside of transaction. */ /* This function might be called inside or outside of transaction. */
if (!IsTransactionState()) if (!IsTransactionState())
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment