Commit b23b0f55 authored by Tom Lane's avatar Tom Lane

Code review for recent changes in relcache.c.

rd_replidindex should be managed the same as rd_oidindex, and rd_keyattr
and rd_idattr should be managed like rd_indexattr.  Omissions in this area
meant that the bitmapsets computed for rd_keyattr and rd_idattr would be
leaked during any relcache flush, resulting in a slow but permanent leak in
CacheMemoryContext.  There was also a tiny probability of relcache entry
corruption if we ran out of memory at just the wrong point in
RelationGetIndexAttrBitmap.  Otherwise, the fields were not zeroed where
expected, which would not bother the code any AFAICS but could greatly
confuse anyone examining the relcache entry while debugging.

Also, create an API function RelationGetReplicaIndex rather than letting
non-relcache code be intimate with the mechanisms underlying caching of
that value (we won't even mention the memory leak there).

Also, fix a relcache flush hazard identified by Andres Freund:
RelationGetIndexAttrBitmap must not assume that rd_replidindex stays valid
across index_open.

The aspects of this involving rd_keyattr date back to 9.3, so back-patch
those changes.
parent ac53295d
...@@ -7113,6 +7113,7 @@ static HeapTuple ...@@ -7113,6 +7113,7 @@ static HeapTuple
ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy) ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
{ {
TupleDesc desc = RelationGetDescr(relation); TupleDesc desc = RelationGetDescr(relation);
Oid replidindex;
Relation idx_rel; Relation idx_rel;
TupleDesc idx_desc; TupleDesc idx_desc;
char replident = relation->rd_rel->relreplident; char replident = relation->rd_rel->relreplident;
...@@ -7147,18 +7148,16 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * ...@@ -7147,18 +7148,16 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
if (!key_changed) if (!key_changed)
return NULL; return NULL;
/* needs to already have been fetched? */ /* find the replica identity index */
if (relation->rd_indexvalid == 0) replidindex = RelationGetReplicaIndex(relation);
RelationGetIndexList(relation); if (!OidIsValid(replidindex))
if (!OidIsValid(relation->rd_replidindex))
{ {
elog(DEBUG4, "Could not find configured replica identity for table \"%s\"", elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
RelationGetRelationName(relation)); RelationGetRelationName(relation));
return NULL; return NULL;
} }
idx_rel = RelationIdGetRelation(relation->rd_replidindex); idx_rel = RelationIdGetRelation(replidindex);
idx_desc = RelationGetDescr(idx_rel); idx_desc = RelationGetDescr(idx_rel);
/* deform tuple, so we have fast access to columns */ /* deform tuple, so we have fast access to columns */
......
...@@ -1901,6 +1901,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc) ...@@ -1901,6 +1901,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
} }
list_free(relation->rd_indexlist); list_free(relation->rd_indexlist);
bms_free(relation->rd_indexattr); bms_free(relation->rd_indexattr);
bms_free(relation->rd_keyattr);
bms_free(relation->rd_idattr);
FreeTriggerDesc(relation->trigdesc); FreeTriggerDesc(relation->trigdesc);
if (relation->rd_options) if (relation->rd_options)
pfree(relation->rd_options); pfree(relation->rd_options);
...@@ -2547,6 +2549,7 @@ AtEOXact_cleanup(Relation relation, bool isCommit) ...@@ -2547,6 +2549,7 @@ AtEOXact_cleanup(Relation relation, bool isCommit)
list_free(relation->rd_indexlist); list_free(relation->rd_indexlist);
relation->rd_indexlist = NIL; relation->rd_indexlist = NIL;
relation->rd_oidindex = InvalidOid; relation->rd_oidindex = InvalidOid;
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 0; relation->rd_indexvalid = 0;
} }
} }
...@@ -2645,6 +2648,7 @@ AtEOSubXact_cleanup(Relation relation, bool isCommit, ...@@ -2645,6 +2648,7 @@ AtEOSubXact_cleanup(Relation relation, bool isCommit,
list_free(relation->rd_indexlist); list_free(relation->rd_indexlist);
relation->rd_indexlist = NIL; relation->rd_indexlist = NIL;
relation->rd_oidindex = InvalidOid; relation->rd_oidindex = InvalidOid;
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 0; relation->rd_indexvalid = 0;
} }
} }
...@@ -3596,6 +3600,10 @@ CheckConstraintFetch(Relation relation) ...@@ -3596,6 +3600,10 @@ CheckConstraintFetch(Relation relation)
* of the index list. rd_oidindex is valid when rd_indexvalid isn't zero; * of the index list. rd_oidindex is valid when rd_indexvalid isn't zero;
* it is the pg_class OID of a unique index on OID when the relation has one, * it is the pg_class OID of a unique index on OID when the relation has one,
* and InvalidOid if there is no such index. * and InvalidOid if there is no such index.
*
* In exactly the same way, we update rd_replidindex, which is the pg_class
* OID of an index to be used as the relation's replication identity index,
* or InvalidOid if there is no such index.
*/ */
List * List *
RelationGetIndexList(Relation relation) RelationGetIndexList(Relation relation)
...@@ -3667,8 +3675,8 @@ RelationGetIndexList(Relation relation) ...@@ -3667,8 +3675,8 @@ RelationGetIndexList(Relation relation)
/* /*
* Invalid, non-unique, non-immediate or predicate indexes aren't * Invalid, non-unique, non-immediate or predicate indexes aren't
* interesting for neither oid indexes nor replication identity * interesting for either oid indexes or replication identity indexes,
* indexes, so don't check them. * so don't check them.
*/ */
if (!IndexIsValid(index) || !index->indisunique || if (!IndexIsValid(index) || !index->indisunique ||
!index->indimmediate || !index->indimmediate ||
...@@ -3681,35 +3689,29 @@ RelationGetIndexList(Relation relation) ...@@ -3681,35 +3689,29 @@ RelationGetIndexList(Relation relation)
indclass->values[0] == OID_BTREE_OPS_OID) indclass->values[0] == OID_BTREE_OPS_OID)
oidIndex = index->indexrelid; oidIndex = index->indexrelid;
/* always prefer primary keys */ /* remember primary key index if any */
if (index->indisprimary) if (index->indisprimary)
pkeyIndex = index->indexrelid; pkeyIndex = index->indexrelid;
/* explicitly chosen index */ /* remember explicitly chosen replica index */
if (index->indisreplident) if (index->indisreplident)
candidateIndex = index->indexrelid; candidateIndex = index->indexrelid;
} }
systable_endscan(indscan); systable_endscan(indscan);
/* primary key */
if (replident == REPLICA_IDENTITY_DEFAULT &&
OidIsValid(pkeyIndex))
relation->rd_replidindex = pkeyIndex;
/* explicitly chosen index */
else if (replident == REPLICA_IDENTITY_INDEX &&
OidIsValid(candidateIndex))
relation->rd_replidindex = candidateIndex;
/* nothing */
else
relation->rd_replidindex = InvalidOid;
heap_close(indrel, AccessShareLock); heap_close(indrel, AccessShareLock);
/* Now save a copy of the completed list in the relcache entry. */ /* Now save a copy of the completed list in the relcache entry. */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext); oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
relation->rd_indexlist = list_copy(result); relation->rd_indexlist = list_copy(result);
relation->rd_oidindex = oidIndex; relation->rd_oidindex = oidIndex;
if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
relation->rd_replidindex = pkeyIndex;
else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
relation->rd_replidindex = candidateIndex;
else
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 1; relation->rd_indexvalid = 1;
MemoryContextSwitchTo(oldcxt); MemoryContextSwitchTo(oldcxt);
...@@ -3767,7 +3769,8 @@ insert_ordered_oid(List *list, Oid datum) ...@@ -3767,7 +3769,8 @@ insert_ordered_oid(List *list, Oid datum)
* correctly with respect to the full index set. It is up to the caller * correctly with respect to the full index set. It is up to the caller
* to ensure that a correct rd_indexattr set has been cached before first * to ensure that a correct rd_indexattr set has been cached before first
* calling RelationSetIndexList; else a subsequent inquiry might cause a * calling RelationSetIndexList; else a subsequent inquiry might cause a
* wrong rd_indexattr set to get computed and cached. * wrong rd_indexattr set to get computed and cached. Likewise, we do not
* touch rd_keyattr or rd_idattr.
*/ */
void void
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex) RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
...@@ -3783,6 +3786,8 @@ RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex) ...@@ -3783,6 +3786,8 @@ RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
list_free(relation->rd_indexlist); list_free(relation->rd_indexlist);
relation->rd_indexlist = indexIds; relation->rd_indexlist = indexIds;
relation->rd_oidindex = oidIndex; relation->rd_oidindex = oidIndex;
/* For the moment, assume the target rel hasn't got a replica index */
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 2; /* mark list as forced */ relation->rd_indexvalid = 2; /* mark list as forced */
/* Flag relation as needing eoxact cleanup (to reset the list) */ /* Flag relation as needing eoxact cleanup (to reset the list) */
EOXactListAdd(relation); EOXactListAdd(relation);
...@@ -3816,6 +3821,27 @@ RelationGetOidIndex(Relation relation) ...@@ -3816,6 +3821,27 @@ RelationGetOidIndex(Relation relation)
return relation->rd_oidindex; return relation->rd_oidindex;
} }
/*
* RelationGetReplicaIndex -- get OID of the relation's replica identity index
*
* Returns InvalidOid if there is no such index.
*/
Oid
RelationGetReplicaIndex(Relation relation)
{
List *ilist;
if (relation->rd_indexvalid == 0)
{
/* RelationGetIndexList does the heavy lifting. */
ilist = RelationGetIndexList(relation);
list_free(ilist);
Assert(relation->rd_indexvalid != 0);
}
return relation->rd_replidindex;
}
/* /*
* RelationGetIndexExpressions -- get the index expressions for an index * RelationGetIndexExpressions -- get the index expressions for an index
* *
...@@ -3954,8 +3980,8 @@ RelationGetIndexPredicate(Relation relation) ...@@ -3954,8 +3980,8 @@ RelationGetIndexPredicate(Relation relation)
* predicates.) * predicates.)
* *
* Depending on attrKind, a bitmap covering the attnums for all index columns, * Depending on attrKind, a bitmap covering the attnums for all index columns,
* for all key columns or for all the columns the configured replica identity * for all potential foreign key columns, or for all columns in the configured
* are returned. * replica identity index is returned.
* *
* Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
* we can include system attributes (e.g., OID) in the bitmap representation. * we can include system attributes (e.g., OID) in the bitmap representation.
...@@ -3974,22 +4000,25 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) ...@@ -3974,22 +4000,25 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Bitmapset *uindexattrs; /* columns in unique indexes */ Bitmapset *uindexattrs; /* columns in unique indexes */
Bitmapset *idindexattrs; /* columns in the replica identity */ Bitmapset *idindexattrs; /* columns in the replica identity */
List *indexoidlist; List *indexoidlist;
Oid relreplindex;
ListCell *l; ListCell *l;
MemoryContext oldcxt; MemoryContext oldcxt;
/* Quick exit if we already computed the result. */ /* Quick exit if we already computed the result. */
if (relation->rd_indexattr != NULL) if (relation->rd_indexattr != NULL)
{
switch (attrKind) switch (attrKind)
{ {
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return bms_copy(relation->rd_idattr);
case INDEX_ATTR_BITMAP_KEY:
return bms_copy(relation->rd_keyattr);
case INDEX_ATTR_BITMAP_ALL: case INDEX_ATTR_BITMAP_ALL:
return bms_copy(relation->rd_indexattr); return bms_copy(relation->rd_indexattr);
case INDEX_ATTR_BITMAP_KEY:
return bms_copy(relation->rd_keyattr);
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return bms_copy(relation->rd_idattr);
default: default:
elog(ERROR, "unknown attrKind %u", attrKind); elog(ERROR, "unknown attrKind %u", attrKind);
} }
}
/* Fast path if definitely no indexes */ /* Fast path if definitely no indexes */
if (!RelationGetForm(relation)->relhasindex) if (!RelationGetForm(relation)->relhasindex)
...@@ -4004,6 +4033,15 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) ...@@ -4004,6 +4033,15 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
if (indexoidlist == NIL) if (indexoidlist == NIL)
return NULL; return NULL;
/*
* Copy the rd_replidindex value computed by RelationGetIndexList before
* proceeding. This is needed because a relcache flush could occur inside
* index_open below, resetting the fields managed by RelationGetIndexList.
* (The values we're computing will still be valid, assuming that caller
* has a sufficient lock on the relation.)
*/
relreplindex = relation->rd_replidindex;
/* /*
* For each index, add referenced attributes to indexattrs. * For each index, add referenced attributes to indexattrs.
* *
...@@ -4026,7 +4064,6 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) ...@@ -4026,7 +4064,6 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
bool isKey; /* candidate key */ bool isKey; /* candidate key */
bool isIDKey; /* replica identity index */ bool isIDKey; /* replica identity index */
indexDesc = index_open(indexOid, AccessShareLock); indexDesc = index_open(indexOid, AccessShareLock);
/* Extract index key information from the index's pg_index row */ /* Extract index key information from the index's pg_index row */
...@@ -4038,7 +4075,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) ...@@ -4038,7 +4075,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
indexInfo->ii_Predicate == NIL; indexInfo->ii_Predicate == NIL;
/* Is this index the configured (or default) replica identity? */ /* Is this index the configured (or default) replica identity? */
isIDKey = indexOid == relation->rd_replidindex; isIDKey = (indexOid == relreplindex);
/* Collect simple attribute references */ /* Collect simple attribute references */
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
...@@ -4050,13 +4087,13 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) ...@@ -4050,13 +4087,13 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
indexattrs = bms_add_member(indexattrs, indexattrs = bms_add_member(indexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber); attrnum - FirstLowInvalidHeapAttributeNumber);
if (isIDKey)
idindexattrs = bms_add_member(idindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
if (isKey) if (isKey)
uindexattrs = bms_add_member(uindexattrs, uindexattrs = bms_add_member(uindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber); attrnum - FirstLowInvalidHeapAttributeNumber);
if (isIDKey)
idindexattrs = bms_add_member(idindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
} }
} }
...@@ -4071,22 +4108,28 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) ...@@ -4071,22 +4108,28 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
list_free(indexoidlist); list_free(indexoidlist);
/* Now save a copy of the bitmap in the relcache entry. */ /*
* Now save copies of the bitmaps in the relcache entry. We intentionally
* set rd_indexattr last, because that's the one that signals validity of
* the values; if we run out of memory before making that copy, we won't
* leave the relcache entry looking like the other ones are valid but
* empty.
*/
oldcxt = MemoryContextSwitchTo(CacheMemoryContext); oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
relation->rd_indexattr = bms_copy(indexattrs);
relation->rd_keyattr = bms_copy(uindexattrs); relation->rd_keyattr = bms_copy(uindexattrs);
relation->rd_idattr = bms_copy(idindexattrs); relation->rd_idattr = bms_copy(idindexattrs);
relation->rd_indexattr = bms_copy(indexattrs);
MemoryContextSwitchTo(oldcxt); MemoryContextSwitchTo(oldcxt);
/* We return our original working copy for caller to play with */ /* We return our original working copy for caller to play with */
switch (attrKind) switch (attrKind)
{ {
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return idindexattrs;
case INDEX_ATTR_BITMAP_KEY:
return uindexattrs;
case INDEX_ATTR_BITMAP_ALL: case INDEX_ATTR_BITMAP_ALL:
return indexattrs; return indexattrs;
case INDEX_ATTR_BITMAP_KEY:
return uindexattrs;
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return idindexattrs;
default: default:
elog(ERROR, "unknown attrKind %u", attrKind); elog(ERROR, "unknown attrKind %u", attrKind);
return NULL; return NULL;
...@@ -4630,8 +4673,11 @@ load_relcache_init_file(bool shared) ...@@ -4630,8 +4673,11 @@ load_relcache_init_file(bool shared)
rel->rd_refcnt = 0; rel->rd_refcnt = 0;
rel->rd_indexvalid = 0; rel->rd_indexvalid = 0;
rel->rd_indexlist = NIL; rel->rd_indexlist = NIL;
rel->rd_indexattr = NULL;
rel->rd_oidindex = InvalidOid; rel->rd_oidindex = InvalidOid;
rel->rd_replidindex = InvalidOid;
rel->rd_indexattr = NULL;
rel->rd_keyattr = NULL;
rel->rd_idattr = NULL;
rel->rd_createSubid = InvalidSubTransactionId; rel->rd_createSubid = InvalidSubTransactionId;
rel->rd_newRelfilenodeSubid = InvalidSubTransactionId; rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
rel->rd_amcache = NULL; rel->rd_amcache = NULL;
......
...@@ -101,22 +101,20 @@ typedef struct RelationData ...@@ -101,22 +101,20 @@ typedef struct RelationData
Form_pg_class rd_rel; /* RELATION tuple */ Form_pg_class rd_rel; /* RELATION tuple */
TupleDesc rd_att; /* tuple descriptor */ TupleDesc rd_att; /* tuple descriptor */
Oid rd_id; /* relation's object id */ Oid rd_id; /* relation's object id */
List *rd_indexlist; /* list of OIDs of indexes on relation */
Bitmapset *rd_indexattr; /* identifies columns used in indexes */
Bitmapset *rd_keyattr; /* cols that can be ref'd by foreign keys */
Bitmapset *rd_idattr; /* included in replica identity index */
Oid rd_oidindex; /* OID of unique index on OID, if any */
LockInfoData rd_lockInfo; /* lock mgr's info for locking relation */ LockInfoData rd_lockInfo; /* lock mgr's info for locking relation */
RuleLock *rd_rules; /* rewrite rules */ RuleLock *rd_rules; /* rewrite rules */
MemoryContext rd_rulescxt; /* private memory cxt for rd_rules, if any */ MemoryContext rd_rulescxt; /* private memory cxt for rd_rules, if any */
TriggerDesc *trigdesc; /* Trigger info, or NULL if rel has none */ TriggerDesc *trigdesc; /* Trigger info, or NULL if rel has none */
/* /* data managed by RelationGetIndexList: */
* The index chosen as the relation's replication identity or InvalidOid. List *rd_indexlist; /* list of OIDs of indexes on relation */
* Only set correctly if RelationGetIndexList has been Oid rd_oidindex; /* OID of unique index on OID, if any */
* called/rd_indexvalid > 0. Oid rd_replidindex; /* OID of replica identity index, if any */
*/
Oid rd_replidindex; /* data managed by RelationGetIndexAttrBitmap: */
Bitmapset *rd_indexattr; /* identifies columns used in indexes */
Bitmapset *rd_keyattr; /* cols that can be ref'd by foreign keys */
Bitmapset *rd_idattr; /* included in replica identity index */
/* /*
* rd_options is set whenever rd_rel is loaded into the relcache entry. * rd_options is set whenever rd_rel is loaded into the relcache entry.
......
...@@ -39,6 +39,7 @@ extern void RelationClose(Relation relation); ...@@ -39,6 +39,7 @@ extern void RelationClose(Relation relation);
*/ */
extern List *RelationGetIndexList(Relation relation); extern List *RelationGetIndexList(Relation relation);
extern Oid RelationGetOidIndex(Relation relation); extern Oid RelationGetOidIndex(Relation relation);
extern Oid RelationGetReplicaIndex(Relation relation);
extern List *RelationGetIndexExpressions(Relation relation); extern List *RelationGetIndexExpressions(Relation relation);
extern List *RelationGetIndexPredicate(Relation relation); extern List *RelationGetIndexPredicate(Relation relation);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment