Commit 7f60b81e authored by Tom Lane's avatar Tom Lane

Fix failure in CreateCheckPoint on some Alpha boxes --- it's not OK to

assume that TAS() will always succeed the first time, even if the lock
is known to be free.  Also, make sure that code will eventually time out
and report a stuck spinlock, rather than looping forever.  Small cleanups
in s_lock.h, too.
parent 7d363c4c
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.45 2000/12/28 13:00:08 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.46 2000/12/29 21:31:21 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -411,7 +411,7 @@ begin:; ...@@ -411,7 +411,7 @@ begin:;
} }
} }
} }
s_lock_sleep(i++); S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++);
if (!TAS(&(XLogCtl->insert_lck))) if (!TAS(&(XLogCtl->insert_lck)))
break; break;
} }
...@@ -599,17 +599,10 @@ begin:; ...@@ -599,17 +599,10 @@ begin:;
if (updrqst) if (updrqst)
{ {
for (;;) S_LOCK(&(XLogCtl->info_lck));
{
if (!TAS(&(XLogCtl->info_lck)))
{
if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write)) if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write))
XLogCtl->LgwrRqst.Write = LgwrRqst.Write; XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
S_UNLOCK(&(XLogCtl->info_lck)); S_UNLOCK(&(XLogCtl->info_lck));
break;
}
s_lock_sleep(i++);
}
} }
END_CRIT_CODE; END_CRIT_CODE;
...@@ -622,7 +615,7 @@ XLogFlush(XLogRecPtr record) ...@@ -622,7 +615,7 @@ XLogFlush(XLogRecPtr record)
XLogRecPtr WriteRqst; XLogRecPtr WriteRqst;
char buffer[BLCKSZ]; char buffer[BLCKSZ];
char *usebuf = NULL; char *usebuf = NULL;
unsigned i = 0; unsigned spins = 0;
bool force_lgwr = false; bool force_lgwr = false;
if (XLOG_DEBUG) if (XLOG_DEBUG)
...@@ -715,7 +708,7 @@ XLogFlush(XLogRecPtr record) ...@@ -715,7 +708,7 @@ XLogFlush(XLogRecPtr record)
break; break;
} }
} }
s_lock_sleep(i++); S_LOCK_SLEEP(&(XLogCtl->lgwr_lck), spins++);
} }
if (logFile >= 0 && (LgwrResult.Write.xlogid != logId || if (logFile >= 0 && (LgwrResult.Write.xlogid != logId ||
...@@ -740,18 +733,12 @@ XLogFlush(XLogRecPtr record) ...@@ -740,18 +733,12 @@ XLogFlush(XLogRecPtr record)
logId, logSeg); logId, logSeg);
LgwrResult.Flush = LgwrResult.Write; LgwrResult.Flush = LgwrResult.Write;
for (i = 0;;) S_LOCK(&(XLogCtl->info_lck));
{
if (!TAS(&(XLogCtl->info_lck)))
{
XLogCtl->LgwrResult = LgwrResult; XLogCtl->LgwrResult = LgwrResult;
if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write)) if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
XLogCtl->LgwrRqst.Write = LgwrResult.Write; XLogCtl->LgwrRqst.Write = LgwrResult.Write;
S_UNLOCK(&(XLogCtl->info_lck)); S_UNLOCK(&(XLogCtl->info_lck));
break;
}
s_lock_sleep(i++);
}
XLogCtl->Write.LgwrResult = LgwrResult; XLogCtl->Write.LgwrResult = LgwrResult;
S_UNLOCK(&(XLogCtl->lgwr_lck)); S_UNLOCK(&(XLogCtl->lgwr_lck));
...@@ -767,6 +754,7 @@ GetFreeXLBuffer() ...@@ -767,6 +754,7 @@ GetFreeXLBuffer()
XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogCtlWrite *Write = &XLogCtl->Write; XLogCtlWrite *Write = &XLogCtl->Write;
uint16 curridx = NextBufIdx(Insert->curridx); uint16 curridx = NextBufIdx(Insert->curridx);
unsigned spins = 0;
LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx]; LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx];
for (;;) for (;;)
...@@ -809,9 +797,8 @@ GetFreeXLBuffer() ...@@ -809,9 +797,8 @@ GetFreeXLBuffer()
InitXLBuffer(curridx); InitXLBuffer(curridx);
return; return;
} }
S_LOCK_SLEEP(&(XLogCtl->lgwr_lck), spins++);
} }
return;
} }
static void static void
...@@ -820,7 +807,6 @@ XLogWrite(char *buffer) ...@@ -820,7 +807,6 @@ XLogWrite(char *buffer)
XLogCtlWrite *Write = &XLogCtl->Write; XLogCtlWrite *Write = &XLogCtl->Write;
char *from; char *from;
uint32 wcnt = 0; uint32 wcnt = 0;
int i = 0;
bool usexistent; bool usexistent;
for (; XLByteLT(LgwrResult.Write, LgwrRqst.Write);) for (; XLByteLT(LgwrResult.Write, LgwrRqst.Write);)
...@@ -919,18 +905,12 @@ XLogWrite(char *buffer) ...@@ -919,18 +905,12 @@ XLogWrite(char *buffer)
LgwrResult.Flush = LgwrResult.Write; LgwrResult.Flush = LgwrResult.Write;
} }
for (;;) S_LOCK(&(XLogCtl->info_lck));
{
if (!TAS(&(XLogCtl->info_lck)))
{
XLogCtl->LgwrResult = LgwrResult; XLogCtl->LgwrResult = LgwrResult;
if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write)) if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
XLogCtl->LgwrRqst.Write = LgwrResult.Write; XLogCtl->LgwrRqst.Write = LgwrResult.Write;
S_UNLOCK(&(XLogCtl->info_lck)); S_UNLOCK(&(XLogCtl->info_lck));
break;
}
s_lock_sleep(i++);
}
Write->LgwrResult = LgwrResult; Write->LgwrResult = LgwrResult;
} }
...@@ -2062,18 +2042,17 @@ CreateCheckPoint(bool shutdown) ...@@ -2062,18 +2042,17 @@ CreateCheckPoint(bool shutdown)
uint32 _logId; uint32 _logId;
uint32 _logSeg; uint32 _logSeg;
char archdir[MAXPGPATH]; char archdir[MAXPGPATH];
unsigned spins = 0;
if (MyLastRecPtr.xrecoff != 0) if (MyLastRecPtr.xrecoff != 0)
elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block"); elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
START_CRIT_CODE; START_CRIT_CODE;
/* Grab lock, using larger than normal sleep between tries (1 sec) */
while (TAS(&(XLogCtl->chkp_lck))) while (TAS(&(XLogCtl->chkp_lck)))
{ {
struct timeval delay = {2, 0}; S_LOCK_SLEEP_INTERVAL(&(XLogCtl->chkp_lck), spins++, 1000000);
if (shutdown)
elog(STOP, "Checkpoint lock is busy while data base is shutting down");
(void) select(0, NULL, NULL, NULL, &delay);
} }
memset(&checkPoint, 0, sizeof(checkPoint)); memset(&checkPoint, 0, sizeof(checkPoint));
...@@ -2087,14 +2066,7 @@ CreateCheckPoint(bool shutdown) ...@@ -2087,14 +2066,7 @@ CreateCheckPoint(bool shutdown)
checkPoint.Shutdown = shutdown; checkPoint.Shutdown = shutdown;
/* Get REDO record ptr */ /* Get REDO record ptr */
while (TAS(&(XLogCtl->insert_lck))) S_LOCK(&(XLogCtl->insert_lck));
{
struct timeval delay = {1, 0};
if (shutdown)
elog(STOP, "XLog insert lock is busy while data base is shutting down");
(void) select(0, NULL, NULL, NULL, &delay);
}
freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
if (freespace < SizeOfXLogRecord) if (freespace < SizeOfXLogRecord)
{ {
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.100 2000/12/28 13:00:21 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.101 2000/12/29 21:31:21 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1987,7 +1987,7 @@ LockBuffer(Buffer buffer, int mode) ...@@ -1987,7 +1987,7 @@ LockBuffer(Buffer buffer, int mode)
while (buf->ri_lock || buf->w_lock) while (buf->ri_lock || buf->w_lock)
{ {
S_UNLOCK(&(buf->cntx_lock)); S_UNLOCK(&(buf->cntx_lock));
s_lock_sleep(i++); S_LOCK_SLEEP(&(buf->cntx_lock), i++);
S_LOCK(&(buf->cntx_lock)); S_LOCK(&(buf->cntx_lock));
} }
(buf->r_locks)++; (buf->r_locks)++;
...@@ -2013,7 +2013,7 @@ LockBuffer(Buffer buffer, int mode) ...@@ -2013,7 +2013,7 @@ LockBuffer(Buffer buffer, int mode)
buf->ri_lock = true; buf->ri_lock = true;
} }
S_UNLOCK(&(buf->cntx_lock)); S_UNLOCK(&(buf->cntx_lock));
s_lock_sleep(i++); S_LOCK_SLEEP(&(buf->cntx_lock), i++);
S_LOCK(&(buf->cntx_lock)); S_LOCK(&(buf->cntx_lock));
} }
buf->w_lock = true; buf->w_lock = true;
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/s_lock.c,v 1.27 2000/12/11 00:49:51 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/s_lock.c,v 1.28 2000/12/29 21:31:20 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -25,19 +25,21 @@ ...@@ -25,19 +25,21 @@
* number of microseconds to wait. This accomplishes pseudo random back-off. * number of microseconds to wait. This accomplishes pseudo random back-off.
* Values are not critical but 10 milliseconds is a common platform * Values are not critical but 10 milliseconds is a common platform
* granularity. * granularity.
* note: total time to cycle through all 16 entries might be about .07 sec. *
* Total time to cycle through all 20 entries might be about .07 sec,
* so the given value of S_MAX_BUSY results in timeout after ~70 sec.
*/ */
#define S_NSPINCYCLE 20 #define S_NSPINCYCLE 20
#define S_MAX_BUSY 1000 * S_NSPINCYCLE #define S_MAX_BUSY 1000 * S_NSPINCYCLE
int s_spincycle[S_NSPINCYCLE] = int s_spincycle[S_NSPINCYCLE] =
{0, 0, 0, 0, 10000, 0, 0, 0, 10000, 0, { 0, 0, 0, 0, 10000, 0, 0, 0, 10000, 0,
0, 10000, 0, 0, 10000, 0, 10000, 0, 10000, 10000 0, 10000, 0, 0, 10000, 0, 10000, 0, 10000, 10000
}; };
/* /*
* s_lock_stuck(lock) - complain about a stuck spinlock * s_lock_stuck() - complain about a stuck spinlock
*/ */
static void static void
s_lock_stuck(volatile slock_t *lock, const char *file, const int line) s_lock_stuck(volatile slock_t *lock, const char *file, const int line)
...@@ -52,13 +54,38 @@ s_lock_stuck(volatile slock_t *lock, const char *file, const int line) ...@@ -52,13 +54,38 @@ s_lock_stuck(volatile slock_t *lock, const char *file, const int line)
} }
/*
* s_lock_sleep() - sleep a pseudo-random amount of time, check for timeout
*
* Normally 'microsec' is 0, specifying to use the next s_spincycle[] value.
* Some callers may pass a nonzero interval, specifying to use exactly that
* delay value rather than a pseudo-random delay.
*/
void void
s_lock_sleep(unsigned spin) s_lock_sleep(unsigned spins, int microsec,
volatile slock_t *lock,
const char *file, const int line)
{ {
struct timeval delay; struct timeval delay;
unsigned max_spins;
if (microsec > 0)
{
delay.tv_sec = 0; delay.tv_sec = 0;
delay.tv_usec = s_spincycle[spin % S_NSPINCYCLE]; delay.tv_usec = microsec;
/* two-minute timeout in this case */
max_spins = 120000000 / microsec;
}
else
{
delay.tv_sec = 0;
delay.tv_usec = s_spincycle[spins % S_NSPINCYCLE];
max_spins = S_MAX_BUSY;
}
if (spins > max_spins)
s_lock_stuck(lock, file, line);
(void) select(0, NULL, NULL, NULL, &delay); (void) select(0, NULL, NULL, NULL, &delay);
} }
...@@ -71,14 +98,13 @@ s_lock(volatile slock_t *lock, const char *file, const int line) ...@@ -71,14 +98,13 @@ s_lock(volatile slock_t *lock, const char *file, const int line)
{ {
unsigned spins = 0; unsigned spins = 0;
/*
* If you are thinking of changing this code, be careful. This same
* loop logic is used in other places that call TAS() directly.
*/
while (TAS(lock)) while (TAS(lock))
{ {
s_lock_sleep(spins); s_lock_sleep(spins++, 0, lock, file, line);
if (++spins > S_MAX_BUSY)
{
/* It's been over a minute... */
s_lock_stuck(lock, file, line);
}
} }
} }
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment