sinvaladt.c 8.59 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * sinvaladt.c
4
 *	  POSTGRES shared cache invalidation segment definitions.
5
 *
6
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
Bruce Momjian's avatar
Add:  
Bruce Momjian committed
7
 * Portions Copyright (c) 1994, Regents of the University of California
8 9 10
 *
 *
 * IDENTIFICATION
11
 *	  $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.41 2001/09/29 04:02:24 tgl Exp $
12 13 14
 *
 *-------------------------------------------------------------------------
 */
15 16
#include "postgres.h"

17 18
#include <signal.h>
#include <unistd.h>
Marc G. Fournier's avatar
Marc G. Fournier committed
19

20
#include "miscadmin.h"
21
#include "storage/backendid.h"
22
#include "storage/proc.h"
23
#include "storage/sinvaladt.h"
24

25
SISeg	   *shmInvalBuffer;
26

27
static void CleanupInvalidationState(int status, Datum arg);
28
static void SISetProcStateInvalid(SISeg *segP);
29 30 31


/*
32
 * SInvalShmemSize --- return shared-memory space needed
33
 */
34 35
int
SInvalShmemSize(int maxBackends)
36
{
37 38 39 40 41
	/*
	 * Figure space needed. Note sizeof(SISeg) includes the first
	 * ProcState entry.
	 */
	return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
42 43
}

44
/*
45 46
 * SIBufferInit
 *		Create and initialize a new SI message buffer
47
 */
48 49
void
SIBufferInit(int maxBackends)
50
{
51 52
	int			segSize;
	SISeg	   *segP;
53
	int			i;
54

55 56 57 58
	/* Allocate space in shared memory */
	segSize = SInvalShmemSize(maxBackends);
	shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);

59 60 61
	/* Clear message counters, save size of procState array */
	segP->minMsgNum = 0;
	segP->maxMsgNum = 0;
62
	segP->lastBackend = 0;
63
	segP->maxBackends = maxBackends;
64

65
	/* The buffer[] array is initially all unused, so we need not fill it */
66

67 68
	/* Mark all backends inactive */
	for (i = 0; i < maxBackends; i++)
69
	{
70
		segP->procState[i].nextMsgNum = -1;		/* inactive */
71
		segP->procState[i].resetState = false;
72
		segP->procState[i].procStruct = INVALID_OFFSET;
73
	}
74 75
}

76 77 78 79
/*
 * SIBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 *
80 81 82 83 84
 * Returns:
 *	   >0	A-OK
 *		0	Failed to find a free procState slot (ie, MaxBackends exceeded)
 *	   <0	Some other failure (not currently used)
 *
85
 * NB: this routine, and all following ones, must be executed with the
86
 * SInvalLock lock held, since there may be multiple backends trying
87 88 89 90
 * to access the buffer.
 */
int
SIBackendInit(SISeg *segP)
91
{
92 93
	int			index;
	ProcState  *stateP = NULL;
94

95
	/* Look for a free entry in the procState array */
96
	for (index = 0; index < segP->lastBackend; index++)
97
	{
98
		if (segP->procState[index].nextMsgNum < 0)		/* inactive slot? */
99 100 101 102
		{
			stateP = &segP->procState[index];
			break;
		}
103
	}
104

105
	if (stateP == NULL)
106
	{
107 108 109 110 111 112 113 114
		if (segP->lastBackend < segP->maxBackends)
		{
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->nextMsgNum < 0);
			segP->lastBackend++;
		}
		else
		{
115
			/* out of procState slots */
116 117 118
			MyBackendId = InvalidBackendId;
			return 0;
		}
119
	}
120

121
	MyBackendId = (stateP - &segP->procState[0]) + 1;
122

123
#ifdef	INVALIDDEBUG
124
	elog(DEBUG, "SIBackendInit: backend id %d", MyBackendId);
125
#endif	 /* INVALIDDEBUG */
126

127 128
	/* mark myself active, with all extant messages already read */
	stateP->nextMsgNum = segP->maxMsgNum;
129 130
	stateP->resetState = false;
	stateP->procStruct = MAKE_OFFSET(MyProc);
131

132
	/* register exit routine to mark my entry inactive at exit */
133
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
134

135
	return 1;
136 137
}

138 139 140 141
/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
142 143
 * This function is called via on_shmem_exit() during backend shutdown,
 * so the caller has NOT acquired the lock for us.
144 145
 *
 * arg is really of type "SISeg*".
146
 */
147
static void
148
CleanupInvalidationState(int status, Datum arg)
149
{
150 151
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	int			i;
152

153
	Assert(PointerIsValid(segP));
154

155
	LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
156

157
	/* Mark myself inactive */
158 159
	segP->procState[MyBackendId - 1].nextMsgNum = -1;
	segP->procState[MyBackendId - 1].resetState = false;
160 161
	segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;

162 163 164 165 166 167 168 169
	/* Recompute index of last active backend */
	for (i = segP->lastBackend; i > 0; i--)
	{
		if (segP->procState[i - 1].nextMsgNum >= 0)
			break;
	}
	segP->lastBackend = i;

170
	LWLockRelease(SInvalLock);
171
}
172

173 174 175 176 177 178 179 180 181 182
/*
 * SIInsertDataEntry
 *		Add a new invalidation message to the buffer.
 *
 * If we are unable to insert the message because the buffer is full,
 * then clear the buffer and assert the "reset" flag to each backend.
 * This will cause all the backends to discard *all* invalidatable state.
 *
 * Returns true for normal successful insertion, false if had to reset.
 */
183
bool
184
SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
185
{
186
	int			numMsgs = segP->maxMsgNum - segP->minMsgNum;
187

188 189
	/* Is the buffer full? */
	if (numMsgs >= MAXNUMMESSAGES)
190
	{
191 192 193 194 195 196 197 198 199 200 201 202 203
		/*
		 * Don't panic just yet: slowest backend might have consumed some
		 * messages but not yet have done SIDelExpiredDataEntries() to
		 * advance minMsgNum.  So, make sure minMsgNum is up-to-date.
		 */
		SIDelExpiredDataEntries(segP);
		numMsgs = segP->maxMsgNum - segP->minMsgNum;
		if (numMsgs >= MAXNUMMESSAGES)
		{
			/* Yup, it's definitely full, no choice but to reset */
			SISetProcStateInvalid(segP);
			return false;
		}
204 205
	}

206 207 208 209
	/*
	 * Try to prevent table overflow.  When the table is 70% full send a
	 * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will
	 * send it back to all the backends.  This will force idle backends to
210 211 212
	 * execute a transaction to look through pg_listener for NOTIFY
	 * messages, and as a byproduct of the transaction start they will
	 * read SI entries.
213 214 215 216 217 218 219 220 221
	 *
	 * This should never happen if all the backends are actively executing
	 * queries, but if a backend is sitting idle then it won't be starting
	 * transactions and so won't be reading SI entries.
	 *
	 * dz - 27 Jan 1998
	 */
	if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
		IsUnderPostmaster)
222
	{
223 224
		if (DebugLvl >= 1)
			elog(DEBUG, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
225
		kill(getppid(), SIGUSR2);
226
	}
227

228 229 230 231 232 233
	/*
	 * Insert new message into proper slot of circular buffer
	 */
	segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
	segP->maxMsgNum++;

234
	return true;
235 236
}

237 238 239 240 241 242 243
/*
 * SISetProcStateInvalid
 *		Flush pending messages from buffer, assert reset flag for each backend
 *
 * This is used only to recover from SI buffer overflow.
 */
static void
244
SISetProcStateInvalid(SISeg *segP)
245
{
246
	int			i;
247

248 249 250
	segP->minMsgNum = 0;
	segP->maxMsgNum = 0;

251
	for (i = 0; i < segP->lastBackend; i++)
252
	{
253
		if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
254 255
		{
			segP->procState[i].resetState = true;
256
			segP->procState[i].nextMsgNum = 0;
257 258
		}
	}
259 260
}

261 262 263 264 265 266 267 268 269
/*
 * SIGetDataEntry
 *		get next SI message for specified backend, if there is one
 *
 * Possible return values:
 *	0: no SI message available
 *	1: next SI message has been extracted into *data
 *		(there may be more messages available after this one!)
 * -1: SI reset message extracted
270 271 272 273
 *
 * NB: this can run in parallel with other instances of SIGetDataEntry
 * executing on behalf of other backends.  See comments in sinval.c in
 * ReceiveSharedInvalidMessages().
274
 */
275 276
int
SIGetDataEntry(SISeg *segP, int backendId,
277
			   SharedInvalidationMessage *data)
278
{
279
	ProcState  *stateP = &segP->procState[backendId - 1];
280

281
	if (stateP->resetState)
282
	{
283 284
		/*
		 * Force reset.  We can say we have dealt with any messages added
285 286 287 288
		 * since the reset, as well...
		 */
		stateP->resetState = false;
		stateP->nextMsgNum = segP->maxMsgNum;
289
		return -1;
290
	}
291

292
	if (stateP->nextMsgNum >= segP->maxMsgNum)
293 294
		return 0;				/* nothing to read */

295 296 297 298 299
	/*
	 * Retrieve message and advance my counter.
	 */
	*data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
	stateP->nextMsgNum++;
300

301 302 303 304
	/*
	 * There may be other backends that haven't read the message, so we
	 * cannot delete it here. SIDelExpiredDataEntries() should be called
	 * to remove dead messages.
305 306
	 */
	return 1;					/* got a message */
307 308
}

309 310 311 312
/*
 * SIDelExpiredDataEntries
 *		Remove messages that have been consumed by all active backends
 */
313
void
314
SIDelExpiredDataEntries(SISeg *segP)
315
{
316 317 318
	int			min,
				i,
				h;
319

320 321 322 323 324 325
	min = segP->maxMsgNum;
	if (min == segP->minMsgNum)
		return;					/* fast path if no messages exist */

	/* Recompute minMsgNum = minimum of all backends' nextMsgNum */

326
	for (i = 0; i < segP->lastBackend; i++)
327
	{
328
		h = segP->procState[i].nextMsgNum;
329 330 331 332 333 334
		if (h >= 0)
		{						/* backend active */
			if (h < min)
				min = h;
		}
	}
335
	segP->minMsgNum = min;
336

337 338 339
	/*
	 * When minMsgNum gets really large, decrement all message counters so
	 * as to forestall overflow of the counters.
340 341
	 */
	if (min >= MSGNUMWRAPAROUND)
342
	{
343 344
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
345
		for (i = 0; i < segP->lastBackend; i++)
346
		{
347 348
			if (segP->procState[i].nextMsgNum >= 0)
				segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
349 350 351
		}
	}
}