/*-------------------------------------------------------------------------
 *
 * sinval.c
 *	  POSTGRES shared cache invalidation communication code.
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.48 2002/06/11 13:40:51 wieck Exp $
 *
 *-------------------------------------------------------------------------
 */
15
#include "postgres.h"
16

Marc G. Fournier's avatar
Marc G. Fournier committed
17
#include <sys/types.h>
Bruce Momjian's avatar
Bruce Momjian committed
18

19
#include "storage/proc.h"
20 21
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
22
#include "utils/tqual.h"
23 24
#include "miscadmin.h"

25 26

/*
 * CreateSharedInvalidationState
 *		Initialize the shared-invalidation (SI) buffer.
 *
 * maxBackends is handed straight to SIBufferInit().
 *
 * Should be called only by the postmaster.
 */
void
CreateSharedInvalidationState(int maxBackends)
{
	/*
	 * Nothing to do for SInvalLock here: it must already have been set up
	 * during LWLock initialization.
	 */
	SIBufferInit(maxBackends);
}

38
/*
39
 * InitBackendSharedInvalidationState
40 41
 *		Initialize new backend's state info in buffer segment.
 */
42
void
43
InitBackendSharedInvalidationState(void)
44
{
45
	int			flag;
46

47
	LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
48
	flag = SIBackendInit(shmInvalBuffer);
49
	LWLockRelease(SInvalLock);
50 51 52 53
	if (flag < 0)				/* unexpected problem */
		elog(FATAL, "Backend cache invalidation initialization failed");
	if (flag == 0)				/* expected problem: MaxBackends exceeded */
		elog(FATAL, "Sorry, too many clients already");
54 55 56
}

/*
57
 * SendSharedInvalidMessage
58
 *	Add a shared-cache-invalidation message to the global SI message queue.
59 60
 */
void
61
SendSharedInvalidMessage(SharedInvalidationMessage *msg)
62
{
63
	bool		insertOK;
64

65
	LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
66
	insertOK = SIInsertDataEntry(shmInvalBuffer, msg);
67
	LWLockRelease(SInvalLock);
68
	if (!insertOK)
69
		elog(DEBUG3, "SendSharedInvalidMessage: SI buffer overflow");
70 71 72
}

/*
73
 * ReceiveSharedInvalidMessages
74
 *		Process shared-cache-invalidation messages waiting for this backend
75 76
 */
void
77
ReceiveSharedInvalidMessages(
78 79
				  void (*invalFunction) (SharedInvalidationMessage *msg),
							 void (*resetFunction) (void))
80
{
81
	SharedInvalidationMessage data;
82 83
	int			getResult;
	bool		gotMessage = false;
84

85 86
	for (;;)
	{
87
		/*
88 89 90 91 92 93 94 95
		 * We can run SIGetDataEntry in parallel with other backends
		 * running SIGetDataEntry for themselves, since each instance will
		 * modify only fields of its own backend's ProcState, and no
		 * instance will look at fields of other backends' ProcStates.  We
		 * express this by grabbing SInvalLock in shared mode.	Note that
		 * this is not exactly the normal (read-only) interpretation of a
		 * shared lock! Look closely at the interactions before allowing
		 * SInvalLock to be grabbed in shared mode for any other reason!
96
		 *
97 98 99
		 * The routines later in this file that use shared mode are okay with
		 * this, because they aren't looking at the ProcState fields
		 * associated with SI message transfer; they only use the
Jan Wieck's avatar
Jan Wieck committed
100
		 * ProcState array as an easy way to find all the PGPROC structures.
101 102
		 */
		LWLockAcquire(SInvalLock, LW_SHARED);
103
		getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
104 105
		LWLockRelease(SInvalLock);

106 107 108 109 110
		if (getResult == 0)
			break;				/* nothing more to do */
		if (getResult < 0)
		{
			/* got a reset message */
111
			elog(DEBUG3, "ReceiveSharedInvalidMessages: cache state reset");
112 113 114 115 116
			resetFunction();
		}
		else
		{
			/* got a normal data message */
117
			invalFunction(&data);
118 119 120 121 122 123 124
		}
		gotMessage = true;
	}

	/* If we got any messages, try to release dead messages */
	if (gotMessage)
	{
125
		LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
126
		SIDelExpiredDataEntries(shmInvalBuffer);
127
		LWLockRelease(SInvalLock);
128
	}
129
}
130 131 132


/****************************************************************************/
/* Functions that need to scan the PGPROC structures of all running backends. */
/* It's a bit strange to keep these in sinval.c, since they don't have any	*/
/* direct relationship to shared-cache invalidation.  But the procState		*/
/* array in the SI segment is the only place in the system where we have	*/
/* an array of per-backend data, so it is the most convenient place to keep */
/* pointers to the backends' PGPROC structures.  We used to implement these	*/
/* functions with a slow, ugly search through the ShmemIndex hash table --- */
/* now they are simple loops over the SI ProcState array.					*/
/****************************************************************************/


/*
 * DatabaseHasActiveBackends -- are there any backends running in the given DB
 *
147 148 149
 * If 'ignoreMyself' is TRUE, ignore this particular backend while checking
 * for backends in the target database.
 *
150 151 152 153 154 155 156 157 158
 * This function is used to interlock DROP DATABASE against there being
 * any active backends in the target DB --- dropping the DB while active
 * backends remain would be a Bad Thing.  Note that we cannot detect here
 * the possibility of a newly-started backend that is trying to connect
 * to the doomed database, so additional interlocking is needed during
 * backend startup.
 */

bool
159
DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
160 161 162 163 164 165
{
	bool		result = false;
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
	int			index;

166
	LWLockAcquire(SInvalLock, LW_SHARED);
167

168
	for (index = 0; index < segP->lastBackend; index++)
169
	{
170
		SHMEM_OFFSET pOffset = stateP[index].procStruct;
171 172 173

		if (pOffset != INVALID_OFFSET)
		{
Jan Wieck's avatar
Jan Wieck committed
174
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);
175 176 177

			if (proc->databaseId == databaseId)
			{
178 179 180
				if (ignoreMyself && proc == MyProc)
					continue;

181 182 183 184 185 186
				result = true;
				break;
			}
		}
	}

187
	LWLockRelease(SInvalLock);
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202

	return result;
}

/*
 * TransactionIdIsInProgress -- is given transaction running by some backend
 *
 * Scans the PGPROC of every slot in the SI ProcState array, under shared
 * SInvalLock, and returns true if any of them currently shows the given
 * xid.
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
	SISeg	   *seg = shmInvalBuffer;
	ProcState  *states = seg->procState;
	bool		running = false;
	int			i;

	LWLockAcquire(SInvalLock, LW_SHARED);

	for (i = 0; i < seg->lastBackend; i++)
	{
		SHMEM_OFFSET procOffset = states[i].procStruct;
		PGPROC	   *proc;
		TransactionId procXid;

		/* slot has no PGPROC attached */
		if (procOffset == INVALID_OFFSET)
			continue;

		proc = (PGPROC *) MAKE_PTR(procOffset);

		/* Fetch xid just once - see GetNewTransactionId */
		procXid = proc->xid;

		if (TransactionIdEquals(procXid, xid))
		{
			running = true;
			break;
		}
	}

	LWLockRelease(SInvalLock);

	return running;
}

/*
230 231 232 233 234
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
235
 *
236
 * This is used by VACUUM to decide which deleted tuples must be preserved
237
 * in a table.	allDbs = TRUE is needed for shared relations, but allDbs =
238 239 240 241
 * FALSE is sufficient for non-shared relations, since only backends in my
 * own database could ever see the tuples in them.
 *
 * Note: we include the currently running xids in the set of considered xids.
242 243
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
244
 */
245 246
TransactionId
GetOldestXmin(bool allDbs)
247 248 249
{
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
250
	TransactionId result;
251 252
	int			index;

253
	result = GetCurrentTransactionId();
254

255
	LWLockAcquire(SInvalLock, LW_SHARED);
256

257
	for (index = 0; index < segP->lastBackend; index++)
258
	{
259
		SHMEM_OFFSET pOffset = stateP[index].procStruct;
260 261 262

		if (pOffset != INVALID_OFFSET)
		{
Jan Wieck's avatar
Jan Wieck committed
263
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);
264

265
			if (allDbs || proc->databaseId == MyDatabaseId)
266
			{
267 268 269
				/* Fetch xid just once - see GetNewTransactionId */
				TransactionId xid = proc->xid;

270
				if (TransactionIdIsNormal(xid))
271
				{
272 273
					if (TransactionIdPrecedes(xid, result))
						result = xid;
274 275 276 277 278
					xid = proc->xmin;
					if (TransactionIdIsNormal(xid))
						if (TransactionIdPrecedes(xid, result))
							result = xid;
				}
279
			}
280 281 282
		}
	}

283
	LWLockRelease(SInvalLock);
284

285
	return result;
286 287
}

288
/*----------
289
 * GetSnapshotData -- returns information about running transactions.
290 291 292 293 294 295 296 297 298 299
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
 * xmax (next xact ID to be assigned), and a list of running xact IDs
 * in the range xmin <= xid < xmax.  It is used as follows:
 *		All xact IDs < xmin are considered finished.
 *		All xact IDs >= xmax are considered still running.
 *		For an xact ID xmin <= xid < xmax, consult list to see whether
 *		it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
300 301 302 303
 *
 * Also, we compute the current global xmin (oldest xmin across all running
 * transactions) and save it in RecentGlobalXmin.  This is the same
 * computation done by GetOldestXmin(TRUE).
304
 *----------
305 306 307 308 309 310 311
 */
Snapshot
GetSnapshotData(bool serializable)
{
	Snapshot	snapshot = (Snapshot) malloc(sizeof(SnapshotData));
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
312 313 314
	TransactionId xmin;
	TransactionId xmax;
	TransactionId globalxmin;
315 316 317
	int			index;
	int			count = 0;

318 319 320
	if (snapshot == NULL)
		elog(ERROR, "Memory exhausted in GetSnapshotData");

321
	/*
322 323 324
	 * Allocating space for MaxBackends xids is usually overkill;
	 * lastBackend would be sufficient.  But it seems better to do the
	 * malloc while not holding the lock, so we can't look at lastBackend.
325 326
	 */
	snapshot->xip = (TransactionId *)
327
		malloc(MaxBackends * sizeof(TransactionId));
328 329
	if (snapshot->xip == NULL)
		elog(ERROR, "Memory exhausted in GetSnapshotData");
330

331
	globalxmin = xmin = GetCurrentTransactionId();
332 333 334 335 336 337 338

	/*
	 * If we are going to set MyProc->xmin then we'd better get exclusive
	 * lock; if not, this is a read-only operation so it can be shared.
	 */
	LWLockAcquire(SInvalLock, serializable ? LW_EXCLUSIVE : LW_SHARED);

339 340 341
	/*--------------------
	 * Unfortunately, we have to call ReadNewTransactionId() after acquiring
	 * SInvalLock above.  It's not good because ReadNewTransactionId() does
342
	 * LWLockAcquire(XidGenLock), but *necessary*.	We need to be sure that
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
	 * no transactions exit the set of currently-running transactions
	 * between the time we fetch xmax and the time we finish building our
	 * snapshot.  Otherwise we could have a situation like this:
	 *
	 *		1. Tx Old is running (in Read Committed mode).
	 *		2. Tx S reads new transaction ID into xmax, then
	 *		   is swapped out before acquiring SInvalLock.
	 *		3. Tx New gets new transaction ID (>= S' xmax),
	 *		   makes changes and commits.
	 *		4. Tx Old changes some row R changed by Tx New and commits.
	 *		5. Tx S finishes getting its snapshot data.  It sees Tx Old as
	 *		   done, but sees Tx New as still running (since New >= xmax).
	 *
	 * Now S will see R changed by both Tx Old and Tx New, *but* does not
	 * see other changes made by Tx New.  If S is supposed to be in
	 * Serializable mode, this is wrong.
	 *
	 * By locking SInvalLock before we read xmax, we ensure that TX Old
	 * cannot exit the set of running transactions seen by Tx S.  Therefore
	 * both Old and New will be seen as still running => no inconsistency.
	 *--------------------
364
	 */
365

366
	xmax = ReadNewTransactionId();
367

368
	for (index = 0; index < segP->lastBackend; index++)
369
	{
370
		SHMEM_OFFSET pOffset = stateP[index].procStruct;
371 372 373

		if (pOffset != INVALID_OFFSET)
		{
Jan Wieck's avatar
Jan Wieck committed
374
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);
375

376
			/* Fetch xid just once - see GetNewTransactionId */
377
			TransactionId xid = proc->xid;
378

379
			/*
380
			 * Ignore my own proc (dealt with my xid above), procs not
381 382 383
			 * running a transaction, and xacts started since we read the
			 * next transaction ID.  There's no need to store XIDs above
			 * what we got from ReadNewTransactionId, since we'll treat
384 385 386
			 * them as running anyway.  We also assume that such xacts can't
			 * compute an xmin older than ours, so they needn't be considered
			 * in computing globalxmin.
387
			 */
388
			if (proc == MyProc ||
389
				!TransactionIdIsNormal(xid) ||
390
				TransactionIdFollowsOrEquals(xid, xmax))
391
				continue;
392

393 394
			if (TransactionIdPrecedes(xid, xmin))
				xmin = xid;
395 396
			snapshot->xip[count] = xid;
			count++;
397 398 399 400 401 402

			/* Update globalxmin to be the smallest valid xmin */
			xid = proc->xmin;
			if (TransactionIdIsNormal(xid))
				if (TransactionIdPrecedes(xid, globalxmin))
					globalxmin = xid;
403 404 405 406
		}
	}

	if (serializable)
407
		MyProc->xmin = xmin;
408

409
	LWLockRelease(SInvalLock);
410

411
	/* Serializable snapshot must be computed before any other... */
412
	Assert(TransactionIdIsValid(MyProc->xmin));
413

414 415 416 417 418 419 420 421 422 423 424 425
	/*
	 * Update globalxmin to include actual process xids.  This is a slightly
	 * different way of computing it than GetOldestXmin uses, but should give
	 * the same result.
	 */
	if (TransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

	RecentGlobalXmin = globalxmin;

	snapshot->xmin = xmin;
	snapshot->xmax = xmax;
426
	snapshot->xcnt = count;
427 428 429

	snapshot->curcid = GetCurrentCommandId();

430 431
	return snapshot;
}
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
432 433

/*
434 435 436 437 438 439 440 441
 * CountActiveBackends --- count backends (other than myself) that are in
 *		active transactions.  This is used as a heuristic to decide if
 *		a pre-XLOG-flush delay is worthwhile during commit.
 *
 * An active transaction is something that has written at least one XLOG
 * record; read-only transactions don't count.  Also, do not count backends
 * that are blocked waiting for locks, since they are not going to get to
 * run until someone else commits.
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
442
 */
443 444 445 446 447 448 449
int
CountActiveBackends(void)
{
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
	int			count = 0;
	int			index;
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
450

451 452 453 454 455 456 457 458 459 460 461 462
	/*
	 * Note: for speed, we don't acquire SInvalLock.  This is a little bit
	 * bogus, but since we are only testing xrecoff for zero or nonzero,
	 * it should be OK.  The result is only used for heuristic purposes
	 * anyway...
	 */
	for (index = 0; index < segP->lastBackend; index++)
	{
		SHMEM_OFFSET pOffset = stateP[index].procStruct;

		if (pOffset != INVALID_OFFSET)
		{
Jan Wieck's avatar
Jan Wieck committed
463
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);
464 465 466 467 468 469 470 471 472 473 474 475 476 477 478

			if (proc == MyProc)
				continue;		/* do not count myself */
			if (proc->logRec.xrecoff == 0)
				continue;		/* do not count if not in a transaction */
			if (proc->waitLock != NULL)
				continue;		/* do not count if blocked on a lock */
			count++;
		}
	}

	return count;
}

/*
Jan Wieck's avatar
Jan Wieck committed
479
 * GetUndoRecPtr -- returns oldest PGPROC->logRec.
480
 */
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
481 482 483 484 485 486 487 488 489
XLogRecPtr
GetUndoRecPtr(void)
{
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
	XLogRecPtr	urec = {0, 0};
	XLogRecPtr	tempr;
	int			index;

490
	LWLockAcquire(SInvalLock, LW_SHARED);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
491

492
	for (index = 0; index < segP->lastBackend; index++)
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
493 494 495 496 497
	{
		SHMEM_OFFSET pOffset = stateP[index].procStruct;

		if (pOffset != INVALID_OFFSET)
		{
Jan Wieck's avatar
Jan Wieck committed
498
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);
499

Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
500 501 502 503 504 505 506 507 508
			tempr = proc->logRec;
			if (tempr.xrecoff == 0)
				continue;
			if (urec.xrecoff != 0 && XLByteLT(urec, tempr))
				continue;
			urec = tempr;
		}
	}

509
	LWLockRelease(SInvalLock);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
510

511
	return (urec);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
512
}
513 514

/*
Jan Wieck's avatar
Jan Wieck committed
515
 * BackendIdGetProc - given a BackendId, find its PGPROC structure
516 517 518 519 520
 *
 * This is a trivial lookup in the ProcState array.  We assume that the caller
 * knows that the backend isn't going to go away, so we do not bother with
 * locking.
 */
Jan Wieck's avatar
Jan Wieck committed
521
struct PGPROC *
522 523 524 525 526 527 528 529 530 531 532
BackendIdGetProc(BackendId procId)
{
	SISeg	   *segP = shmInvalBuffer;

	if (procId > 0 && procId <= segP->lastBackend)
	{
		ProcState  *stateP = &segP->procState[procId - 1];
		SHMEM_OFFSET pOffset = stateP->procStruct;

		if (pOffset != INVALID_OFFSET)
		{
Jan Wieck's avatar
Jan Wieck committed
533
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);
534 535 536 537 538 539 540

			return proc;
		}
	}

	return NULL;
}