/*------------------------------------------------------------------------- * * sinvaladt.c * POSTGRES shared cache invalidation segment definitions. * * Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.27 1999/09/24 00:24:35 tgl Exp $ * *------------------------------------------------------------------------- */ #include <signal.h> #include <unistd.h> #include "postgres.h" #include "miscadmin.h" #include "storage/backendid.h" #include "storage/lmgr.h" #include "storage/proc.h" #include "storage/sinval.h" #include "storage/sinvaladt.h" #include "utils/trace.h" SISeg *shmInvalBuffer; static void SISegmentAttach(IpcMemoryId shmid); static void SISegInit(SISeg *segP, int maxBackends); static void CleanupInvalidationState(int status, SISeg *segP); static void SISetProcStateInvalid(SISeg *segP); /* * SISegmentInit * Create a new SI memory segment, or attach to an existing one * * This is called with createNewSegment = true by the postmaster (or by * a standalone backend), and subsequently with createNewSegment = false * by backends started by the postmaster. * * Note: maxBackends param is only valid when createNewSegment is true */ int SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends) { int segSize; IpcMemoryId shmId; if (createNewSegment) { /* Kill existing segment, if any */ IpcMemoryKill(key); /* Figure space needed. * Note sizeof(SISeg) includes the first ProcState entry. */ segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1); /* Get a shared segment */ shmId = IpcMemoryCreate(key, segSize, IPCProtection); if (shmId < 0) { perror("SISegmentInit: segment create failed"); return -1; /* an error */ } /* Attach to the shared cache invalidation segment */ /* sets the global variable shmInvalBuffer */ SISegmentAttach(shmId); /* Init shared memory contents */ SISegInit(shmInvalBuffer, maxBackends); } else { /* find existing segment */ shmId = IpcMemoryIdGet(key, 0); if (shmId < 0) { perror("SISegmentInit: segment get failed"); return -1; /* an error */ } /* Attach to the shared cache invalidation segment */ /* sets the global variable shmInvalBuffer */ SISegmentAttach(shmId); } return 1; } /* * SISegmentAttach * Attach to specified shared memory segment */ static void SISegmentAttach(IpcMemoryId shmid) { shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid); if (shmInvalBuffer == IpcMemAttachFailed) { /* XXX use validity function */ elog(FATAL, "SISegmentAttach: Could not attach segment: %m"); } } /* * SISegInit * Initialize contents of a new shared memory sinval segment */ static void SISegInit(SISeg *segP, int maxBackends) { int i; /* Clear message counters, save size of procState array */ segP->minMsgNum = 0; segP->maxMsgNum = 0; segP->maxBackends = maxBackends; /* The buffer[] array is initially all unused, so we need not fill it */ /* Mark all backends inactive */ for (i = 0; i < maxBackends; i++) { segP->procState[i].nextMsgNum = -1; /* inactive */ segP->procState[i].resetState = false; segP->procState[i].tag = InvalidBackendTag; segP->procState[i].procStruct = INVALID_OFFSET; } } /* * SIBackendInit * Initialize a new backend to operate on the sinval buffer * * NB: this routine, and all following ones, must be executed with the * SInvalLock spinlock held, since there may be multiple backends trying * to access the buffer. */ int SIBackendInit(SISeg *segP) { Index index; ProcState *stateP = NULL; Assert(MyBackendTag > 0); /* Check for duplicate backend tags (should never happen) */ for (index = 0; index < segP->maxBackends; index++) { if (segP->procState[index].tag == MyBackendTag) elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag); } /* Look for a free entry in the procState array */ for (index = 0; index < segP->maxBackends; index++) { if (segP->procState[index].tag == InvalidBackendTag) { stateP = &segP->procState[index]; break; } } /* elog() with spinlock held is probably not too cool, but this * condition should never happen anyway. */ if (stateP == NULL) { elog(NOTICE, "SIBackendInit: no free procState slot available"); MyBackendId = InvalidBackendTag; return 0; } MyBackendId = (stateP - &segP->procState[0]) + 1; #ifdef INVALIDDEBUG elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.", MyBackendTag, MyBackendId); #endif /* INVALIDDEBUG */ /* mark myself active, with all extant messages already read */ stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; stateP->tag = MyBackendTag; stateP->procStruct = MAKE_OFFSET(MyProc); /* register exit routine to mark my entry inactive at exit */ on_shmem_exit(CleanupInvalidationState, (caddr_t) segP); return 1; } /* * CleanupInvalidationState * Mark the current backend as no longer active. * * This function is called via on_shmem_exit() during backend shutdown, * so the caller has NOT acquired the lock for us. */ static void CleanupInvalidationState(int status, SISeg *segP) { Assert(PointerIsValid(segP)); SpinAcquire(SInvalLock); segP->procState[MyBackendId - 1].nextMsgNum = -1; segP->procState[MyBackendId - 1].resetState = false; segP->procState[MyBackendId - 1].tag = InvalidBackendTag; segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET; SpinRelease(SInvalLock); } /* * SIInsertDataEntry * Add a new invalidation message to the buffer. * * If we are unable to insert the message because the buffer is full, * then clear the buffer and assert the "reset" flag to each backend. * This will cause all the backends to discard *all* invalidatable state. * * Returns true for normal successful insertion, false if had to reset. */ bool SIInsertDataEntry(SISeg *segP, SharedInvalidData *data) { int numMsgs = segP->maxMsgNum - segP->minMsgNum; /* Is the buffer full? */ if (numMsgs >= MAXNUMMESSAGES) { /* * Don't panic just yet: slowest backend might have consumed some * messages but not yet have done SIDelExpiredDataEntries() to * advance minMsgNum. So, make sure minMsgNum is up-to-date. */ SIDelExpiredDataEntries(segP); numMsgs = segP->maxMsgNum - segP->minMsgNum; if (numMsgs >= MAXNUMMESSAGES) { /* Yup, it's definitely full, no choice but to reset */ SISetProcStateInvalid(segP); return false; } } /* * Try to prevent table overflow. When the table is 70% full send a * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will * send it back to all the backends. This will force idle backends to * execute a transaction to look through pg_listener for NOTIFY messages, * and as a byproduct of the transaction start they will read SI entries. * * This should never happen if all the backends are actively executing * queries, but if a backend is sitting idle then it won't be starting * transactions and so won't be reading SI entries. * * dz - 27 Jan 1998 */ if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster) { TPRINTF(TRACE_VERBOSE, "SIInsertDataEntry: table is 70%% full, signaling postmaster"); kill(getppid(), SIGUSR2); } /* * Insert new message into proper slot of circular buffer */ segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data; segP->maxMsgNum++; return true; } /* * SISetProcStateInvalid * Flush pending messages from buffer, assert reset flag for each backend * * This is used only to recover from SI buffer overflow. */ static void SISetProcStateInvalid(SISeg *segP) { int i; segP->minMsgNum = 0; segP->maxMsgNum = 0; for (i = 0; i < segP->maxBackends; i++) { if (segP->procState[i].nextMsgNum >= 0) /* active backend? */ { segP->procState[i].resetState = true; segP->procState[i].nextMsgNum = 0; } } } /* * SIGetDataEntry * get next SI message for specified backend, if there is one * * Possible return values: * 0: no SI message available * 1: next SI message has been extracted into *data * (there may be more messages available after this one!) * -1: SI reset message extracted */ int SIGetDataEntry(SISeg *segP, int backendId, SharedInvalidData *data) { ProcState *stateP = & segP->procState[backendId - 1]; Assert(stateP->tag == MyBackendTag); if (stateP->resetState) { /* Force reset. We can say we have dealt with any messages added * since the reset, as well... */ stateP->resetState = false; stateP->nextMsgNum = segP->maxMsgNum; return -1; } if (stateP->nextMsgNum >= segP->maxMsgNum) return 0; /* nothing to read */ /* * Retrieve message and advance my counter. */ *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; stateP->nextMsgNum++; /* There may be other backends that haven't read the message, * so we cannot delete it here. * SIDelExpiredDataEntries() should be called to remove dead messages. */ return 1; /* got a message */ } /* * SIDelExpiredDataEntries * Remove messages that have been consumed by all active backends */ void SIDelExpiredDataEntries(SISeg *segP) { int min, i, h; min = segP->maxMsgNum; if (min == segP->minMsgNum) return; /* fast path if no messages exist */ /* Recompute minMsgNum = minimum of all backends' nextMsgNum */ for (i = 0; i < segP->maxBackends; i++) { h = segP->procState[i].nextMsgNum; if (h >= 0) { /* backend active */ if (h < min) min = h; } } segP->minMsgNum = min; /* When minMsgNum gets really large, decrement all message counters * so as to forestall overflow of the counters. */ if (min >= MSGNUMWRAPAROUND) { segP->minMsgNum -= MSGNUMWRAPAROUND; segP->maxMsgNum -= MSGNUMWRAPAROUND; for (i = 0; i < segP->maxBackends; i++) { if (segP->procState[i].nextMsgNum >= 0) segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; } } }