Commit fad153ec authored by Tom Lane's avatar Tom Lane

Rewrite the sinval messaging mechanism to reduce contention and avoid

unnecessary cache resets.  The major changes are:

* When the queue overflows, we only issue a cache reset to the specific
backend or backends that still haven't read the oldest message, rather
than resetting everyone as in the original coding.

* When we observe backend(s) falling well behind, we signal SIGUSR1
to only one backend, the one that is furthest behind and doesn't already
have a signal outstanding for it.  When it finishes catching up, it will
in turn signal SIGUSR1 to the next-furthest-back guy, if there is one that
is far enough behind to justify a signal.  The PMSIGNAL_WAKEN_CHILDREN
mechanism is removed.

* We don't attempt to clean out dead messages after every message-receipt
operation; rather, we do it on the insertion side, and only when the queue
fullness passes certain thresholds.

* Split SInvalLock into SInvalReadLock and SInvalWriteLock so that readers
don't block writers nor vice versa (except during the infrequent queue
cleanout operations).

* Transfer multiple sinval messages for each acquisition of a read or
write lock.
parent 30dc388a
......@@ -37,7 +37,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.558 2008/06/06 22:35:22 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.559 2008/06/19 21:32:56 tgl Exp $
*
* NOTES
*
......@@ -3829,16 +3829,6 @@ sigusr1_handler(SIGNAL_ARGS)
load_role();
}
if (CheckPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN))
{
/*
* Send SIGUSR1 to all children (triggers CatchupInterruptHandler).
* See storage/ipc/sinval[adt].c for the use of this.
*/
if (Shutdown <= SmartShutdown)
SignalChildren(SIGUSR1);
}
if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
PgArchPID != 0)
{
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.85 2008/03/17 11:50:26 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -17,9 +17,7 @@
#include "access/xact.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"
#include "utils/inval.h"
......@@ -27,9 +25,9 @@
/*
* Because backends sitting idle will not be reading sinval events, we
* need a way to give an idle backend a swift kick in the rear and make
* it catch up before the sinval queue overflows and forces everyone
* through a cache reset exercise. This is done by broadcasting SIGUSR1
* to all backends when the queue is threatening to become full.
* it catch up before the sinval queue overflows and forces it to go
* through a cache reset exercise. This is done by sending SIGUSR1
* to any backend that gets too far behind.
*
* State for catchup events consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessCatchupEvent
......@@ -47,67 +45,101 @@ static void ProcessCatchupEvent(void);
/*
* SendSharedInvalidMessage
* Add a shared-cache-invalidation message to the global SI message queue.
* SendSharedInvalidMessages
* Add shared-cache-invalidation message(s) to the global SI message queue.
*/
void
SendSharedInvalidMessage(SharedInvalidationMessage *msg)
SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
{
bool insertOK;
insertOK = SIInsertDataEntry(msg);
if (!insertOK)
elog(DEBUG4, "SI buffer overflow");
SIInsertDataEntries(msgs, n);
}
/*
* ReceiveSharedInvalidMessages
* Process shared-cache-invalidation messages waiting for this backend
*
* We guarantee to process all messages that had been queued before the
* routine was entered. It is of course possible for more messages to get
* queued right after our last SIGetDataEntries call.
*
* NOTE: it is entirely possible for this routine to be invoked recursively
* as a consequence of processing inside the invalFunction or resetFunction.
* Hence, we must be holding no SI resources when we call them. The only
* bad side-effect is that SIDelExpiredDataEntries might be called extra
* times on the way out of a nested call.
* Furthermore, such a recursive call must guarantee that all outstanding
* inval messages have been processed before it exits. This is the reason
* for the strange-looking choice to use a statically allocated buffer array
* and counters; it's so that a recursive call can process messages already
* sucked out of sinvaladt.c.
*/
void
ReceiveSharedInvalidMessages(
void (*invalFunction) (SharedInvalidationMessage *msg),
void (*resetFunction) (void))
{
SharedInvalidationMessage data;
int getResult;
bool gotMessage = false;
#define MAXINVALMSGS 32
static SharedInvalidationMessage messages[MAXINVALMSGS];
/*
* We use volatile here to prevent bugs if a compiler doesn't realize
* that recursion is a possibility ...
*/
static volatile int nextmsg = 0;
static volatile int nummsgs = 0;
for (;;)
/* Deal with any messages still pending from an outer recursion */
while (nextmsg < nummsgs)
{
/*
* We can discard any pending catchup event, since we will not exit
* this loop until we're fully caught up.
*/
catchupInterruptOccurred = 0;
SharedInvalidationMessage *msg = &messages[nextmsg++];
getResult = SIGetDataEntry(MyBackendId, &data);
invalFunction(msg);
}
do
{
int getResult;
nextmsg = nummsgs = 0;
/* Try to get some more messages */
getResult = SIGetDataEntries(messages, MAXINVALMSGS);
if (getResult == 0)
break; /* nothing more to do */
if (getResult < 0)
{
/* got a reset message */
elog(DEBUG4, "cache state reset");
resetFunction();
break; /* nothing more to do */
}
else
/* Process them, being wary that a recursive call might eat some */
nextmsg = 0;
nummsgs = getResult;
while (nextmsg < nummsgs)
{
/* got a normal data message */
invalFunction(&data);
SharedInvalidationMessage *msg = &messages[nextmsg++];
invalFunction(msg);
}
gotMessage = true;
}
/* If we got any messages, try to release dead messages */
if (gotMessage)
SIDelExpiredDataEntries(false);
/*
* We only need to loop if the last SIGetDataEntries call (which
* might have been within a recursive call) returned a full buffer.
*/
} while (nummsgs == MAXINVALMSGS);
/*
* We are now caught up. If we received a catchup signal, reset that
* flag, and call SICleanupQueue(). This is not so much because we
* need to flush dead messages right now, as that we want to pass on
* the catchup signal to the next slowest backend. "Daisy chaining" the
* catchup signal this way avoids creating spikes in system load for
* what should be just a background maintenance activity.
*/
if (catchupInterruptOccurred)
{
catchupInterruptOccurred = 0;
elog(DEBUG4, "sinval catchup complete, cleaning queue");
SICleanupQueue(false, 0);
}
}
......
/*-------------------------------------------------------------------------
*
* sinvaladt.c
* POSTGRES shared cache invalidation segment definitions.
* POSTGRES shared cache invalidation data manager.
*
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.70 2008/06/17 20:07:08 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
......@@ -27,20 +28,44 @@
/*
* Conceptually, the shared cache invalidation messages are stored in an
* infinite array, where maxMsgNum is the next array subscript to store a
* submitted message in, minMsgNum is the smallest array subscript containing a
* message not yet read by all backends, and we always have maxMsgNum >=
* submitted message in, minMsgNum is the smallest array subscript containing
* a message not yet read by all backends, and we always have maxMsgNum >=
* minMsgNum. (They are equal when there are no messages pending.) For each
* active backend, there is a nextMsgNum pointer indicating the next message it
* needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
* backend.
*
* (In the current implementation, minMsgNum is a lower bound for the
* per-process nextMsgNum values, but it isn't rigorously kept equal to the
* smallest nextMsgNum --- it may lag behind. We only update it when
* SICleanupQueue is called, and we try not to do that often.)
*
* In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
* entries. We translate MsgNum values into circular-buffer indexes by
* computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
* MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
* doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
* in the buffer. If the buffer does overflow, we reset it to empty and
* force each backend to "reset", ie, discard all its invalidatable state.
* in the buffer. If the buffer does overflow, we recover by setting the
* "reset" flag for each backend that has fallen too far behind. A backend
* that is in "reset" state is ignored while determining minMsgNum. When
* it does finally attempt to receive inval messages, it must discard all
* its invalidatable state, since it won't know what it missed.
*
* To reduce the probability of needing resets, we send a "catchup" interrupt
* to any backend that seems to be falling unreasonably far behind. The
* normal behavior is that at most one such interrupt is in flight at a time;
* when a backend completes processing a catchup interrupt, it executes
* SICleanupQueue, which will signal the next-furthest-behind backend if
* needed. This avoids undue contention from multiple backends all trying
* to catch up at once. However, the furthest-back backend might be stuck
* in a state where it can't catch up. Eventually it will get reset, so it
* won't cause any more problems for anyone but itself. But we don't want
* to find that a bunch of other backends are now too close to the reset
* threshold to be saved. So SICleanupQueue is designed to occasionally
* send extra catchup interrupts as the queue gets fuller, to backends that
* are far behind and haven't gotten one yet. As long as there aren't a lot
* of "stuck" backends, we won't need a lot of extra interrupts, since ones
* that aren't stuck will propagate their interrupts to the next guy.
*
* We would have problems if the MsgNum values overflow an integer, so
* whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
......@@ -48,6 +73,21 @@
* large so that we don't need to do this often. It must be a multiple of
* MAXNUMMESSAGES so that the existing circular-buffer entries don't need
* to be moved when we do it.
*
* Access to the shared sinval array is protected by two locks, SInvalReadLock
* and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
* authorizes them to modify their own ProcState but not to modify or even
* look at anyone else's. When we need to perform array-wide updates,
* such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
* lock out all readers. Writers take SInvalWriteLock (always in exclusive
* mode) to serialize adding messages to the queue. Note that a writer
* can operate in parallel with one or more readers, because the writer
* has no need to touch anyone's ProcState, except in the infrequent cases
* when SICleanupQueue is needed. The only point of overlap is that
* the writer might change maxMsgNum while readers are looking at it.
* This should be okay: we are assuming that fetching or storing an int
* is atomic, an assumption also made elsewhere in Postgres. However
* readers mustn't assume that maxMsgNum isn't changing under them.
*/
......@@ -59,17 +99,46 @@
*
* MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
* Must be a multiple of MAXNUMMESSAGES. Should be large.
*
* CLEANUP_MIN: the minimum number of messages that must be in the buffer
* before we bother to call SICleanupQueue.
*
* CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
* we exceed CLEANUP_MIN. Should be a power of 2 for speed.
*
* SIG_THRESHOLD: the minimum number of messages a backend must have fallen
* behind before we'll send it SIGUSR1.
*
* WRITE_QUANTUM: the max number of messages to push into the buffer per
* iteration of SIInsertDataEntries. Noncritical but should be less than
* CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
* per iteration.
*/
#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
/* nextMsgNum is -1 in an inactive ProcState array entry. */
int nextMsgNum; /* next message number to read, or -1 */
bool resetState; /* true, if backend has to reset its state */
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
/*
* Next LocalTransactionId to use for each idle backend slot. We keep
* this here because it is indexed by BackendId and it is convenient to
* copy the value to and from local memory when MyBackendId is set.
* It's meaningless in an active ProcState entry.
*/
LocalTransactionId nextLXID;
} ProcState;
/* Shared cache invalidation memory segment */
......@@ -80,16 +149,10 @@ typedef struct SISeg
*/
int minMsgNum; /* oldest message still needed */
int maxMsgNum; /* next message number to be assigned */
int nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
/*
* Next LocalTransactionId to use for each idle backend slot. We keep
* this here because it is indexed by BackendId and it is convenient to
* copy the value to and from local memory when MyBackendId is set.
*/
LocalTransactionId *nextLXID; /* array of maxBackends entries */
/*
* Circular buffer holding shared-inval messages
*/
......@@ -110,7 +173,6 @@ static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
static LocalTransactionId nextLocalTransactionId;
static void CleanupInvalidationState(int status, Datum arg);
static void SISetProcStateInvalid(SISeg *segP);
/*
......@@ -124,8 +186,6 @@ SInvalShmemSize(void)
size = offsetof(SISeg, procState);
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
return size;
}
......@@ -149,11 +209,10 @@ CreateSharedInvalidationState(void)
if (found)
return;
shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
/* Clear message counters, save size of procState array */
shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0;
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
shmInvalBuffer->lastBackend = 0;
shmInvalBuffer->maxBackends = MaxBackends;
......@@ -162,9 +221,11 @@ CreateSharedInvalidationState(void)
/* Mark all backends inactive, and initialize nextLXID */
for (i = 0; i < shmInvalBuffer->maxBackends; i++)
{
shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */
shmInvalBuffer->procState[i].procPid = 0; /* inactive */
shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
shmInvalBuffer->procState[i].signaled = false;
shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
}
}
......@@ -179,12 +240,19 @@ SharedInvalBackendInit(void)
ProcState *stateP = NULL;
SISeg *segP = shmInvalBuffer;
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
/*
* This can run in parallel with read operations, and for that matter
* with write operations; but not in parallel with additions and removals
* of backends, nor in parallel with SICleanupQueue. It doesn't seem
* worth having a third lock, so we choose to use SInvalWriteLock to
* serialize additions/removals.
*/
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
/* Look for a free entry in the procState array */
for (index = 0; index < segP->lastBackend; index++)
{
if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
if (segP->procState[index].procPid == 0) /* inactive slot? */
{
stateP = &segP->procState[index];
break;
......@@ -196,7 +264,7 @@ SharedInvalBackendInit(void)
if (segP->lastBackend < segP->maxBackends)
{
stateP = &segP->procState[segP->lastBackend];
Assert(stateP->nextMsgNum < 0);
Assert(stateP->procPid == 0);
segP->lastBackend++;
}
else
......@@ -205,7 +273,7 @@ SharedInvalBackendInit(void)
* out of procState slots: MaxBackends exceeded -- report normally
*/
MyBackendId = InvalidBackendId;
LWLockRelease(SInvalLock);
LWLockRelease(SInvalWriteLock);
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
......@@ -214,21 +282,21 @@ SharedInvalBackendInit(void)
MyBackendId = (stateP - &segP->procState[0]) + 1;
#ifdef INVALIDDEBUG
elog(DEBUG2, "my backend id is %d", MyBackendId);
#endif /* INVALIDDEBUG */
elog(DEBUG4, "my backend id is %d", MyBackendId);
/* Advertise assigned backend ID in MyProc */
MyProc->backendId = MyBackendId;
/* Fetch next local transaction ID into local memory */
nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
nextLocalTransactionId = stateP->nextLXID;
/* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid;
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->signaled = false;
LWLockRelease(SInvalLock);
LWLockRelease(SInvalWriteLock);
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
......@@ -238,8 +306,7 @@ SharedInvalBackendInit(void)
* CleanupInvalidationState
* Mark the current backend as no longer active.
*
* This function is called via on_shmem_exit() during backend shutdown,
* so the caller has NOT acquired the lock for us.
* This function is called via on_shmem_exit() during backend shutdown.
*
* arg is really of type "SISeg*".
*/
......@@ -247,227 +314,247 @@ static void
CleanupInvalidationState(int status, Datum arg)
{
SISeg *segP = (SISeg *) DatumGetPointer(arg);
ProcState *stateP;
int i;
Assert(PointerIsValid(segP));
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
stateP = &segP->procState[MyBackendId - 1];
/* Update next local transaction ID for next holder of this backendID */
segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
stateP->nextLXID = nextLocalTransactionId;
/* Mark myself inactive */
segP->procState[MyBackendId - 1].nextMsgNum = -1;
segP->procState[MyBackendId - 1].resetState = false;
stateP->procPid = 0;
stateP->nextMsgNum = 0;
stateP->resetState = false;
stateP->signaled = false;
/* Recompute index of last active backend */
for (i = segP->lastBackend; i > 0; i--)
{
if (segP->procState[i - 1].nextMsgNum >= 0)
if (segP->procState[i - 1].procPid != 0)
break;
}
segP->lastBackend = i;
LWLockRelease(SInvalLock);
LWLockRelease(SInvalWriteLock);
}
/*
* SIInsertDataEntry
* Add a new invalidation message to the buffer.
*
* If we are unable to insert the message because the buffer is full,
* then clear the buffer and assert the "reset" flag to each backend.
* This will cause all the backends to discard *all* invalidatable state.
*
* Returns true for normal successful insertion, false if had to reset.
* SIInsertDataEntries
* Add new invalidation message(s) to the buffer.
*/
bool
SIInsertDataEntry(SharedInvalidationMessage *data)
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
int numMsgs;
bool signal_postmaster = false;
SISeg *segP;
SISeg *segP = shmInvalBuffer;
/*
* N can be arbitrarily large. We divide the work into groups of no more
* than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
* an unreasonably long time. (This is not so much because we care about
* letting in other writers, as that some just-caught-up backend might be
* trying to do SICleanupQueue to pass on its signal, and we don't want it
* to have to wait a long time.) Also, we need to consider calling
* SICleanupQueue every so often.
*/
while (n > 0)
{
int nthistime = Min(n, WRITE_QUANTUM);
int numMsgs;
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
n -= nthistime;
segP = shmInvalBuffer;
numMsgs = segP->maxMsgNum - segP->minMsgNum;
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
/* Is the buffer full? */
if (numMsgs >= MAXNUMMESSAGES)
{
/*
* Don't panic just yet: slowest backend might have consumed some
* messages but not yet have done SIDelExpiredDataEntries() to advance
* minMsgNum. So, make sure minMsgNum is up-to-date.
* If the buffer is full, we *must* acquire some space. Clean the
* queue and reset anyone who is preventing space from being freed.
* Otherwise, clean the queue only when it's exceeded the next
* fullness threshold.
*/
SIDelExpiredDataEntries(true);
numMsgs = segP->maxMsgNum - segP->minMsgNum;
if (numMsgs >= MAXNUMMESSAGES)
if (numMsgs + nthistime > MAXNUMMESSAGES)
{
/* Yup, it's definitely full, no choice but to reset */
SISetProcStateInvalid(segP);
LWLockRelease(SInvalLock);
return false;
SICleanupQueue(true, nthistime);
Assert((segP->maxMsgNum - segP->minMsgNum + nthistime) <= MAXNUMMESSAGES);
}
}
/*
* Try to prevent table overflow. When the table is 70% full send a
* WAKEN_CHILDREN request to the postmaster. The postmaster will send a
* SIGUSR1 signal to all the backends, which will cause sinval.c to read
* any pending SI entries.
*
* This should never happen if all the backends are actively executing
* queries, but if a backend is sitting idle then it won't be starting
* transactions and so won't be reading SI entries.
*/
if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster)
signal_postmaster = true;
/*
* Insert new message into proper slot of circular buffer
*/
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
segP->maxMsgNum++;
LWLockRelease(SInvalLock);
if (signal_postmaster)
{
elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
}
return true;
}
/*
* SISetProcStateInvalid
* Flush pending messages from buffer, assert reset flag for each backend
*
* This is used only to recover from SI buffer overflow.
*/
static void
SISetProcStateInvalid(SISeg *segP)
{
int i;
segP->minMsgNum = 0;
segP->maxMsgNum = 0;
else if (numMsgs >= segP->nextThreshold)
SICleanupQueue(true, 0);
for (i = 0; i < segP->lastBackend; i++)
{
if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
/*
* Insert new message(s) into proper slot of circular buffer
*/
while (nthistime-- > 0)
{
segP->procState[i].resetState = true;
segP->procState[i].nextMsgNum = 0;
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
segP->maxMsgNum++;
}
LWLockRelease(SInvalWriteLock);
}
}
/*
* SIGetDataEntry
* get next SI message for specified backend, if there is one
* SIGetDataEntries
* get next SI message(s) for current backend, if there are any
*
* Possible return values:
* 0: no SI message available
* 1: next SI message has been extracted into *data
* (there may be more messages available after this one!)
* -1: SI reset message extracted
* 0: no SI message available
* n>0: next n SI messages have been extracted into data[]
* -1: SI reset message extracted
*
* If the return value is less than the array size "datasize", the caller
* can assume that there are no more SI messages after the one(s) returned.
* Otherwise, another call is needed to collect more messages.
*
* NB: this can run in parallel with other instances of SIGetDataEntry
* NB: this can run in parallel with other instances of SIGetDataEntries
* executing on behalf of other backends, since each instance will modify only
* fields of its own backend's ProcState, and no instance will look at fields
* of other backends' ProcStates. We express this by grabbing SInvalLock in
* shared mode. Note that this is not exactly the normal (read-only)
* of other backends' ProcStates. We express this by grabbing SInvalReadLock
* in shared mode. Note that this is not exactly the normal (read-only)
* interpretation of a shared lock! Look closely at the interactions before
* allowing SInvalLock to be grabbed in shared mode for any other reason!
* allowing SInvalReadLock to be grabbed in shared mode for any other reason!
*
* NB: this can also run in parallel with SIInsertDataEntries. It is not
* guaranteed that we will return any messages added after the routine is
* entered.
*
* Note: we assume that "datasize" is not so large that it might be important
* to break our hold on SInvalReadLock into segments.
*/
int
SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
ProcState *stateP;
SISeg *segP;
ProcState *stateP;
int n;
LWLockAcquire(SInvalLock, LW_SHARED);
LWLockAcquire(SInvalReadLock, LW_SHARED);
segP = shmInvalBuffer;
stateP = &segP->procState[backendId - 1];
stateP = &segP->procState[MyBackendId - 1];
if (stateP->resetState)
{
/*
* Force reset. We can say we have dealt with any messages added
* since the reset, as well...
* since the reset, as well; and that means we should clear the
* signaled flag, too.
*/
stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
LWLockRelease(SInvalLock);
stateP->resetState = false;
stateP->signaled = false;
LWLockRelease(SInvalReadLock);
return -1;
}
if (stateP->nextMsgNum >= segP->maxMsgNum)
{
LWLockRelease(SInvalLock);
return 0; /* nothing to read */
}
/*
* Retrieve message and advance my counter.
* Retrieve messages and advance backend's counter, until data array is
* full or there are no more messages.
*
* There may be other backends that haven't read the message(s), so we
* cannot delete them here. SICleanupQueue() will eventually remove them
* from the queue.
*
* Note: depending on the compiler, we might read maxMsgNum only once
* here, or each time through the loop. It doesn't matter (as long as
* each fetch is atomic).
*/
*data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
stateP->nextMsgNum++;
n = 0;
while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
{
data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
stateP->nextMsgNum++;
}
/*
* There may be other backends that haven't read the message, so we cannot
* delete it here. SIDelExpiredDataEntries() should be called to remove
* dead messages.
* Reset our "signaled" flag whenever we have caught up completely.
*/
if (stateP->nextMsgNum >= segP->maxMsgNum)
stateP->signaled = false;
LWLockRelease(SInvalLock);
return 1; /* got a message */
LWLockRelease(SInvalReadLock);
return n;
}
/*
* SIDelExpiredDataEntries
* SICleanupQueue
* Remove messages that have been consumed by all active backends
*
* callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
* minFree is the minimum number of free message slots required at completion.
*
* Possible side effects of this routine include marking one or more
* backends as "reset" in the array, and sending a catchup interrupt (SIGUSR1)
* to some backend that seems to be getting too far behind. We signal at
* most one backend at a time, for reasons explained at the top of the file.
*/
void
SIDelExpiredDataEntries(bool locked)
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
int min,
i,
h;
minsig,
lowbound,
numMsgs,
i;
ProcState *needSig = NULL;
if (!locked)
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
/* Lock out all writers and readers */
if (!callerHasWriteLock)
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
* the furthest-back backend that needs signaling (if any), and reset
* any backends that are too far back.
*/
min = segP->maxMsgNum;
if (min == segP->minMsgNum)
{
if (!locked)
LWLockRelease(SInvalLock);
return; /* fast path if no messages exist */
}
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
minsig = min - SIG_THRESHOLD;
lowbound = min - MAXNUMMESSAGES + minFree;
for (i = 0; i < segP->lastBackend; i++)
{
h = segP->procState[i].nextMsgNum;
if (h >= 0)
{ /* backend active */
if (h < min)
min = h;
ProcState *stateP = &segP->procState[i];
int n = stateP->nextMsgNum;
/* Ignore if inactive or already in reset state */
if (stateP->procPid == 0 || stateP->resetState)
continue;
/*
* If we must free some space and this backend is preventing it,
* force him into reset state and then ignore until he catches up.
*/
if (n < lowbound)
{
stateP->resetState = true;
/* no point in signaling him ... */
continue;
}
/* Track the global minimum nextMsgNum */
if (n < min)
min = n;
/* Also see who's furthest back of the unsignaled backends */
if (n < minsig && !stateP->signaled)
{
minsig = n;
needSig = stateP;
}
}
segP->minMsgNum = min;
/*
* When minMsgNum gets really large, decrement all message counters so as
* to forestall overflow of the counters.
* to forestall overflow of the counters. This happens seldom enough
* that folding it into the previous loop would be a loser.
*/
if (min >= MSGNUMWRAPAROUND)
{
......@@ -475,13 +562,43 @@ SIDelExpiredDataEntries(bool locked)
segP->maxMsgNum -= MSGNUMWRAPAROUND;
for (i = 0; i < segP->lastBackend; i++)
{
if (segP->procState[i].nextMsgNum >= 0)
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
/* we don't bother skipping inactive entries here */
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
}
if (!locked)
LWLockRelease(SInvalLock);
/*
* Determine how many messages are still in the queue, and set the
* threshold at which we should repeat SICleanupQueue().
*/
numMsgs = segP->maxMsgNum - segP->minMsgNum;
if (numMsgs < CLEANUP_MIN)
segP->nextThreshold = CLEANUP_MIN;
else
segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
/*
* Lastly, signal anyone who needs a catchup interrupt. Since kill()
* might not be fast, we don't want to hold locks while executing it.
*/
if (needSig)
{
pid_t his_pid = needSig->procPid;
needSig->signaled = true;
LWLockRelease(SInvalReadLock);
LWLockRelease(SInvalWriteLock);
elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
kill(his_pid, SIGUSR1);
if (callerHasWriteLock)
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
}
else
{
LWLockRelease(SInvalReadLock);
if (!callerHasWriteLock)
LWLockRelease(SInvalWriteLock);
}
}
......
......@@ -80,7 +80,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.85 2008/06/19 00:46:05 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -203,7 +203,7 @@ AddInvalidationMessage(InvalidationChunk **listHdr,
if (chunk == NULL)
{
/* First time through; create initial chunk */
#define FIRSTCHUNKSIZE 16
#define FIRSTCHUNKSIZE 32
chunk = (InvalidationChunk *)
MemoryContextAlloc(CurTransactionContext,
sizeof(InvalidationChunk) +
......@@ -275,6 +275,23 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
} \
} while (0)
/*
* Process a list of invalidation messages group-wise.
*
* As above, but the code fragment can handle an array of messages.
* The fragment should refer to the messages as msgs[], with n entries.
*/
#define ProcessMessageListMulti(listHdr, codeFragment) \
do { \
InvalidationChunk *_chunk; \
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
{ \
SharedInvalidationMessage *msgs = _chunk->msgs; \
int n = _chunk->nitems; \
codeFragment; \
} \
} while (0)
/* ----------------------------------------------------------------
* Invalidation set support functions
......@@ -371,6 +388,18 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
ProcessMessageList(hdr->rclist, func(msg));
}
/*
* As above, but the function is able to process an array of messages
* rather than just one at a time.
*/
static void
ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
void (*func) (const SharedInvalidationMessage *msgs, int n))
{
ProcessMessageListMulti(hdr->cclist, func(msgs, n));
ProcessMessageListMulti(hdr->rclist, func(msgs, n));
}
/* ----------------------------------------------------------------
* private support functions
* ----------------------------------------------------------------
......@@ -792,7 +821,7 @@ inval_twophase_postcommit(TransactionId xid, uint16 info,
case TWOPHASE_INFO_MSG:
msg = (SharedInvalidationMessage *) recdata;
Assert(len == sizeof(SharedInvalidationMessage));
SendSharedInvalidMessage(msg);
SendSharedInvalidMessages(msg, 1);
break;
case TWOPHASE_INFO_FILE_BEFORE:
RelationCacheInitFileInvalidate(true);
......@@ -850,8 +879,8 @@ AtEOXact_Inval(bool isCommit)
AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
&transInvalInfo->CurrentCmdInvalidMsgs);
ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
SendSharedInvalidMessage);
ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
SendSharedInvalidMessages);
if (transInvalInfo->RelcacheInitFileInval)
RelationCacheInitFileInvalidate(false);
......@@ -1033,8 +1062,8 @@ EndNonTransactionalInvalidation(void)
/* Send out the invals */
ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
LocalExecuteInvalidationMessage);
ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
SendSharedInvalidMessage);
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
SendSharedInvalidMessages);
/* Clean up and release memory */
for (chunk = transInvalInfo->CurrentCmdInvalidMsgs.cclist;
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.38 2008/01/01 19:45:59 momjian Exp $
* $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.39 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -43,7 +43,8 @@ typedef enum LWLockId
OidGenLock,
XidGenLock,
ProcArrayLock,
SInvalLock,
SInvalReadLock,
SInvalWriteLock,
FreeSpaceLock,
WALInsertLock,
WALWriteLock,
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.19 2008/01/01 19:45:59 momjian Exp $
* $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.20 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -23,7 +23,6 @@
typedef enum
{
PMSIGNAL_PASSWORD_CHANGE, /* pg_auth file has changed */
PMSIGNAL_WAKEN_CHILDREN, /* send a SIGUSR1 signal to all backends */
PMSIGNAL_WAKEN_ARCHIVER, /* send a NOTIFY signal to xlog archiver */
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.47 2008/03/16 19:47:34 alvherre Exp $
* $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -83,7 +83,8 @@ typedef union
} SharedInvalidationMessage;
extern void SendSharedInvalidMessage(SharedInvalidationMessage *msg);
extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
int n);
extern void ReceiveSharedInvalidMessages(
void (*invalFunction) (SharedInvalidationMessage *msg),
void (*resetFunction) (void));
......
/*-------------------------------------------------------------------------
*
* sinvaladt.h
* POSTGRES shared cache invalidation segment definitions.
* POSTGRES shared cache invalidation data manager.
*
* The shared cache invalidation manager is responsible for transmitting
* invalidation messages between backends. Any message sent by any backend
* must be delivered to all already-running backends before it can be
* forgotten.
* forgotten. (If we run out of space, we instead deliver a "RESET"
* message to backends that have fallen too far behind.)
*
* The struct type SharedInvalidationMessage, defining the contents of
* a single message, is defined in sinval.h.
......@@ -14,7 +15,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.47 2008/03/17 11:50:27 alvherre Exp $
* $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -23,7 +24,6 @@
#include "storage/sinval.h"
/*
* prototypes for functions in sinvaladt.c
*/
......@@ -31,9 +31,9 @@ extern Size SInvalShmemSize(void);
extern void CreateSharedInvalidationState(void);
extern void SharedInvalBackendInit(void);
extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data);
extern void SIDelExpiredDataEntries(bool locked);
extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n);
extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize);
extern void SICleanupQueue(bool callerHasWriteLock, int minFree);
extern LocalTransactionId GetNextLocalTransactionId(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment