Commit 0f2e7948 authored by Hiroshi Inoue's avatar Hiroshi Inoue

Improve cache invalidation handling. Eespecially

this would fix TODO
* elog() flushes cache, try invalidating just entries from
  current xact, perhaps using invalidation cache
parent 57709359
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.62 1999/12/21 00:06:40 wieck Exp $ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.63 2000/01/10 06:30:50 inoue Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
...@@ -1262,7 +1262,7 @@ heap_insert(Relation relation, HeapTuple tup) ...@@ -1262,7 +1262,7 @@ heap_insert(Relation relation, HeapTuple tup)
RelationPutHeapTupleAtEnd(relation, tup); RelationPutHeapTupleAtEnd(relation, tup);
if (IsSystemRelationName(RelationGetRelationName(relation))) if (IsSystemRelationName(RelationGetRelationName(relation)))
RelationInvalidateHeapTuple(relation, tup); RelationMark4RollbackHeapTuple(relation, tup);
return tup->t_data->t_oid; return tup->t_data->t_oid;
} }
...@@ -1473,6 +1473,8 @@ l2: ...@@ -1473,6 +1473,8 @@ l2:
RelationPutHeapTupleAtEnd(relation, newtup); RelationPutHeapTupleAtEnd(relation, newtup);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
} }
/* mark for rollback caches */
RelationMark4RollbackHeapTuple(relation, newtup);
/* /*
* New item in place, now record address of new tuple in t_ctid of old * New item in place, now record address of new tuple in t_ctid of old
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.57 2000/01/05 18:23:44 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.58 2000/01/10 06:30:50 inoue Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
...@@ -165,6 +165,7 @@ static void AtAbort_Cache(void); ...@@ -165,6 +165,7 @@ static void AtAbort_Cache(void);
static void AtAbort_Locks(void); static void AtAbort_Locks(void);
static void AtAbort_Memory(void); static void AtAbort_Memory(void);
static void AtCommit_Cache(void); static void AtCommit_Cache(void);
static void AtCommit_LocalCache(void);
static void AtCommit_Locks(void); static void AtCommit_Locks(void);
static void AtCommit_Memory(void); static void AtCommit_Memory(void);
static void AtStart_Cache(void); static void AtStart_Cache(void);
...@@ -512,8 +513,11 @@ CommandCounterIncrement() ...@@ -512,8 +513,11 @@ CommandCounterIncrement()
CurrentTransactionStateData.scanCommandId = CurrentTransactionStateData.commandId; CurrentTransactionStateData.scanCommandId = CurrentTransactionStateData.commandId;
/* make cache changes visible to me */ /*
AtCommit_Cache(); * make cache changes visible to me. AtCommit_LocalCache()
* instead of AtCommit_Cache() is called here.
*/
AtCommit_LocalCache();
AtStart_Cache(); AtStart_Cache();
} }
...@@ -663,15 +667,26 @@ static void ...@@ -663,15 +667,26 @@ static void
AtCommit_Cache() AtCommit_Cache()
{ {
/* ---------------- /* ----------------
* Make catalog changes visible to me for the next command. * Make catalog changes visible to all backend.
* Other backends will not process my invalidation messages until
* after I commit and free my locks--though they will do
* unnecessary work if I abort.
* ---------------- * ----------------
*/ */
RegisterInvalid(true); RegisterInvalid(true);
} }
/* --------------------------------
* AtCommit_LocalCache
* --------------------------------
*/
static void
AtCommit_LocalCache()
{
/* ----------------
* Make catalog changes visible to me for the next command.
* ----------------
*/
ImmediateLocalInvalidation(true);
}
/* -------------------------------- /* --------------------------------
* AtCommit_Locks * AtCommit_Locks
* -------------------------------- * --------------------------------
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.60 1999/11/16 04:13:56 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.61 2000/01/10 06:30:51 inoue Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/inval.h" /* ImmediateSharedRelationCacheInvalidate() */
#undef DIAGNOSTIC #undef DIAGNOSTIC
...@@ -203,6 +204,15 @@ mdunlink(Relation reln) ...@@ -203,6 +204,15 @@ mdunlink(Relation reln)
*/ */
if (reln->rd_unlinked && reln->rd_fd < 0) if (reln->rd_unlinked && reln->rd_fd < 0)
return SM_SUCCESS; return SM_SUCCESS;
/*
* This call isn't good for independency of md stuff,but
* mdunlink() unlinks the base file immediately and couldn't
* be rollbacked in case of abort. We must guarantee all
* backends' relation cache invalidation here.
* This would be unnecessary if unlinking is postponed
* till end of transaction.
*/
ImmediateSharedRelationCacheInvalidate(reln);
/* /*
* Force all segments of the relation to be opened, so that we * Force all segments of the relation to be opened, so that we
* won't miss deleting any of them. * won't miss deleting any of them.
...@@ -779,6 +789,7 @@ mdtruncate(Relation reln, int nblocks) ...@@ -779,6 +789,7 @@ mdtruncate(Relation reln, int nblocks)
#ifndef LET_OS_MANAGE_FILESIZE #ifndef LET_OS_MANAGE_FILESIZE
MemoryContext oldcxt; MemoryContext oldcxt;
int priorblocks; int priorblocks;
bool invalregistered = false;
#endif #endif
/* NOTE: mdnblocks makes sure we have opened all existing segments, /* NOTE: mdnblocks makes sure we have opened all existing segments,
...@@ -810,6 +821,20 @@ mdtruncate(Relation reln, int nblocks) ...@@ -810,6 +821,20 @@ mdtruncate(Relation reln, int nblocks)
* a big file... * a big file...
*/ */
FileTruncate(v->mdfd_vfd, 0); FileTruncate(v->mdfd_vfd, 0);
/*
* To call ImmediateSharedRelationCacheInvalidate() here
* isn't good for independency of md stuff,but smgrunlink()
* removes the base file immediately and couldn't be
* rollbacked in case of abort. We must guarantee
* all backends' relation cache invalidation here.
* This would be unnecessary if the truncation is postponed
* till end of transaction.
*/
if (!invalregistered)
{
ImmediateSharedRelationCacheInvalidate(reln);
invalregistered = true;
}
FileUnlink(v->mdfd_vfd); FileUnlink(v->mdfd_vfd);
v = v->mdfd_chain; v = v->mdfd_chain;
Assert(ov != &Md_fdvec[fd]); /* we never drop the 1st segment */ Assert(ov != &Md_fdvec[fd]); /* we never drop the 1st segment */
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/cache/inval.c,v 1.30 1999/11/21 01:58:22 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/utils/cache/inval.c,v 1.31 2000/01/10 06:30:53 inoue Exp $
* *
* Note - this code is real crufty... * Note - this code is real crufty...
* *
...@@ -79,13 +79,39 @@ typedef InvalidationMessageData *InvalidationMessage; ...@@ -79,13 +79,39 @@ typedef InvalidationMessageData *InvalidationMessage;
* variables and macros * variables and macros
* ---------------- * ----------------
*/ */
static LocalInvalid Invalid = EmptyLocalInvalid; /* head of linked list */
/*
* ----------------
* Invalidation info was devided into three parts.
* 1) shared invalidation to be registerd for all backends
* 2) local invalidation for the transaction itself
* 3) rollback information for the transaction itself
* ----------------
*/
/*
* head of invalidation linked list for all backends
* eaten by AtCommit_Cache() in CommitTransaction()
*/
static LocalInvalid InvalidForall = EmptyLocalInvalid;
/*
* head of invalidation linked list for the backend itself
* eaten by AtCommit_LocalCache() in CommandCounterIncrement()
*/
static LocalInvalid InvalidLocal = EmptyLocalInvalid;
/*
* head of rollback linked list for the backend itself
* eaten by AtAbort_Cache() in AbortTransaction()
*/
static LocalInvalid RollbackStack = EmptyLocalInvalid;
static InvalidationEntry InvalidationEntryAllocate(uint16 size); static InvalidationEntry InvalidationEntryAllocate(uint16 size);
static void LocalInvalidInvalidate(LocalInvalid invalid, void (*function) ()); static void LocalInvalidInvalidate(LocalInvalid invalid, void (*function) (), bool freemember);
static LocalInvalid LocalInvalidRegister(LocalInvalid invalid, static LocalInvalid LocalInvalidRegister(LocalInvalid invalid,
InvalidationEntry entry); InvalidationEntry entry);
static void DiscardInvalidStack(LocalInvalid *invalid);
static void InvalidationMessageRegisterSharedInvalid(InvalidationMessage message);
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
...@@ -130,11 +156,11 @@ LocalInvalidRegister(LocalInvalid invalid, ...@@ -130,11 +156,11 @@ LocalInvalidRegister(LocalInvalid invalid,
/* -------------------------------- /* --------------------------------
* LocalInvalidInvalidate * LocalInvalidInvalidate
* Processes, then frees all entries in a local cache * Processes, then frees all entries in a local cache
* invalidation list. * invalidation list unless freemember parameter is false.
* -------------------------------- * --------------------------------
*/ */
static void static void
LocalInvalidInvalidate(LocalInvalid invalid, void (*function) ()) LocalInvalidInvalidate(LocalInvalid invalid, void (*function) (), bool freemember)
{ {
InvalidationEntryData *entryDataP; InvalidationEntryData *entryDataP;
...@@ -148,6 +174,8 @@ LocalInvalidInvalidate(LocalInvalid invalid, void (*function) ()) ...@@ -148,6 +174,8 @@ LocalInvalidInvalidate(LocalInvalid invalid, void (*function) ())
invalid = (Pointer) entryDataP->nextP; invalid = (Pointer) entryDataP->nextP;
if (!freemember)
continue;
/* help catch errors */ /* help catch errors */
entryDataP->nextP = (InvalidationUserData *) NULL; entryDataP->nextP = (InvalidationUserData *) NULL;
...@@ -155,27 +183,147 @@ LocalInvalidInvalidate(LocalInvalid invalid, void (*function) ()) ...@@ -155,27 +183,147 @@ LocalInvalidInvalidate(LocalInvalid invalid, void (*function) ())
} }
} }
static void
DiscardInvalidStack(LocalInvalid *invalid)
{
LocalInvalid locinv;
locinv = *invalid;
*invalid = EmptyLocalInvalid;
if (locinv)
LocalInvalidInvalidate(locinv, (void (*)()) NULL, true);
}
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* private support functions * private support functions
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
/* -------------------------------- /* --------------------------------
* CacheIdRegisterLocalInvalid * CacheIdRegister.......
* RelationIdRegister....
* -------------------------------- * --------------------------------
*/ */
#ifdef INVALIDDEBUG #ifdef INVALIDDEBUG
#define CacheIdRegisterSpecifiedLocalInvalid_DEBUG1 \
elog(DEBUG, "CacheIdRegisterSpecifiedLocalInvalid(%d, %d, [%d, %d])", \
cacheId, hashIndex, ItemPointerGetBlockNumber(pointer), \
ItemPointerGetOffsetNumber(pointer))
#define CacheIdRegisterLocalInvalid_DEBUG1 \ #define CacheIdRegisterLocalInvalid_DEBUG1 \
elog(DEBUG, "CacheIdRegisterLocalInvalid(%d, %d, [%d, %d])", \ elog(DEBUG, "CacheIdRegisterLocalInvalid(%d, %d, [%d, %d])", \
cacheId, hashIndex, ItemPointerGetBlockNumber(pointer), \ cacheId, hashIndex, ItemPointerGetBlockNumber(pointer), \
ItemPointerGetOffsetNumber(pointer)) ItemPointerGetOffsetNumber(pointer))
#define CacheIdRegisterLocalRollback_DEBUG1 \
elog(DEBUG, "CacheIdRegisterLocalRollback(%d, %d, [%d, %d])", \
cacheId, hashIndex, ItemPointerGetBlockNumber(pointer), \
ItemPointerGetOffsetNumber(pointer))
#define CacheIdImmediateRegisterSharedInvalid_DEBUG1 \
elog(DEBUG, "CacheIdImmediateRegisterSharedInvalid(%d, %d, [%d, %d])", \
cacheId, hashIndex, ItemPointerGetBlockNumber(pointer), \
ItemPointerGetOffsetNumber(pointer))
#else #else
#define CacheIdRegisterSpecifiedLocalInvalid_DEBUG1
#define CacheIdRegisterLocalInvalid_DEBUG1 #define CacheIdRegisterLocalInvalid_DEBUG1
#define CacheIdRegisterLocalRollback_DEBUG1
#define CacheIdImmediateRegisterSharedInvalid_DEBUG1
#endif /* INVALIDDEBUG */ #endif /* INVALIDDEBUG */
/* --------------------------------
* CacheIdRegisterSpecifiedLocalInvalid
* --------------------------------
*/
static LocalInvalid
CacheIdRegisterSpecifiedLocalInvalid(LocalInvalid invalid,
Index cacheId, Index hashIndex, ItemPointer pointer)
{
InvalidationMessage message;
/* ----------------
* debugging stuff
* ----------------
*/
CacheIdRegisterSpecifiedLocalInvalid_DEBUG1;
/* ----------------
* create a message describing the system catalog tuple
* we wish to invalidate.
* ----------------
*/
message = (InvalidationMessage)
InvalidationEntryAllocate(sizeof(InvalidationMessageData));
message->kind = 'c';
message->any.catalog.cacheId = cacheId;
message->any.catalog.hashIndex = hashIndex;
ItemPointerCopy(pointer, &message->any.catalog.pointerData);
/* ----------------
* Add message to linked list of unprocessed messages.
* ----------------
*/
invalid = LocalInvalidRegister(invalid, (InvalidationEntry) message);
return invalid;
}
/* --------------------------------
* CacheIdRegisterLocalInvalid
* --------------------------------
*/
static void static void
CacheIdRegisterLocalInvalid(Index cacheId, CacheIdRegisterLocalInvalid(Index cacheId,
Index hashIndex, Index hashIndex,
ItemPointer pointer) ItemPointer pointer)
{
/* ----------------
* debugging stuff
* ----------------
*/
CacheIdRegisterLocalInvalid_DEBUG1;
/* ----------------
* Add message to InvalidForall linked list.
* ----------------
*/
InvalidForall = CacheIdRegisterSpecifiedLocalInvalid(InvalidForall,
cacheId, hashIndex, pointer);
/* ----------------
* Add message to InvalidLocal linked list.
* ----------------
*/
InvalidLocal = CacheIdRegisterSpecifiedLocalInvalid(InvalidLocal,
cacheId, hashIndex, pointer);
}
/* --------------------------------
* CacheIdRegisterLocalRollback
* --------------------------------
*/
static void
CacheIdRegisterLocalRollback(Index cacheId, Index hashIndex,
ItemPointer pointer)
{
/* ----------------
* debugging stuff
* ----------------
*/
CacheIdRegisterLocalRollback_DEBUG1;
/* ----------------
* Add message to RollbackStack linked list.
* ----------------
*/
RollbackStack = CacheIdRegisterSpecifiedLocalInvalid(
RollbackStack, cacheId, hashIndex, pointer);
}
/* --------------------------------
* CacheIdImmediateRegisterSharedInvalid
* --------------------------------
*/
static void
CacheIdImmediateRegisterSharedInvalid(Index cacheId, Index hashIndex,
ItemPointer pointer)
{ {
InvalidationMessage message; InvalidationMessage message;
...@@ -183,7 +331,7 @@ CacheIdRegisterLocalInvalid(Index cacheId, ...@@ -183,7 +331,7 @@ CacheIdRegisterLocalInvalid(Index cacheId,
* debugging stuff * debugging stuff
* ---------------- * ----------------
*/ */
CacheIdRegisterLocalInvalid_DEBUG1; CacheIdImmediateRegisterSharedInvalid_DEBUG1;
/* ---------------- /* ----------------
* create a message describing the system catalog tuple * create a message describing the system catalog tuple
...@@ -198,12 +346,51 @@ CacheIdRegisterLocalInvalid(Index cacheId, ...@@ -198,12 +346,51 @@ CacheIdRegisterLocalInvalid(Index cacheId,
message->any.catalog.hashIndex = hashIndex; message->any.catalog.hashIndex = hashIndex;
ItemPointerCopy(pointer, &message->any.catalog.pointerData); ItemPointerCopy(pointer, &message->any.catalog.pointerData);
/* ----------------
* Register a shared catalog cache invalidation.
* ----------------
*/
InvalidationMessageRegisterSharedInvalid(message);
free((Pointer) &((InvalidationUserData *) message)->dataP[-1]);
}
/* --------------------------------
* RelationIdRegisterSpecifiedLocalInvalid
* --------------------------------
*/
static LocalInvalid
RelationIdRegisterSpecifiedLocalInvalid(LocalInvalid invalid,
Oid relationId, Oid objectId)
{
InvalidationMessage message;
/* ----------------
* debugging stuff
* ----------------
*/
#ifdef INVALIDDEBUG
elog(DEBUG, "RelationRegisterSpecifiedLocalInvalid(%u, %u)", relationId,
objectId);
#endif /* defined(INVALIDDEBUG) */
/* ----------------
* create a message describing the relation descriptor
* we wish to invalidate.
* ----------------
*/
message = (InvalidationMessage)
InvalidationEntryAllocate(sizeof(InvalidationMessageData));
message->kind = 'r';
message->any.relation.relationId = relationId;
message->any.relation.objectId = objectId;
/* ---------------- /* ----------------
* Add message to linked list of unprocessed messages. * Add message to linked list of unprocessed messages.
* ---------------- * ----------------
*/ */
Invalid = LocalInvalidRegister(Invalid, (InvalidationEntry) message); invalid = LocalInvalidRegister(invalid, (InvalidationEntry) message);
return invalid;
} }
/* -------------------------------- /* --------------------------------
...@@ -212,6 +399,61 @@ CacheIdRegisterLocalInvalid(Index cacheId, ...@@ -212,6 +399,61 @@ CacheIdRegisterLocalInvalid(Index cacheId,
*/ */
static void static void
RelationIdRegisterLocalInvalid(Oid relationId, Oid objectId) RelationIdRegisterLocalInvalid(Oid relationId, Oid objectId)
{
/* ----------------
* debugging stuff
* ----------------
*/
#ifdef INVALIDDEBUG
elog(DEBUG, "RelationRegisterLocalInvalid(%u, %u)", relationId,
objectId);
#endif /* defined(INVALIDDEBUG) */
/* ----------------
* Add message to InvalidForall linked list.
* ----------------
*/
InvalidForall = RelationIdRegisterSpecifiedLocalInvalid(InvalidForall,
relationId, objectId);
/* ----------------
* Add message to InvalidLocal linked list.
* ----------------
*/
InvalidLocal = RelationIdRegisterSpecifiedLocalInvalid(InvalidLocal,
relationId, objectId);
}
/* --------------------------------
* RelationIdRegisterLocalRollback
* --------------------------------
*/
static void
RelationIdRegisterLocalRollback(Oid relationId, Oid objectId)
{
/* ----------------
* debugging stuff
* ----------------
*/
#ifdef INVALIDDEBUG
elog(DEBUG, "RelationRegisterLocalRollback(%u, %u)", relationId,
objectId);
#endif /* defined(INVALIDDEBUG) */
/* ----------------
* Add message to RollbackStack linked list.
* ----------------
*/
RollbackStack = RelationIdRegisterSpecifiedLocalInvalid(
RollbackStack, relationId, objectId);
}
/* --------------------------------
* RelationIdImmediateRegisterSharedInvalid
* --------------------------------
*/
static void
RelationIdImmediateRegisterSharedInvalid(Oid relationId, Oid objectId)
{ {
InvalidationMessage message; InvalidationMessage message;
...@@ -220,7 +462,7 @@ RelationIdRegisterLocalInvalid(Oid relationId, Oid objectId) ...@@ -220,7 +462,7 @@ RelationIdRegisterLocalInvalid(Oid relationId, Oid objectId)
* ---------------- * ----------------
*/ */
#ifdef INVALIDDEBUG #ifdef INVALIDDEBUG
elog(DEBUG, "RelationRegisterLocalInvalid(%u, %u)", relationId, elog(DEBUG, "RelationImmediateRegisterSharedInvalid(%u, %u)", relationId,
objectId); objectId);
#endif /* defined(INVALIDDEBUG) */ #endif /* defined(INVALIDDEBUG) */
...@@ -237,10 +479,11 @@ RelationIdRegisterLocalInvalid(Oid relationId, Oid objectId) ...@@ -237,10 +479,11 @@ RelationIdRegisterLocalInvalid(Oid relationId, Oid objectId)
message->any.relation.objectId = objectId; message->any.relation.objectId = objectId;
/* ---------------- /* ----------------
* Add message to linked list of unprocessed messages. * Register a shared catalog cache invalidation.
* ---------------- * ----------------
*/ */
Invalid = LocalInvalidRegister(Invalid, (InvalidationEntry) message); InvalidationMessageRegisterSharedInvalid(message);
free((Pointer) &((InvalidationUserData *) message)->dataP[-1]);
} }
/* -------------------------------- /* --------------------------------
...@@ -397,7 +640,7 @@ InvalidationMessageCacheInvalidate(InvalidationMessage message) ...@@ -397,7 +640,7 @@ InvalidationMessageCacheInvalidate(InvalidationMessage message)
case 'c': /* cached system catalog tuple */ case 'c': /* cached system catalog tuple */
InvalidationMessageCacheInvalidate_DEBUG1; InvalidationMessageCacheInvalidate_DEBUG1;
CatalogCacheIdInvalidate(message->any.catalog.cacheId, CacheIdInvalidate(message->any.catalog.cacheId,
message->any.catalog.hashIndex, message->any.catalog.hashIndex,
&message->any.catalog.pointerData); &message->any.catalog.pointerData);
break; break;
...@@ -405,7 +648,9 @@ InvalidationMessageCacheInvalidate(InvalidationMessage message) ...@@ -405,7 +648,9 @@ InvalidationMessageCacheInvalidate(InvalidationMessage message)
case 'r': /* cached relation descriptor */ case 'r': /* cached relation descriptor */
InvalidationMessageCacheInvalidate_DEBUG2; InvalidationMessageCacheInvalidate_DEBUG2;
/* XXX ignore this--is this correct ??? */ CacheIdInvalidate(message->any.relation.relationId,
message->any.relation.objectId,
(ItemPointer) NULL);
break; break;
default: default:
...@@ -500,38 +745,93 @@ RegisterInvalid(bool send) ...@@ -500,38 +745,93 @@ RegisterInvalid(bool send)
* Process and free the current list of inval messages. * Process and free the current list of inval messages.
* ---------------- * ----------------
*/ */
invalid = Invalid;
Invalid = EmptyLocalInvalid; /* anything added now is part of a new list */
DiscardInvalidStack(&InvalidLocal);
if (send) if (send)
LocalInvalidInvalidate(invalid, {
InvalidationMessageRegisterSharedInvalid); DiscardInvalidStack(&RollbackStack);
invalid = InvalidForall;
InvalidForall = EmptyLocalInvalid; /* clear InvalidForall */
LocalInvalidInvalidate(invalid, InvalidationMessageRegisterSharedInvalid, true);
}
else else
LocalInvalidInvalidate(invalid, {
InvalidationMessageCacheInvalidate); DiscardInvalidStack(&InvalidForall);
invalid = RollbackStack;
RollbackStack = EmptyLocalInvalid; /* clear RollbackStack */
LocalInvalidInvalidate(invalid, InvalidationMessageCacheInvalidate, true);
}
} }
/* /*
* RelationIdInvalidateHeapTuple * ImmediateLocalInvalidation
* Causes the given tuple in a relation to be invalidated. * Causes invalidation immediately for the next command of the transaction.
* *
* Note: * Note:
* This should be called in time of CommandCounterIncrement().
*/
void
ImmediateLocalInvalidation(bool send)
{
LocalInvalid invalid;
/* ----------------
* debugging stuff
* ----------------
*/
#ifdef INVALIDDEBUG
elog(DEBUG, "ImmediateLocalInvalidation(%d) called", send);
#endif /* defined(INVALIDDEBUG) */
/* ----------------
* Process and free the local list of inval messages.
* ----------------
*/
if (send)
{
invalid = InvalidLocal;
InvalidLocal = EmptyLocalInvalid; /* clear InvalidLocal */
LocalInvalidInvalidate(invalid, InvalidationMessageCacheInvalidate, true);
}
else
{
/*
* This may be used for rollback to a savepoint.
* Don't clear InvalidForall and RollbackStack here.
*/
DiscardInvalidStack(&InvalidLocal);
invalid = RollbackStack;
LocalInvalidInvalidate(invalid, InvalidationMessageCacheInvalidate, false);
}
}
/*
* InvokeHeapTupleInvalidation
* Invoke functions for the tuple which register invalidation
* of catalog/relation cache.
* Note:
* Assumes object id is valid. * Assumes object id is valid.
* Assumes tuple is valid. * Assumes tuple is valid.
*/ */
#ifdef INVALIDDEBUG #ifdef INVALIDDEBUG
#define RelationInvalidateHeapTuple_DEBUG1 \ #define InvokeHeapTupleInvalidation_DEBUG1 \
elog(DEBUG, "RelationInvalidateHeapTuple(%s, [%d,%d])", \ elog(DEBUG, "%s(%s, [%d,%d])", \
funcname,\
RelationGetPhysicalRelationName(relation), \ RelationGetPhysicalRelationName(relation), \
ItemPointerGetBlockNumber(&tuple->t_ctid), \ ItemPointerGetBlockNumber(&tuple->t_self), \
ItemPointerGetOffsetNumber(&tuple->t_ctid)) ItemPointerGetOffsetNumber(&tuple->t_self))
#else #else
#define RelationInvalidateHeapTuple_DEBUG1 #define InvokeHeapTupleInvalidation_DEBUG1
#endif /* defined(INVALIDDEBUG) */ #endif /* defined(INVALIDDEBUG) */
void static void
RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple) InvokeHeapTupleInvalidation(Relation relation, HeapTuple tuple,
void (*CacheIdRegisterFunc)(),
void (*RelationIdRegisterFunc)(),
const char *funcname)
{ {
/* ---------------- /* ----------------
* sanity checks * sanity checks
...@@ -553,13 +853,88 @@ RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple) ...@@ -553,13 +853,88 @@ RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple)
* debugging stuff * debugging stuff
* ---------------- * ----------------
*/ */
RelationInvalidateHeapTuple_DEBUG1; InvokeHeapTupleInvalidation_DEBUG1;
RelationInvalidateCatalogCacheTuple(relation, tuple,
CacheIdRegisterFunc);
RelationInvalidateCatalogCacheTuple(relation, RelationInvalidateRelationCache(relation, tuple,
tuple, RelationIdRegisterFunc);
CacheIdRegisterLocalInvalid); }
/*
* RelationInvalidateHeapTuple
* Causes the given tuple in a relation to be invalidated.
*/
void
RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple)
{
InvokeHeapTupleInvalidation(relation, tuple,
CacheIdRegisterLocalInvalid,
RelationIdRegisterLocalInvalid,
"RelationInvalidateHeapTuple");
}
/*
* RelationMark4RollbackHeapTuple
* keep the given tuple in a relation to be invalidated
* in case of abort.
*/
void
RelationMark4RollbackHeapTuple(Relation relation, HeapTuple tuple)
{
InvokeHeapTupleInvalidation(relation, tuple,
CacheIdRegisterLocalRollback,
RelationIdRegisterLocalRollback,
"RelationMark4RollbackHeapTuple");
}
/*
* ImmediateInvalidateSharedHeapTuple
* Different from RelationInvalidateHeapTuple()
* this function queues shared invalidation info immediately.
*/
void
ImmediateInvalidateSharedHeapTuple(Relation relation, HeapTuple tuple)
{
InvokeHeapTupleInvalidation(relation, tuple,
CacheIdImmediateRegisterSharedInvalid,
RelationIdImmediateRegisterSharedInvalid,
"ImmediateInvalidateSharedHeapTuple");
}
/*
* ImmediateSharedRelationCacheInvalidate
* Register shared relation cache invalidation immediately
*
* This is needed for smgrunlink()/smgrtruncate().
* Those functions unlink/truncate the base file immediately
* and couldn't be rollbacked in case of abort/crash.
* So relation cache invalidation must be registerd immediately.
* Note:
* Assumes Relation is valid.
*/
void
ImmediateSharedRelationCacheInvalidate(Relation relation)
{
/* ----------------
* sanity checks
* ----------------
*/
Assert(RelationIsValid(relation));
if (IsBootstrapProcessingMode())
return;
/* ----------------
* debugging stuff
* ----------------
*/
#ifdef INVALIDDEBUG
elog(DEBUG, "ImmediateSharedRelationCacheInvalidate(%s)", \
RelationGetPhysicalRelationName(relation));
#endif /* defined(INVALIDDEBUG) */
RelationInvalidateRelationCache(relation, RelationIdImmediateRegisterSharedInvalid(
tuple, RelOid_pg_class, RelationGetRelid(relation));
RelationIdRegisterLocalInvalid);
} }
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: inval.h,v 1.14 1999/11/21 01:58:20 tgl Exp $ * $Id: inval.h,v 1.15 2000/01/10 06:30:56 inoue Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -19,6 +19,14 @@ extern void DiscardInvalid(void); ...@@ -19,6 +19,14 @@ extern void DiscardInvalid(void);
extern void RegisterInvalid(bool send); extern void RegisterInvalid(bool send);
extern void ImmediateLocalInvalidation(bool send);
extern void RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple); extern void RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple);
extern void RelationMark4RollbackHeapTuple(Relation relation, HeapTuple tuple);
extern void ImmediateInvalidateSharedHeapTuple(Relation relation, HeapTuple tuple);
extern void ImmediateSharedRelationCacheInvalidate(Relation relation);
#endif /* INVAL_H */ #endif /* INVAL_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment