Commit c46c803f authored by Robert Haas's avatar Robert Haas

Fix relfilenodemap.c's handling of cache invalidations.

The old code entered a new hash table entry first, then scanned
pg_class to determine what value to fill in, and then populated the
entry.  This fails to work properly if a cache invalidation happens
as a result of opening pg_class.  Repair.

Along the way, get rid of the idea of blowing away the entire hash
table as a method of processing invalidations.  Instead, just delete
all the entries one by one.  This is probably not quite as cheap but
it's simpler, and shouldn't happen often.

Andres Freund
parent cd8115e0
...@@ -57,23 +57,20 @@ RelfilenodeMapInvalidateCallback(Datum arg, Oid relid) ...@@ -57,23 +57,20 @@ RelfilenodeMapInvalidateCallback(Datum arg, Oid relid)
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
RelfilenodeMapEntry *entry; RelfilenodeMapEntry *entry;
/* nothing to do if not active or deleted */ /* callback only gets registered after creating the hash */
if (RelfilenodeMapHash == NULL) Assert(RelfilenodeMapHash != NULL);
return;
/* if relid is InvalidOid, we must invalidate the entire cache */
if (relid == InvalidOid)
{
hash_destroy(RelfilenodeMapHash);
RelfilenodeMapHash = NULL;
return;
}
hash_seq_init(&status, RelfilenodeMapHash); hash_seq_init(&status, RelfilenodeMapHash);
while ((entry = (RelfilenodeMapEntry *) hash_seq_search(&status)) != NULL) while ((entry = (RelfilenodeMapEntry *) hash_seq_search(&status)) != NULL)
{ {
/* Same OID may occur in more than one tablespace. */ /*
if (entry->relid == relid) * If relid is InvalidOid, signalling a complete reset, we must remove
* all entries, otherwise just remove the specific relation's entry.
* Always remove negative cache entries.
*/
if (relid == InvalidOid || /* complete reset */
entry->relid == InvalidOid || /* negative cache entry */
entry->relid == relid) /* individual flushed relation */
{ {
if (hash_search(RelfilenodeMapHash, if (hash_search(RelfilenodeMapHash,
(void *) &entry->key, (void *) &entry->key,
...@@ -92,32 +89,12 @@ static void ...@@ -92,32 +89,12 @@ static void
InitializeRelfilenodeMap(void) InitializeRelfilenodeMap(void)
{ {
HASHCTL ctl; HASHCTL ctl;
static bool initial_init_done = false;
int i; int i;
/* Make sure we've initialized CacheMemoryContext. */ /* Make sure we've initialized CacheMemoryContext. */
if (CacheMemoryContext == NULL) if (CacheMemoryContext == NULL)
CreateCacheMemoryContext(); CreateCacheMemoryContext();
/* Initialize the hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(RelfilenodeMapKey);
ctl.entrysize = sizeof(RelfilenodeMapEntry);
ctl.hash = tag_hash;
ctl.hcxt = CacheMemoryContext;
RelfilenodeMapHash =
hash_create("RelfilenodeMap cache", 1024, &ctl,
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/*
* For complete resets we simply delete the entire hash, but there's no
* need to do the other stuff multiple times. Especially the initialization
* of the relcche invalidation should only be done once.
*/
if (initial_init_done)
return;
/* build skey */ /* build skey */
MemSet(&relfilenode_skey, 0, sizeof(relfilenode_skey)); MemSet(&relfilenode_skey, 0, sizeof(relfilenode_skey));
...@@ -134,10 +111,25 @@ InitializeRelfilenodeMap(void) ...@@ -134,10 +111,25 @@ InitializeRelfilenodeMap(void)
relfilenode_skey[0].sk_attno = Anum_pg_class_reltablespace; relfilenode_skey[0].sk_attno = Anum_pg_class_reltablespace;
relfilenode_skey[1].sk_attno = Anum_pg_class_relfilenode; relfilenode_skey[1].sk_attno = Anum_pg_class_relfilenode;
/* Initialize the hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(RelfilenodeMapKey);
ctl.entrysize = sizeof(RelfilenodeMapEntry);
ctl.hash = tag_hash;
ctl.hcxt = CacheMemoryContext;
/*
* Only create the RelfilenodeMapHash now, so we don't end up partially
* initialized when fmgr_info_cxt() above ERRORs out with an out of memory
* error.
*/
RelfilenodeMapHash =
hash_create("RelfilenodeMap cache", 1024, &ctl,
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/* Watch for invalidation events. */ /* Watch for invalidation events. */
CacheRegisterRelcacheCallback(RelfilenodeMapInvalidateCallback, CacheRegisterRelcacheCallback(RelfilenodeMapInvalidateCallback,
(Datum) 0); (Datum) 0);
initial_init_done = true;
} }
/* /*
...@@ -156,6 +148,7 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode) ...@@ -156,6 +148,7 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
Relation relation; Relation relation;
HeapTuple ntp; HeapTuple ntp;
ScanKeyData skey[2]; ScanKeyData skey[2];
Oid relid;
if (RelfilenodeMapHash == NULL) if (RelfilenodeMapHash == NULL)
InitializeRelfilenodeMap(); InitializeRelfilenodeMap();
...@@ -169,30 +162,37 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode) ...@@ -169,30 +162,37 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
key.relfilenode = relfilenode; key.relfilenode = relfilenode;
/* /*
* Check cache and enter entry if nothing could be found. Even if no target * Check cache and return entry if one is found. Even if no target
* relation can be found later on we store the negative match and return a * relation can be found later on we store the negative match and return a
* InvalidOid from cache. That's not really necessary for performance since * InvalidOid from cache. That's not really necessary for performance
* querying invalid values isn't supposed to be a frequent thing, but the * since querying invalid values isn't supposed to be a frequent thing,
* implementation is simpler this way. * but it's basically free.
*/ */
entry = hash_search(RelfilenodeMapHash, (void *) &key, HASH_ENTER, &found); entry = hash_search(RelfilenodeMapHash, (void *) &key, HASH_FIND, &found);
if (found) if (found)
return entry->relid; return entry->relid;
/* initialize empty/negative cache entry before doing the actual lookup */
entry->relid = InvalidOid;
/* ok, no previous cache entry, do it the hard way */ /* ok, no previous cache entry, do it the hard way */
/* check shared tables */ /* initialize empty/negative cache entry before doing the actual lookups */
relid = InvalidOid;
if (reltablespace == GLOBALTABLESPACE_OID) if (reltablespace == GLOBALTABLESPACE_OID)
{ {
entry->relid = RelationMapFilenodeToOid(relfilenode, true); /*
return entry->relid; * Ok, shared table, check relmapper.
*/
relid = RelationMapFilenodeToOid(relfilenode, true);
} }
else
{
/*
* Not a shared table, could either be a plain relation or a
* non-shared, nailed one, like e.g. pg_class.
*/
/* check plain relations by looking in pg_class */ /* check for plain relations by looking in pg_class */
relation = heap_open(RelationRelationId, AccessShareLock); relation = heap_open(RelationRelationId, AccessShareLock);
/* copy scankey to local copy, it will be modified during the scan */ /* copy scankey to local copy, it will be modified during the scan */
...@@ -235,7 +235,7 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode) ...@@ -235,7 +235,7 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
Assert(!isnull && check == relfilenode); Assert(!isnull && check == relfilenode);
} }
#endif #endif
entry->relid = HeapTupleGetOid(ntp); relid = HeapTupleGetOid(ntp);
} }
systable_endscan(scandesc); systable_endscan(scandesc);
...@@ -243,7 +243,18 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode) ...@@ -243,7 +243,18 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
/* check for tables that are mapped but not shared */ /* check for tables that are mapped but not shared */
if (!found) if (!found)
entry->relid = RelationMapFilenodeToOid(relfilenode, false); relid = RelationMapFilenodeToOid(relfilenode, false);
}
return entry->relid; /*
* Only enter entry into cache now, our opening of pg_class could have
* caused cache invalidations to be executed which would have deleted a
* new entry if we had entered it above.
*/
entry = hash_search(RelfilenodeMapHash, (void *) &key, HASH_ENTER, &found);
if (found)
elog(ERROR, "corrupted hashtable");
entry->relid = relid;
return relid;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment