Commit 4f3924d9 authored by Alvaro Herrera's avatar Alvaro Herrera

Keep CommitTs module in sync in standby and master

We allow this module to be turned off on restarts, so a restart time
check is enough to activate or deactivate the module; however, if there
is a standby replaying WAL emitted from a master which is restarted, but
the standby isn't, the state in the standby becomes inconsistent and can
easily be crashed.

Fix by activating and deactivating the module during WAL replay on
parameter change as well as on system start.

Problem reported by Fujii Masao in
http://www.postgresql.org/message-id/CAHGQGwFhJ3CnHo1CELEfay18yg_RA-XZT-7D8NuWUoYSZ90r4Q@mail.gmail.com

Author: Petr Jelínek
parent e3f1c24b
...@@ -557,6 +557,12 @@ StartupCommitTs(void) ...@@ -557,6 +557,12 @@ StartupCommitTs(void)
TransactionId xid = ShmemVariableCache->nextXid; TransactionId xid = ShmemVariableCache->nextXid;
int pageno = TransactionIdToCTsPage(xid); int pageno = TransactionIdToCTsPage(xid);
if (track_commit_timestamp)
{
ActivateCommitTs();
return;
}
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
/* /*
...@@ -569,8 +575,25 @@ StartupCommitTs(void) ...@@ -569,8 +575,25 @@ StartupCommitTs(void)
/* /*
* This must be called ONCE during postmaster or standalone-backend startup, * This must be called ONCE during postmaster or standalone-backend startup,
* when commit timestamp is enabled. Must be called after recovery has * when commit timestamp is enabled, after recovery has finished.
* finished. */
void
CompleteCommitTsInitialization(void)
{
if (!track_commit_timestamp)
DeactivateCommitTs(true);
}
/*
* Activate this module whenever necessary.
* This must happen during postmaster or standalong-backend startup,
* or during WAL replay anytime the track_commit_timestamp setting is
* changed in the master.
*
* The reason why this SLRU needs separate activation/deactivation functions is
* that it can be enabled/disabled during start and the activation/deactivation
* on master is propagated to slave via replay. Other SLRUs don't have this
* property and they can be just initialized during normal startup.
* *
* This is in charge of creating the currently active segment, if it's not * This is in charge of creating the currently active segment, if it's not
* already there. The reason for this is that the server might have been * already there. The reason for this is that the server might have been
...@@ -578,7 +601,7 @@ StartupCommitTs(void) ...@@ -578,7 +601,7 @@ StartupCommitTs(void)
* the normal creation point. * the normal creation point.
*/ */
void void
CompleteCommitTsInitialization(void) ActivateCommitTs(void)
{ {
TransactionId xid = ShmemVariableCache->nextXid; TransactionId xid = ShmemVariableCache->nextXid;
int pageno = TransactionIdToCTsPage(xid); int pageno = TransactionIdToCTsPage(xid);
...@@ -590,22 +613,6 @@ CompleteCommitTsInitialization(void) ...@@ -590,22 +613,6 @@ CompleteCommitTsInitialization(void)
CommitTsCtl->shared->latest_page_number = pageno; CommitTsCtl->shared->latest_page_number = pageno;
LWLockRelease(CommitTsControlLock); LWLockRelease(CommitTsControlLock);
/*
* If this module is not currently enabled, make sure we don't hand back
* possibly-invalid data; also remove segments of old data.
*/
if (!track_commit_timestamp)
{
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
ShmemVariableCache->newestCommitTs = InvalidTransactionId;
LWLockRelease(CommitTsLock);
TruncateCommitTs(ReadNewTransactionId());
return;
}
/* /*
* If CommitTs is enabled, but it wasn't in the previous server run, we * If CommitTs is enabled, but it wasn't in the previous server run, we
* need to set the oldest and newest values to the next Xid; that way, we * need to set the oldest and newest values to the next Xid; that way, we
...@@ -640,6 +647,37 @@ CompleteCommitTsInitialization(void) ...@@ -640,6 +647,37 @@ CompleteCommitTsInitialization(void)
} }
} }
/*
* Deactivate this module.
*
* This must be called when the track_commit_timestamp parameter is turned off.
* This happens during postmaster or standalone-backend startup, or during WAL
* replay.
*
* Resets CommitTs into invalid state to make sure we don't hand back
* possibly-invalid data; also removes segments of old data.
*/
void
DeactivateCommitTs(bool do_wal)
{
TransactionId xid = ShmemVariableCache->nextXid;
int pageno = TransactionIdToCTsPage(xid);
/*
* Re-Initialize our idea of the latest page number.
*/
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
CommitTsCtl->shared->latest_page_number = pageno;
LWLockRelease(CommitTsControlLock);
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
ShmemVariableCache->newestCommitTs = InvalidTransactionId;
LWLockRelease(CommitTsLock);
TruncateCommitTs(ReadNewTransactionId(), do_wal);
}
/* /*
* This must be called ONCE during postmaster or standalone-backend shutdown * This must be called ONCE during postmaster or standalone-backend shutdown
*/ */
...@@ -705,7 +743,7 @@ ExtendCommitTs(TransactionId newestXact) ...@@ -705,7 +743,7 @@ ExtendCommitTs(TransactionId newestXact)
* Note that we don't need to flush XLOG here. * Note that we don't need to flush XLOG here.
*/ */
void void
TruncateCommitTs(TransactionId oldestXact) TruncateCommitTs(TransactionId oldestXact, bool do_wal)
{ {
int cutoffPage; int cutoffPage;
...@@ -721,7 +759,8 @@ TruncateCommitTs(TransactionId oldestXact) ...@@ -721,7 +759,8 @@ TruncateCommitTs(TransactionId oldestXact)
return; /* nothing to remove */ return; /* nothing to remove */
/* Write XLOG record */ /* Write XLOG record */
WriteTruncateXlogRec(cutoffPage); if (do_wal)
WriteTruncateXlogRec(cutoffPage);
/* Now we can remove the old CommitTs segment(s) */ /* Now we can remove the old CommitTs segment(s) */
SimpleLruTruncate(CommitTsCtl, cutoffPage); SimpleLruTruncate(CommitTsCtl, cutoffPage);
......
...@@ -5688,6 +5688,19 @@ do { \ ...@@ -5688,6 +5688,19 @@ do { \
minValue))); \ minValue))); \
} while(0) } while(0)
#define RecoveryRequiresBoolParameter(param_name, currValue, masterValue) \
do { \
bool _currValue = (currValue); \
bool _masterValue = (masterValue); \
if (_currValue != _masterValue) \
ereport(ERROR, \
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), \
errmsg("hot standby is not possible because it requires \"%s\" to be same on master and standby (master has \"%s\", standby has \"%s\")", \
param_name, \
_masterValue ? "true" : "false", \
_currValue ? "true" : "false"))); \
} while(0)
/* /*
* Check to see if required parameters are set high enough on this server * Check to see if required parameters are set high enough on this server
* for various aspects of recovery operation. * for various aspects of recovery operation.
...@@ -5730,6 +5743,9 @@ CheckRequiredParameterValues(void) ...@@ -5730,6 +5743,9 @@ CheckRequiredParameterValues(void)
RecoveryRequiresIntParameter("max_locks_per_transaction", RecoveryRequiresIntParameter("max_locks_per_transaction",
max_locks_per_xact, max_locks_per_xact,
ControlFile->max_locks_per_xact); ControlFile->max_locks_per_xact);
RecoveryRequiresBoolParameter("track_commit_timestamp",
track_commit_timestamp,
ControlFile->track_commit_timestamp);
} }
} }
...@@ -9118,7 +9134,6 @@ xlog_redo(XLogReaderState *record) ...@@ -9118,7 +9134,6 @@ xlog_redo(XLogReaderState *record)
ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_level = xlrec.wal_level;
ControlFile->wal_log_hints = xlrec.wal_log_hints; ControlFile->wal_log_hints = xlrec.wal_log_hints;
ControlFile->track_commit_timestamp = xlrec.track_commit_timestamp;
/* /*
* Update minRecoveryPoint to ensure that if recovery is aborted, we * Update minRecoveryPoint to ensure that if recovery is aborted, we
...@@ -9136,6 +9151,25 @@ xlog_redo(XLogReaderState *record) ...@@ -9136,6 +9151,25 @@ xlog_redo(XLogReaderState *record)
ControlFile->minRecoveryPointTLI = ThisTimeLineID; ControlFile->minRecoveryPointTLI = ThisTimeLineID;
} }
/*
* Update the commit timestamp tracking. If there was a change
* it needs to be activated or deactivated accordingly.
*/
if (track_commit_timestamp != xlrec.track_commit_timestamp)
{
track_commit_timestamp = xlrec.track_commit_timestamp;
ControlFile->track_commit_timestamp = track_commit_timestamp;
if (track_commit_timestamp)
ActivateCommitTs();
else
/*
* We can't create a new WAL record here, but that's OK as
* master did the WAL logging already and we will replay the
* record from master in case we crash.
*/
DeactivateCommitTs(false);
}
UpdateControlFile(); UpdateControlFile();
LWLockRelease(ControlFileLock); LWLockRelease(ControlFileLock);
......
...@@ -1087,7 +1087,7 @@ vac_truncate_clog(TransactionId frozenXID, ...@@ -1087,7 +1087,7 @@ vac_truncate_clog(TransactionId frozenXID,
* checkpoint. * checkpoint.
*/ */
TruncateCLOG(frozenXID); TruncateCLOG(frozenXID);
TruncateCommitTs(frozenXID); TruncateCommitTs(frozenXID, true);
/* /*
* Update the wrap limit for GetNewTransactionId and creation of new * Update the wrap limit for GetNewTransactionId and creation of new
......
...@@ -39,11 +39,13 @@ extern Size CommitTsShmemSize(void); ...@@ -39,11 +39,13 @@ extern Size CommitTsShmemSize(void);
extern void CommitTsShmemInit(void); extern void CommitTsShmemInit(void);
extern void BootStrapCommitTs(void); extern void BootStrapCommitTs(void);
extern void StartupCommitTs(void); extern void StartupCommitTs(void);
extern void ActivateCommitTs(void);
extern void DeactivateCommitTs(bool do_wal);
extern void CompleteCommitTsInitialization(void); extern void CompleteCommitTsInitialization(void);
extern void ShutdownCommitTs(void); extern void ShutdownCommitTs(void);
extern void CheckPointCommitTs(void); extern void CheckPointCommitTs(void);
extern void ExtendCommitTs(TransactionId newestXact); extern void ExtendCommitTs(TransactionId newestXact);
extern void TruncateCommitTs(TransactionId oldestXact); extern void TruncateCommitTs(TransactionId oldestXact, bool do_wal);
extern void SetCommitTsLimit(TransactionId oldestXact, extern void SetCommitTsLimit(TransactionId oldestXact,
TransactionId newestXact); TransactionId newestXact);
extern void AdvanceOldestCommitTs(TransactionId oldestXact); extern void AdvanceOldestCommitTs(TransactionId oldestXact);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment