Commit 22fe3d4b authored by Tom Lane's avatar Tom Lane

I finally understood what sinvaladt.c is doing --- and it

offended my aesthestic sensibility that there was so much unreadable code
doing so little.  Rewritten code is about half the size, faster, and
(I hope) much more intelligible.
parent cc8b67a5
......@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.17 1999/09/04 18:36:45 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.18 1999/09/06 19:37:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -21,12 +21,6 @@
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
extern SISeg *shmInvalBuffer; /* the shared buffer segment, set by
* SISegmentAttach()
*/
extern BackendId MyBackendId;
extern BackendTag MyBackendTag;
SPINLOCK SInvalLock = (SPINLOCK) NULL;
/****************************************************************************/
......@@ -39,11 +33,6 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends)
{
int status;
/*
* REMOVED SISyncKill(IPCKeyGetSIBufferMemorySemaphoreKey(key));
* SISyncInit(IPCKeyGetSIBufferMemorySemaphoreKey(key));
*/
/* SInvalLock gets set in spin.c, during spinlock init */
status = SISegmentInit(true, IPCKeyGetSIBufferMemoryBlock(key),
maxBackends);
......@@ -53,9 +42,9 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends)
}
/****************************************************************************/
/* AttachSharedInvalidationState(key) Attach a buffer segment */
/* AttachSharedInvalidationState(key) Attach to existing buffer segment */
/* */
/* should be called only by the POSTMASTER */
/* should be called by each backend during startup */
/****************************************************************************/
void
AttachSharedInvalidationState(IPCKey key)
......@@ -74,6 +63,11 @@ AttachSharedInvalidationState(IPCKey key)
elog(FATAL, "AttachSharedInvalidationState: failed segment init");
}
/*
* InitSharedInvalidationState
* Initialize new backend's state info in buffer segment.
* Must be called after AttachSharedInvalidationState().
*/
void
InitSharedInvalidationState(void)
{
......@@ -88,24 +82,19 @@ InitSharedInvalidationState(void)
/*
* RegisterSharedInvalid
* Returns a new local cache invalidation state containing a new entry.
* Add a shared-cache-invalidation message to the global SI message queue.
*
* Note:
* Assumes hash index is valid.
* Assumes item pointer is valid.
*/
/****************************************************************************/
/* RegisterSharedInvalid(cacheId, hashIndex, pointer) */
/* */
/* register a message in the buffer */
/* should be called by a backend */
/****************************************************************************/
void
RegisterSharedInvalid(int cacheId, /* XXX */
Index hashIndex,
ItemPointer pointer)
{
SharedInvalidData newInvalid;
SharedInvalidData newInvalid;
bool insertOK;
/*
* This code has been hacked to accept two types of messages. This
......@@ -127,34 +116,16 @@ RegisterSharedInvalid(int cacheId, /* XXX */
ItemPointerSetInvalid(&newInvalid.pointerData);
SpinAcquire(SInvalLock);
while (!SISetDataEntry(shmInvalBuffer, &newInvalid))
{
/* buffer full */
/* release a message, mark process cache states to be invalid */
SISetProcStateInvalid(shmInvalBuffer);
if (!SIDelDataEntries(shmInvalBuffer, 1))
{
/* inconsistent buffer state -- shd never happen */
SpinRelease(SInvalLock);
elog(FATAL, "RegisterSharedInvalid: inconsistent buffer state");
}
/* loop around to try write again */
}
insertOK = SIInsertDataEntry(shmInvalBuffer, &newInvalid);
SpinRelease(SInvalLock);
if (! insertOK)
elog(NOTICE, "RegisterSharedInvalid: SI buffer overflow");
}
/*
* InvalidateSharedInvalid
* Processes all entries in a shared cache invalidation state.
* Process shared-cache-invalidation messages waiting for this backend
*/
/****************************************************************************/
/* InvalidateSharedInvalid(invalFunction, resetFunction) */
/* */
/* invalidate a message in the buffer (read and clean up) */
/* should be called by a backend */
/****************************************************************************/
void
InvalidateSharedInvalid(void (*invalFunction) (),
void (*resetFunction) ())
......
......@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.24 1999/09/04 18:36:45 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.25 1999/09/06 19:37:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -16,648 +16,313 @@
#include "postgres.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/lmgr.h"
#include "storage/sinvaladt.h"
#include "utils/trace.h"
/* ----------------
* global variable notes
*
* SharedInvalidationSemaphore
*
* shmInvalBuffer
* the shared buffer segment, set by SISegmentAttach()
*
* MyBackendId
* might be removed later, used only for
* debugging in debug routines (end of file)
*
* SIDbId
* identification of buffer (disappears)
*
* SIRelId \
* SIDummyOid \ identification of buffer
* SIXidData /
* SIXid /
*
* XXX This file really needs to be cleaned up. We switched to using
* spinlocks to protect critical sections (as opposed to using fake
* relations and going through the lock manager) and some of the old
* cruft was 'ifdef'ed out, while other parts (now unused) are still
* compiled into the system. -mer 5/24/92
* ----------------
*/
#ifdef HAS_TEST_AND_SET
int SharedInvalidationLockId;
#else
IpcSemaphoreId SharedInvalidationSemaphore;
#endif
SISeg *shmInvalBuffer;
extern BackendId MyBackendId;
static void CleanupInvalidationState(int status, SISeg *segInOutP);
static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag);
static int SIGetNumEntries(SISeg *segP);
static void SISegmentAttach(IpcMemoryId shmid);
static void SISegInit(SISeg *segP, int maxBackends);
static void CleanupInvalidationState(int status, SISeg *segP);
static void SISetProcStateInvalid(SISeg *segP);
/************************************************************************/
/* SISetActiveProcess(segP, backendId) set the backend status active */
/* should be called only by the postmaster when creating a backend */
/************************************************************************/
/* XXX I suspect that the segP parameter is extraneous. -hirohama */
static void
SISetActiveProcess(SISeg *segInOutP, BackendId backendId)
{
/* mark all messages as read */
/* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */
segInOutP->procState[backendId - 1].resetState = false;
segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP);
}
/****************************************************************************/
/* SIBackendInit() initializes a backend to operate on the buffer */
/****************************************************************************/
/*
* SISegmentInit
* Create a new SI memory segment, or attach to an existing one
*
* This is called with createNewSegment = true by the postmaster (or by
* a standalone backend), and subsequently with createNewSegment = false
* by backends started by the postmaster.
*
* Note: maxBackends param is only valid when createNewSegment is true
*/
int
SIBackendInit(SISeg *segInOutP)
SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends)
{
LockRelId LtCreateRelId();
TransactionId LMITransactionIdCopy();
Assert(MyBackendTag > 0);
MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag);
if (MyBackendId == InvalidBackendTag)
return 0;
#ifdef INVALIDDEBUG
elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
MyBackendTag, MyBackendId);
#endif /* INVALIDDEBUG */
int segSize;
IpcMemoryId shmId;
SISetActiveProcess(segInOutP, MyBackendId);
on_shmem_exit(CleanupInvalidationState, (caddr_t) segInOutP);
return 1;
}
if (createNewSegment)
{
/* Kill existing segment, if any */
IpcMemoryKill(key);
/* ----------------
* SIAssignBackendId
* ----------------
*/
static BackendId
SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag)
{
Index index;
ProcState *stateP = NULL;
/* Figure space needed.
* Note sizeof(SISeg) includes the first ProcState entry.
*/
segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
for (index = 0; index < segInOutP->maxBackends; index++)
{
if (segInOutP->procState[index].tag == InvalidBackendTag ||
segInOutP->procState[index].tag == backendTag)
/* Get a shared segment */
shmId = IpcMemoryCreate(key, segSize, IPCProtection);
if (shmId < 0)
{
stateP = &segInOutP->procState[index];
break;
perror("SISegmentInit: segment create failed");
return -1; /* an error */
}
if (!PointerIsValid(stateP) ||
(segInOutP->procState[index].resetState &&
(!stateP->resetState ||
stateP->tag < backendTag)) ||
(!stateP->resetState &&
(segInOutP->procState[index].limit <
stateP->limit ||
stateP->tag < backendTag)))
stateP = &segInOutP->procState[index];
}
/* verify that all "procState" entries checked for matching tags */
/* Attach to the shared cache invalidation segment */
/* sets the global variable shmInvalBuffer */
SISegmentAttach(shmId);
for (index++; index < segInOutP->maxBackends; index++)
{
if (segInOutP->procState[index].tag == backendTag)
elog(FATAL, "SIAssignBackendId: tag %d found twice", backendTag);
/* Init shared memory contents */
SISegInit(shmInvalBuffer, maxBackends);
}
Assert(stateP);
if (stateP->tag != InvalidBackendTag)
else
{
if (stateP->tag == backendTag)
elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag);
else
/* find existing segment */
shmId = IpcMemoryIdGet(key, 0);
if (shmId < 0)
{
elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag);
return InvalidBackendTag;
perror("SISegmentInit: segment get failed");
return -1; /* an error */
}
}
stateP->tag = backendTag;
return 1 + stateP - &segInOutP->procState[0];
}
/************************************************************************/
/* The following function should be called only by the postmaster !! */
/************************************************************************/
/************************************************************************/
/* SISetDeadProcess(segP, backendId) set the backend status DEAD */
/* should be called only by the postmaster when a backend died */
/************************************************************************/
static void
SISetDeadProcess(SISeg *segP, int backendId)
{
/* XXX call me.... */
segP->procState[backendId - 1].resetState = false;
segP->procState[backendId - 1].limit = -1;
segP->procState[backendId - 1].tag = InvalidBackendTag;
/* Attach to the shared cache invalidation segment */
/* sets the global variable shmInvalBuffer */
SISegmentAttach(shmId);
}
return 1;
}
/*
* CleanupInvalidationState
* Note:
* This is a temporary hack. ExitBackend should call this instead
* of exit (via on_shmem_exit).
* SISegmentAttach
* Attach to specified shared memory segment
*/
static void
CleanupInvalidationState(int status, /* XXX */
SISeg *segInOutP) /* XXX style */
{
Assert(PointerIsValid(segInOutP));
SISetDeadProcess(segInOutP, MyBackendId);
}
/************************************************************************/
/* SIComputeSize() - compute size and offsets for SI segment */
/************************************************************************/
static void
SIComputeSize(SISegOffsets *oP, int maxBackends)
{
int A,
B,
a,
b,
totalSize;
A = 0;
/* sizeof(SISeg) includes the first ProcState entry */
a = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
a = MAXALIGN(a); /* offset to first data entry */
b = sizeof(SISegEntry) * MAXNUMMESSAGES;
B = A + a + b;
B = MAXALIGN(B);
totalSize = B - A;
oP->startSegment = A;
oP->offsetToFirstEntry = a; /* relative to A */
oP->offsetToEndOfSegment = totalSize; /* relative to A */
}
/************************************************************************/
/* SISetStartEntrySection(segP, offset) - sets the offset */
/************************************************************************/
static void
SISetStartEntrySection(SISeg *segP, Offset offset)
{
segP->startEntrySection = offset;
}
/************************************************************************/
/* SIGetStartEntrySection(segP) - returnss the offset */
/************************************************************************/
static Offset
SIGetStartEntrySection(SISeg *segP)
SISegmentAttach(IpcMemoryId shmid)
{
return segP->startEntrySection;
}
shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid);
/************************************************************************/
/* SISetEndEntrySection(segP, offset) - sets the offset */
/************************************************************************/
static void
SISetEndEntrySection(SISeg *segP, Offset offset)
{
segP->endEntrySection = offset;
if (shmInvalBuffer == IpcMemAttachFailed)
{
/* XXX use validity function */
elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
}
}
/************************************************************************/
/* SISetEndEntryChain(segP, offset) - sets the offset */
/************************************************************************/
/*
* SISegInit
* Initialize contents of a new shared memory sinval segment
*/
static void
SISetEndEntryChain(SISeg *segP, Offset offset)
SISegInit(SISeg *segP, int maxBackends)
{
segP->endEntryChain = offset;
}
/************************************************************************/
/* SIGetEndEntryChain(segP) - returnss the offset */
/************************************************************************/
static Offset
SIGetEndEntryChain(SISeg *segP)
{
return segP->endEntryChain;
}
int i;
/************************************************************************/
/* SISetStartEntryChain(segP, offset) - sets the offset */
/************************************************************************/
static void
SISetStartEntryChain(SISeg *segP, Offset offset)
{
segP->startEntryChain = offset;
}
/* Clear message counters, save size of procState array */
segP->minMsgNum = 0;
segP->maxMsgNum = 0;
segP->maxBackends = maxBackends;
/************************************************************************/
/* SIGetStartEntryChain(segP) - returns the offset */
/************************************************************************/
static Offset
SIGetStartEntryChain(SISeg *segP)
{
return segP->startEntryChain;
}
/* The buffer[] array is initially all unused, so we need not fill it */
/************************************************************************/
/* SISetNumEntries(segP, num) sets the current nuber of entries */
/************************************************************************/
static bool
SISetNumEntries(SISeg *segP, int num)
{
if (num <= MAXNUMMESSAGES)
{
segP->numEntries = num;
return true;
}
else
/* Mark all backends inactive */
for (i = 0; i < maxBackends; i++)
{
return false; /* table full */
segP->procState[i].nextMsgNum = -1; /* inactive */
segP->procState[i].resetState = false;
segP->procState[i].tag = InvalidBackendTag;
}
}
/************************************************************************/
/* SIGetNumEntries(segP) - returns the current nuber of entries */
/************************************************************************/
static int
SIGetNumEntries(SISeg *segP)
{
return segP->numEntries;
}
/************************************************************************/
/* SISetMaxNumEntries(segP, num) sets the maximal number of entries */
/************************************************************************/
static bool
SISetMaxNumEntries(SISeg *segP, int num)
/*
* SIBackendInit
* Initialize a new backend to operate on the sinval buffer
*
* NB: this routine, and all following ones, must be executed with the
* SInvalLock spinlock held, since there may be multiple backends trying
* to access the buffer.
*/
int
SIBackendInit(SISeg *segP)
{
if (num <= MAXNUMMESSAGES)
{
segP->maxNumEntries = num;
return true;
}
else
{
return false; /* wrong number */
}
}
/************************************************************************/
/* SIGetProcStateLimit(segP, i) returns the limit of read messages */
/************************************************************************/
#define SIGetProcStateLimit(segP,i) \
((segP)->procState[i].limit)
Index index;
ProcState *stateP = NULL;
/************************************************************************/
/* SIIncNumEntries(segP, num) increments the current nuber of entries */
/************************************************************************/
static bool
SIIncNumEntries(SISeg *segP, int num)
{
Assert(MyBackendTag > 0);
/*
* Try to prevent table overflow. When the table is 70% full send a
* SIGUSR2 to the postmaster which will send it back to all the
* backends. This will be handled by Async_NotifyHandler() with a
* StartTransactionCommand() which will flush unread SI entries for
* each backend. dz - 27 Jan 1998
*/
if (segP->numEntries == (MAXNUMMESSAGES * 70 / 100))
/* Check for duplicate backend tags (should never happen) */
for (index = 0; index < segP->maxBackends; index++)
{
TPRINTF(TRACE_VERBOSE,
"SIIncNumEntries: table is 70%% full, signaling postmaster");
kill(getppid(), SIGUSR2);
if (segP->procState[index].tag == MyBackendTag)
elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag);
}
if ((segP->numEntries + num) <= MAXNUMMESSAGES)
{
segP->numEntries = segP->numEntries + num;
return true;
}
else
/* Look for a free entry in the procState array */
for (index = 0; index < segP->maxBackends; index++)
{
return false; /* table full */
if (segP->procState[index].tag == InvalidBackendTag)
{
stateP = &segP->procState[index];
break;
}
}
}
/************************************************************************/
/* SIDecNumEntries(segP, num) decrements the current nuber of entries */
/************************************************************************/
static bool
SIDecNumEntries(SISeg *segP, int num)
{
if ((segP->numEntries - num) >= 0)
{
segP->numEntries = segP->numEntries - num;
return true;
}
else
/* elog() with spinlock held is probably not too cool, but these
* conditions should never happen anyway.
*/
if (stateP == NULL)
{
return false; /* not enough entries in table */
elog(NOTICE, "SIBackendInit: no free procState slot available");
MyBackendId = InvalidBackendTag;
return 0;
}
}
/************************************************************************/
/* SISetStartFreeSpace(segP, offset) - sets the offset */
/************************************************************************/
static void
SISetStartFreeSpace(SISeg *segP, Offset offset)
{
segP->startFreeSpace = offset;
}
/************************************************************************/
/* SIGetStartFreeSpace(segP) - returns the offset */
/************************************************************************/
static Offset
SIGetStartFreeSpace(SISeg *segP)
{
return segP->startFreeSpace;
}
MyBackendId = (stateP - &segP->procState[0]) + 1;
#ifdef INVALIDDEBUG
elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
MyBackendTag, MyBackendId);
#endif /* INVALIDDEBUG */
/* mark myself active, with all extant messages already read */
stateP->tag = MyBackendTag;
stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
/************************************************************************/
/* SIGetFirstDataEntry(segP) returns first data entry */
/************************************************************************/
static SISegEntry *
SIGetFirstDataEntry(SISeg *segP)
{
SISegEntry *eP;
Offset startChain;
startChain = SIGetStartEntryChain(segP);
if (startChain == InvalidOffset)
return NULL;
eP = (SISegEntry *) ((Pointer) segP +
SIGetStartEntrySection(segP) +
startChain);
return eP;
}
/************************************************************************/
/* SIGetLastDataEntry(segP) returns last data entry in the chain */
/************************************************************************/
static SISegEntry *
SIGetLastDataEntry(SISeg *segP)
{
SISegEntry *eP;
Offset endChain;
endChain = SIGetEndEntryChain(segP);
if (endChain == InvalidOffset)
return NULL;
eP = (SISegEntry *) ((Pointer) segP +
SIGetStartEntrySection(segP) +
endChain);
return eP;
}
/************************************************************************/
/* SIGetNextDataEntry(segP, offset) returns next data entry */
/************************************************************************/
#define SIGetNextDataEntry(segP,offset) \
(((offset) == InvalidOffset) ? (SISegEntry *) NULL : \
(SISegEntry *) ((Pointer) (segP) + \
(segP)->startEntrySection + \
(Offset) (offset)))
/************************************************************************/
/* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */
/************************************************************************/
static SISegEntry *
SIGetNthDataEntry(SISeg *segP,
int n) /* must range from 1 to MaxMessages */
{
SISegEntry *eP;
int i;
if (n <= 0)
return NULL;
eP = SIGetFirstDataEntry(segP);
for (i = 1; i < n; i++)
{
/* skip one and get the next */
eP = SIGetNextDataEntry(segP, eP->next);
}
return eP;
}
/************************************************************************/
/* SIEntryOffset(segP, entryP) returns the offset for an pointer */
/************************************************************************/
static Offset
SIEntryOffset(SISeg *segP, SISegEntry *entryP)
{
/* relative to B !! */
return ((Offset) ((Pointer) entryP -
(Pointer) segP -
SIGetStartEntrySection(segP)));
}
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, (caddr_t) segP);
/************************************************************************/
/* SISetDataEntry(segP, data) - sets a message in the segemnt */
/************************************************************************/
bool
SISetDataEntry(SISeg *segP, SharedInvalidData *data)
{
Offset offsetToNewData;
SISegEntry *eP,
*lastP;
if (!SIIncNumEntries(segP, 1))
return false; /* no space */
/* get a free entry */
offsetToNewData = SIGetStartFreeSpace(segP);
eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */
SISetStartFreeSpace(segP, eP->next);
/* fill it up */
eP->entryData = *data;
eP->isfree = false;
eP->next = InvalidOffset;
/* handle insertion point at the end of the chain !! */
lastP = SIGetLastDataEntry(segP);
if (lastP == NULL)
{
/* there is no chain, insert the first entry */
SISetStartEntryChain(segP, SIEntryOffset(segP, eP));
}
else
{
/* there is a last entry in the chain */
lastP->next = SIEntryOffset(segP, eP);
}
SISetEndEntryChain(segP, SIEntryOffset(segP, eP));
return true;
return 1;
}
/************************************************************************/
/* SIDecProcLimit(segP, num) decrements all process limits */
/************************************************************************/
/*
* CleanupInvalidationState
* Mark the current backend as no longer active.
*
* This function is called via on_shmem_exit() during backend shutdown.
*/
static void
SIDecProcLimit(SISeg *segP, int num)
CleanupInvalidationState(int status,
SISeg *segP)
{
int i;
Assert(PointerIsValid(segP));
for (i = 0; i < segP->maxBackends; i++)
{
/* decrement only, if there is a limit > 0 */
if (segP->procState[i].limit > 0)
{
segP->procState[i].limit = segP->procState[i].limit - num;
if (segP->procState[i].limit < 0)
{
/* limit was not high enough, reset to zero */
/* negative means it's a dead backend */
segP->procState[i].limit = 0;
}
}
}
}
/* XXX we probably oughta grab the SInval spinlock for this...
* but I think it is safe not to.
*/
segP->procState[MyBackendId - 1].nextMsgNum = -1;
segP->procState[MyBackendId - 1].resetState = false;
segP->procState[MyBackendId - 1].tag = InvalidBackendTag;
}
/************************************************************************/
/* SIDelDataEntries(segP, n) - free the FIRST n entries */
/************************************************************************/
/*
* SIInsertDataEntry
* Add a new invalidation message to the buffer.
*
* If we are unable to insert the message because the buffer is full,
* then clear the buffer and assert the "reset" flag to each backend.
* This will cause all the backends to discard *all* invalidatable state.
*
* Returns true for normal successful insertion, false if had to reset.
*/
bool
SIDelDataEntries(SISeg *segP, int n)
SIInsertDataEntry(SISeg *segP, SharedInvalidData *data)
{
int i;
if (n <= 0)
return false;
int numMsgs = segP->maxMsgNum - segP->minMsgNum;
if (!SIDecNumEntries(segP, n))
/* Is the buffer full? */
if (numMsgs >= MAXNUMMESSAGES)
{
/* not that many entries in buffer */
/* Yes, so force reset */
SISetProcStateInvalid(segP);
return false;
}
for (i = 1; i <= n; i++)
/*
* Try to prevent table overflow. When the table is 70% full send a
* SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will
* send it back to all the backends. This will force idle backends to
* execute a transaction to look through pg_listener for NOTIFY messages,
* and as a byproduct of the transaction start they will read SI entries.
*
* This should never happen if all the backends are actively executing
* queries, but if a backend is sitting idle then it won't be starting
* transactions and so won't be reading SI entries.
*
* dz - 27 Jan 1998
*/
if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
IsUnderPostmaster)
{
SISegEntry *e1P = SIGetFirstDataEntry(segP);
SISetStartEntryChain(segP, e1P->next);
if (SIGetStartEntryChain(segP) == InvalidOffset)
{
/* it was the last entry */
SISetEndEntryChain(segP, InvalidOffset);
}
/* free the entry */
e1P->isfree = true;
e1P->next = SIGetStartFreeSpace(segP);
SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
TPRINTF(TRACE_VERBOSE,
"SIInsertDataEntry: table is 70%% full, signaling postmaster");
kill(getppid(), SIGUSR2);
}
SIDecProcLimit(segP, n);
/*
* Insert new message into proper slot of circular buffer
*/
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
segP->maxMsgNum++;
return true;
}
/************************************************************************/
/* SISetProcStateInvalid(segP) checks and marks a backends state as */
/* invalid */
/************************************************************************/
void
/*
* SISetProcStateInvalid
* Flush pending messages from buffer, assert reset flag for each backend
*
* This is used only to recover from SI buffer overflow.
*/
static void
SISetProcStateInvalid(SISeg *segP)
{
int i;
segP->minMsgNum = 0;
segP->maxMsgNum = 0;
for (i = 0; i < segP->maxBackends; i++)
{
if (segP->procState[i].limit == 0)
if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
{
/* backend i didn't read any message */
segP->procState[i].resetState = true;
/*
* XXX signal backend that it has to reset its internal cache
* ?
*/
segP->procState[i].nextMsgNum = 0;
}
}
}
/************************************************************************/
/* SIGetDataEntry(segP, backendId, data) */
/* get next SI message for specified backend, if there is one */
/* */
/* Possible return values: */
/* 0: no SI message available */
/* 1: next SI message has been extracted into *data */
/* (there may be more messages available after this one!) */
/* -1: SI reset message extracted */
/************************************************************************/
/*
* SIGetDataEntry
* get next SI message for specified backend, if there is one
*
* Possible return values:
* 0: no SI message available
* 1: next SI message has been extracted into *data
* (there may be more messages available after this one!)
* -1: SI reset message extracted
*/
int
SIGetDataEntry(SISeg *segP, int backendId,
SharedInvalidData *data)
{
SISegEntry *msg;
ProcState *stateP = & segP->procState[backendId - 1];
Assert(segP->procState[backendId - 1].tag == MyBackendTag);
Assert(stateP->tag == MyBackendTag);
if (segP->procState[backendId - 1].resetState)
if (stateP->resetState)
{
/* new valid state--mark all messages "read" */
segP->procState[backendId - 1].resetState = false;
segP->procState[backendId - 1].limit = SIGetNumEntries(segP);
/* Force reset. We can say we have dealt with any messages added
* since the reset, as well...
*/
stateP->resetState = false;
stateP->nextMsgNum = segP->maxMsgNum;
return -1;
}
/* Get next message for this backend, if any */
/* This is fairly inefficient if there are many messages,
* but normally there should not be...
*/
msg = SIGetNthDataEntry(segP,
SIGetProcStateLimit(segP, backendId - 1) + 1);
if (msg == NULL)
if (stateP->nextMsgNum >= segP->maxMsgNum)
return 0; /* nothing to read */
*data = msg->entryData; /* return contents of message */
segP->procState[backendId - 1].limit++; /* one more message read */
/*
* Retrieve message and advance my counter.
*/
*data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
stateP->nextMsgNum++;
/* There may be other backends that haven't read the message,
* so we cannot delete it here.
......@@ -666,9 +331,10 @@ SIGetDataEntry(SISeg *segP, int backendId,
return 1; /* got a message */
}
/************************************************************************/
/* SIDelExpiredDataEntries (segP) - removes irrelevant messages */
/************************************************************************/
/*
* SIDelExpiredDataEntries
* Remove messages that have been consumed by all active backends
*/
void
SIDelExpiredDataEntries(SISeg *segP)
{
......@@ -676,161 +342,34 @@ SIDelExpiredDataEntries(SISeg *segP)
i,
h;
min = 9999999;
min = segP->maxMsgNum;
if (min == segP->minMsgNum)
return; /* fast path if no messages exist */
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
for (i = 0; i < segP->maxBackends; i++)
{
h = SIGetProcStateLimit(segP, i);
h = segP->procState[i].nextMsgNum;
if (h >= 0)
{ /* backend active */
if (h < min)
min = h;
}
}
if (min < 9999999 && min > 0)
{
/* we can remove min messages */
/* this adjusts also the state limits! */
if (!SIDelDataEntries(segP, min))
elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
}
}
/************************************************************************/
/* SISegInit(segP) - initializes the segment */
/************************************************************************/
static void
SISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends)
{
int i;
SISegEntry *eP;
/* set semaphore ids in the segment */
/* XXX */
SISetStartEntrySection(segP, oP->offsetToFirstEntry);
SISetEndEntrySection(segP, oP->offsetToEndOfSegment);
SISetStartFreeSpace(segP, 0);
SISetStartEntryChain(segP, InvalidOffset);
SISetEndEntryChain(segP, InvalidOffset);
SISetNumEntries(segP, 0);
SISetMaxNumEntries(segP, MAXNUMMESSAGES);
segP->maxBackends = maxBackends;
for (i = 0; i < segP->maxBackends; i++)
{
segP->procState[i].limit = -1; /* no backend active !! */
segP->procState[i].resetState = false;
segP->procState[i].tag = InvalidBackendTag;
}
/* construct a chain of free entries */
for (i = 1; i < MAXNUMMESSAGES; i++)
{
eP = (SISegEntry *) ((Pointer) segP +
SIGetStartEntrySection(segP) +
(i - 1) * sizeof(SISegEntry));
eP->isfree = true;
eP->next = i * sizeof(SISegEntry); /* relative to B */
}
/* handle the last free entry separate */
eP = (SISegEntry *) ((Pointer) segP +
SIGetStartEntrySection(segP) +
(MAXNUMMESSAGES - 1) * sizeof(SISegEntry));
eP->isfree = true;
eP->next = InvalidOffset; /* it's the end of the chain !! */
}
/************************************************************************/
/* SISegmentKill(key) - kill any segment */
/************************************************************************/
static void
SISegmentKill(int key) /* the corresponding key for the segment */
{
IpcMemoryKill(key);
}
/************************************************************************/
/* SISegmentGet(key, size) - get a shared segment of size <size> */
/* returns a segment id */
/************************************************************************/
static IpcMemoryId
SISegmentGet(int key, /* the corresponding key for the segment */
int size, /* size of segment in bytes */
bool create)
{
IpcMemoryId shmid;
if (create)
shmid = IpcMemoryCreate(key, size, IPCProtection);
else
shmid = IpcMemoryIdGet(key, size);
return shmid;
}
/************************************************************************/
/* SISegmentAttach(shmid) - attach a shared segment with id shmid */
/************************************************************************/
static void
SISegmentAttach(IpcMemoryId shmid)
{
shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid);
if (shmInvalBuffer == IpcMemAttachFailed)
{
/* XXX use validity function */
elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
}
}
/************************************************************************/
/* SISegmentInit() initialize SI segment */
/* */
/* NB: maxBackends param is only valid when killExistingSegment is true */
/************************************************************************/
int
SISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends)
{
SISegOffsets offsets;
IpcMemoryId shmId;
bool create;
if (killExistingSegment)
{
/* Kill existing segment */
/* set semaphore */
SISegmentKill(key);
/* Get a shared segment */
SIComputeSize(&offsets, maxBackends);
create = true;
shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create);
if (shmId < 0)
{
perror("SISegmentGet: failed");
return -1; /* an error */
}
segP->minMsgNum = min;
/* Attach the shared cache invalidation segment */
/* sets the global variable shmInvalBuffer */
SISegmentAttach(shmId);
/* Init shared memory table */
SISegInit(shmInvalBuffer, &offsets, maxBackends);
}
else
/* When minMsgNum gets really large, decrement all message counters
* so as to forestall overflow of the counters.
*/
if (min >= MSGNUMWRAPAROUND)
{
/* use an existing segment */
create = false;
shmId = SISegmentGet(key, 0, create);
if (shmId < 0)
segP->minMsgNum -= MSGNUMWRAPAROUND;
segP->maxMsgNum -= MSGNUMWRAPAROUND;
for (i = 0; i < segP->maxBackends; i++)
{
perror("SISegmentGet: getting an existent segment failed");
return -1; /* an error */
if (segP->procState[i].nextMsgNum >= 0)
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
/* Attach the shared cache invalidation segment */
SISegmentAttach(shmId);
}
return 1;
}
......@@ -6,16 +6,16 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: lock.h,v 1.33 1999/07/16 17:07:38 momjian Exp $
* $Id: lock.h,v 1.34 1999/09/06 19:37:37 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef LOCK_H_
#define LOCK_H_
#include "storage/ipc.h"
#include "storage/itemptr.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "utils/array.h"
extern SPINLOCK LockMgrLock;
......
......@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: sinvaladt.h,v 1.17 1999/09/04 18:36:44 tgl Exp $
* $Id: sinvaladt.h,v 1.18 1999/09/06 19:37:37 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -17,120 +17,106 @@
#include "storage/itemptr.h"
/*
* The structure of the shared cache invaidation segment
* The shared cache invalidation manager is responsible for transmitting
* invalidation messages between backends. Any message sent by any backend
* must be delivered to all already-running backends before it can be
* forgotten.
*
* Conceptually, the messages are stored in an infinite array, where
* maxMsgNum is the next array subscript to store a submitted message in,
* minMsgNum is the smallest array subscript containing a message not yet
* read by all backends, and we always have maxMsgNum >= minMsgNum. (They
* are equal when there are no messages pending.) For each active backend,
* there is a nextMsgNum pointer indicating the next message it needs to read;
* we have maxMsgNum >= nextMsgNum >= minMsgNum for every backend.
*
* In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
* entries. We translate MsgNum values into circular-buffer indexes by
* computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
* MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
* doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
* in the buffer. If the buffer does overflow, we reset it to empty and
* force each backend to "reset", ie, discard all its invalidatable state.
*
* We would have problems if the MsgNum values overflow an integer, so
* whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
* from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
* large so that we don't need to do this often. It must be a multiple of
* MAXNUMMESSAGES so that the existing circular-buffer entries don't need
* to be moved when we do it.
*/
/*
A------------- Header info --------------
criticalSectionSemaphoreId
generalSemaphoreId
startEntrySection (offset a)
endEntrySection (offset a + b)
startFreeSpace (offset relative to B)
startEntryChain (offset relatiev to B)
endEntryChain (offset relative to B)
numEntries
maxNumEntries
maxBackends
procState[maxBackends] --> limit
resetState (bool)
a tag (POSTID)
B------------- Start entry section -------
SISegEntry --> entryData --> ... (see SharedInvalidData!)
isfree (bool)
next (offset to next entry in chain )
b .... (dynamically growing down)
C----------------End shared segment -------
*/
/* Parameters (configurable) *******************************************/
#define MAXNUMMESSAGES 4000 /* maximum number of messages in seg */
/*
* Configurable parameters.
*
* MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
* Must be a power of 2 for speed.
*
* MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
* Must be a multiple of MAXNUMMESSAGES. Should be large.
*/
#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
#define InvalidOffset 1000000000 /* a invalid offset (End of
* chain) */
/* The content of one shared-invalidation message */
typedef struct SharedInvalidData
{
int cacheId; /* XXX */
Index hashIndex;
ItemPointerData pointerData;
} SharedInvalidData;
typedef SharedInvalidData *SharedInvalid;
/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
int limit; /* the number of read messages */
/* nextMsgNum is -1 in an inactive ProcState array entry. */
int nextMsgNum; /* next message number to read, or -1 */
bool resetState; /* true, if backend has to reset its state */
int tag; /* special tag, recieved from the
* postmaster */
int tag; /* backend tag received from postmaster */
} ProcState;
/* Shared cache invalidation memory segment */
typedef struct SISeg
{
IpcSemaphoreId criticalSectionSemaphoreId; /* semaphore id */
IpcSemaphoreId generalSemaphoreId; /* semaphore id */
Offset startEntrySection; /* (offset a) */
Offset endEntrySection;/* (offset a + b) */
Offset startFreeSpace; /* (offset relative to B) */
Offset startEntryChain;/* (offset relative to B) */
Offset endEntryChain; /* (offset relative to B) */
int numEntries;
int maxNumEntries;
/*
* General state information
*/
int minMsgNum; /* oldest message still needed */
int maxMsgNum; /* next message number to be assigned */
int maxBackends; /* size of procState array */
/*
* Circular buffer holding shared-inval messages
*/
SharedInvalidData buffer[MAXNUMMESSAGES];
/*
* Per-backend state info.
*
* We declare procState as 1 entry because C wants a fixed-size array,
* but actually it is maxBackends entries long.
*/
ProcState procState[1]; /* reflects the invalidation state */
/*
* The entry section begins after the end of the procState array.
* Everything there is controlled by offsets.
*/
} SISeg;
typedef struct SharedInvalidData
{
int cacheId; /* XXX */
Index hashIndex;
ItemPointerData pointerData;
} SharedInvalidData;
typedef SharedInvalidData *SharedInvalid;
typedef struct SISegEntry
{
SharedInvalidData entryData;/* the message data */
bool isfree; /* entry free? */
Offset next; /* offset to next entry */
} SISegEntry;
typedef struct SISegOffsets
{
Offset startSegment; /* always 0 (for now) */
Offset offsetToFirstEntry; /* A + a = B */
Offset offsetToEndOfSegment; /* A + a + b */
} SISegOffsets;
/****************************************************************************/
/* synchronization of the shared buffer access */
/* access to the buffer is synchronized by the lock manager !! */
/****************************************************************************/
#define SI_LockStartValue 255
#define SI_SharedLock (-1)
#define SI_ExclusiveLock (-255)
extern SISeg *shmInvalBuffer; /* pointer to the shared buffer segment,
* set by SISegmentAttach()
*/
extern SISeg *shmInvalBuffer;
/*
* prototypes for functions in sinvaladt.c
*/
extern int SIBackendInit(SISeg *segInOutP);
extern int SISegmentInit(bool killExistingSegment, IPCKey key,
extern int SISegmentInit(bool createNewSegment, IPCKey key,
int maxBackends);
extern int SIBackendInit(SISeg *segP);
extern bool SISetDataEntry(SISeg *segP, SharedInvalidData *data);
extern void SISetProcStateInvalid(SISeg *segP);
extern bool SIInsertDataEntry(SISeg *segP, SharedInvalidData *data);
extern int SIGetDataEntry(SISeg *segP, int backendId,
SharedInvalidData *data);
extern bool SIDelDataEntries(SISeg *segP, int n);
extern void SIDelExpiredDataEntries(SISeg *segP);
#endif /* SINVALADT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment