Commit bd56e741 authored by Simon Riggs's avatar Simon Riggs

Reset master xmin when hot_standby_feedback disabled.

If walsender has xmin of standby then ensure we
reset the value to 0 when we change from hot_standby_feedback=on
to hot_standby_feedback=off.
parent 62e66640
...@@ -1682,8 +1682,10 @@ The commands accepted in walsender mode are: ...@@ -1682,8 +1682,10 @@ The commands accepted in walsender mode are:
</term> </term>
<listitem> <listitem>
<para> <para>
The standby's current xmin. This may be 0, if the standby does not The standby's current xmin. This may be 0, if the standby is
support feedback, or is not yet in Hot Standby state. sending notification that Hot Standby feedback will no longer
be sent on this connection. Later non-zero messages may
reinitiate the feedback mechanism.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
#include <unistd.h> #include <unistd.h>
#include "access/timeline.h" #include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "libpq/pqsignal.h" #include "libpq/pqsignal.h"
...@@ -138,7 +139,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); ...@@ -138,7 +139,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying); static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(void); static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */ /* Signal handlers */
...@@ -406,6 +407,7 @@ WalReceiverMain(void) ...@@ -406,6 +407,7 @@ WalReceiverMain(void)
{ {
got_SIGHUP = false; got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP); ProcessConfigFile(PGC_SIGHUP);
XLogWalRcvSendHSFeedback(true);
} }
/* Wait a while for data to arrive */ /* Wait a while for data to arrive */
...@@ -496,7 +498,7 @@ WalReceiverMain(void) ...@@ -496,7 +498,7 @@ WalReceiverMain(void)
} }
XLogWalRcvSendReply(requestReply, requestReply); XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(); XLogWalRcvSendHSFeedback(false);
} }
} }
...@@ -1059,46 +1061,60 @@ XLogWalRcvSendReply(bool force, bool requestReply) ...@@ -1059,46 +1061,60 @@ XLogWalRcvSendReply(bool force, bool requestReply)
/* /*
* Send hot standby feedback message to primary, plus the current time, * Send hot standby feedback message to primary, plus the current time,
* in case they don't have a watch. * in case they don't have a watch.
*
* If the user disables feedback, send one final message to tell sender
* to forget about the xmin on this standby.
*/ */
static void static void
XLogWalRcvSendHSFeedback(void) XLogWalRcvSendHSFeedback(bool immed)
{ {
TimestampTz now; TimestampTz now;
TransactionId nextXid; TransactionId nextXid;
uint32 nextEpoch; uint32 nextEpoch;
TransactionId xmin; TransactionId xmin;
static TimestampTz sendTime = 0; static TimestampTz sendTime = 0;
static bool master_has_standby_xmin = false;
/* /*
* If the user doesn't want status to be reported to the master, be sure * If the user doesn't want status to be reported to the master, be sure
* to exit before doing anything at all. * to exit before doing anything at all.
*/ */
if (wal_receiver_status_interval <= 0 || !hot_standby_feedback) if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
!master_has_standby_xmin)
return; return;
/* Get current timestamp. */ /* Get current timestamp. */
now = GetCurrentTimestamp(); now = GetCurrentTimestamp();
/* if (!immed)
* Send feedback at most once per wal_receiver_status_interval. {
*/ /*
if (!TimestampDifferenceExceeds(sendTime, now, * Send feedback at most once per wal_receiver_status_interval.
*/
if (!TimestampDifferenceExceeds(sendTime, now,
wal_receiver_status_interval * 1000)) wal_receiver_status_interval * 1000))
return; return;
sendTime = now; sendTime = now;
}
/* /*
* If Hot Standby is not yet active there is nothing to send. Check this * If Hot Standby is not yet active there is nothing to send. Check this
* after the interval has expired to reduce number of calls. * after the interval has expired to reduce number of calls.
*/ */
if (!HotStandbyActive()) if (!HotStandbyActive())
{
Assert(!master_has_standby_xmin);
return; return;
}
/* /*
* Make the expensive call to get the oldest xmin once we are certain * Make the expensive call to get the oldest xmin once we are certain
* everything else has been checked. * everything else has been checked.
*/ */
xmin = GetOldestXmin(true, false); if (hot_standby_feedback)
xmin = GetOldestXmin(true, false);
else
xmin = InvalidTransactionId;
/* /*
* Get epoch and adjust if nextXid and oldestXmin are different sides of * Get epoch and adjust if nextXid and oldestXmin are different sides of
...@@ -1118,6 +1134,10 @@ XLogWalRcvSendHSFeedback(void) ...@@ -1118,6 +1134,10 @@ XLogWalRcvSendHSFeedback(void)
pq_sendint(&reply_message, xmin, 4); pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4); pq_sendint(&reply_message, nextEpoch, 4);
walrcv_send(reply_message.data, reply_message.len); walrcv_send(reply_message.data, reply_message.len);
if (TransactionIdIsValid(xmin))
master_has_standby_xmin = true;
else
master_has_standby_xmin = false;
} }
/* /*
......
...@@ -870,9 +870,12 @@ ProcessStandbyHSFeedbackMessage(void) ...@@ -870,9 +870,12 @@ ProcessStandbyHSFeedbackMessage(void)
feedbackXmin, feedbackXmin,
feedbackEpoch); feedbackEpoch);
/* Ignore invalid xmin (can't actually happen with current walreceiver) */ /* Unset WalSender's xmin if the feedback message value is invalid */
if (!TransactionIdIsNormal(feedbackXmin)) if (!TransactionIdIsNormal(feedbackXmin))
{
MyPgXact->xmin = InvalidTransactionId;
return; return;
}
/* /*
* Check that the provided xmin/epoch are sane, that is, not in the future * Check that the provided xmin/epoch are sane, that is, not in the future
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment