Commit 24c0a6c6 authored by Alvaro Herrera's avatar Alvaro Herrera

logical replication: fix OID type mapping mechanism

The logical replication type map seems to have been misused by its only
caller -- it would try to use the remote OID as input for local type
routines, which unsurprisingly could result in bogus "cache lookup
failed for type XYZ" errors, or random other type names being picked up
if they happened to use the right OID.  Fix that, changing
Oid logicalrep_typmap_getid(Oid remoteid) to
char *logicalrep_typmap_gettypname(Oid remoteid)
which is more useful.  If the remote type is not part of the typmap,
this simply prints "unrecognized type" instead of choking trying to
figure out -- a pointless exercise (because the only input for that
comes from replication messages, which are not under the local node's
control) and dangerous to boot, when called from within an error context
callback.

Once that is done, it comes to light that the local OID in the typmap
entry was not being used for anything; the type/schema names are what we
need, so remove local type OID from that struct.

Once you do that, it becomes pointless to attach a callback to regular
syscache invalidation.  So remove that also.

Reported-by: Dang Minh Huong
Author: Masahiko Sawada
Reviewed-by: Álvaro Herrera, Petr Jelínek, Dang Minh Huong, Atsushi Torikoshi
Discussion: https://postgr.es/m/75DB81BEEA95B445AE6D576A0A5C9E936A6BE964@BPXM05GP.gisp.nec.co.jp
Discussion: https://postgr.es/m/75DB81BEEA95B445AE6D576A0A5C9E936A6C4B0A@BPXM05GP.gisp.nec.co.jp
parent 8df5a1c8
...@@ -35,8 +35,6 @@ static MemoryContext LogicalRepRelMapContext = NULL; ...@@ -35,8 +35,6 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL; static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL; static HTAB *LogicalRepTypMap = NULL;
static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
uint32 hashvalue);
/* /*
* Relcache invalidation callback for our relation map cache. * Relcache invalidation callback for our relation map cache.
...@@ -115,8 +113,6 @@ logicalrep_relmap_init(void) ...@@ -115,8 +113,6 @@ logicalrep_relmap_init(void)
/* Watch for invalidation events. */ /* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0); (Datum) 0);
CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
(Datum) 0);
} }
/* /*
...@@ -375,27 +371,6 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) ...@@ -375,27 +371,6 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
rel->localrel = NULL; rel->localrel = NULL;
} }
/*
* Type cache invalidation callback for our type map cache.
*/
static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
return;
/* invalidate all cache entries */
hash_seq_init(&status, LogicalRepTypMap);
while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
entry->typoid = InvalidOid;
}
/* /*
* Free the type map cache entry data. * Free the type map cache entry data.
*/ */
...@@ -404,8 +379,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry) ...@@ -404,8 +379,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{ {
pfree(entry->nspname); pfree(entry->nspname);
pfree(entry->typname); pfree(entry->typname);
entry->typoid = InvalidOid;
} }
/* /*
...@@ -436,58 +409,53 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp) ...@@ -436,58 +409,53 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname); entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname); entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
entry->typoid = InvalidOid;
} }
/* /*
* Fetch type info from the cache. * Fetch type name from the cache by remote type OID.
*
* Return a substitute value if we cannot find the data type; no message is
* sent to the log in that case, because this is used by error callback
* already.
*/ */
Oid char *
logicalrep_typmap_getid(Oid remoteid) logicalrep_typmap_gettypname(Oid remoteid)
{ {
LogicalRepTyp *entry; LogicalRepTyp *entry;
bool found; bool found;
Oid nspoid;
/* Internal types are mapped directly. */ /* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId) if (remoteid < FirstNormalObjectId)
{ {
if (!get_typisdefined(remoteid)) if (!get_typisdefined(remoteid))
ereport(ERROR, {
(errmsg("built-in type %u not found", remoteid), /*
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber."))); * This can be caused by having a publisher with a higher
return remoteid; * PostgreSQL major version than the subscriber.
*/
return psprintf("unrecognized %u", remoteid);
}
return format_type_be(remoteid);
} }
if (LogicalRepTypMap == NULL) if (LogicalRepTypMap == NULL)
logicalrep_relmap_init(); {
/*
* If the typemap is not initialized yet, we cannot possibly attempt
* to search the hash table; but there's no way we know the type
* locally yet, since we haven't received a message about this type,
* so this is the best we can do.
*/
return psprintf("unrecognized %u", remoteid);
}
/* Try finding the mapping. */ /* search the mapping */
entry = hash_search(LogicalRepTypMap, (void *) &remoteid, entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
HASH_FIND, &found); HASH_FIND, &found);
if (!found) if (!found)
elog(ERROR, "no type map entry for remote type %u", return psprintf("unrecognized %u", remoteid);
remoteid);
/* Found and mapped, return the oid. */
if (OidIsValid(entry->typoid))
return entry->typoid;
/* Otherwise, try to map to local type. */
nspoid = LookupExplicitNamespace(entry->nspname, true);
if (OidIsValid(nspoid))
entry->typoid = GetSysCacheOid2(TYPENAMENSP,
PointerGetDatum(entry->typname),
ObjectIdGetDatum(nspoid));
else
entry->typoid = InvalidOid;
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
return entry->typoid; Assert(OidIsValid(entry->remoteid));
return psprintf("%s.%s", entry->nspname, entry->typname);
} }
...@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); ...@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg typedef struct SlotErrCallbackArg
{ {
LogicalRepRelation *rel; LogicalRepRelMapEntry *rel;
int attnum; int local_attnum;
int remote_attnum;
} SlotErrCallbackArg; } SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL; static MemoryContext ApplyMessageContext = NULL;
...@@ -282,19 +283,29 @@ static void ...@@ -282,19 +283,29 @@ static void
slot_store_error_callback(void *arg) slot_store_error_callback(void *arg)
{ {
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
LogicalRepRelMapEntry *rel;
char *remotetypname;
Oid remotetypoid, Oid remotetypoid,
localtypoid; localtypoid;
if (errarg->attnum < 0) /* Nothing to do if remote attribute number is not set */
if (errarg->remote_attnum < 0)
return; return;
remotetypoid = errarg->rel->atttyps[errarg->attnum]; rel = errarg->rel;
localtypoid = logicalrep_typmap_getid(remotetypoid); remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
/* Fetch remote type name from the LogicalRepTypMap cache */
remotetypname = logicalrep_typmap_gettypname(remotetypoid);
/* Fetch local type OID from the local sys cache */
localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", " errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
"remote type %s, local type %s", "remote type %s, local type %s",
errarg->rel->nspname, errarg->rel->relname, rel->remoterel.nspname, rel->remoterel.relname,
errarg->rel->attnames[errarg->attnum], rel->remoterel.attnames[errarg->remote_attnum],
format_type_be(remotetypoid), remotetypname,
format_type_be(localtypoid)); format_type_be(localtypoid));
} }
...@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ...@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot); ExecClearTuple(slot);
/* Push callback + info on the error context stack */ /* Push callback + info on the error context stack */
errarg.rel = &rel->remoterel; errarg.rel = rel;
errarg.attnum = -1; errarg.local_attnum = -1;
errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback; errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg; errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack; errcallback.previous = error_context_stack;
...@@ -334,14 +346,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ...@@ -334,14 +346,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput; Oid typinput;
Oid typioparam; Oid typioparam;
errarg.attnum = remoteattnum; errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam); getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput, slot->tts_values[i] =
values[remoteattnum], OidInputFunctionCall(typinput, values[remoteattnum],
typioparam, typioparam, att->atttypmod);
att->atttypmod);
slot->tts_isnull[i] = false; slot->tts_isnull[i] = false;
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
} }
else else
{ {
...@@ -380,8 +395,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ...@@ -380,8 +395,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot); ExecClearTuple(slot);
/* Push callback + info on the error context stack */ /* Push callback + info on the error context stack */
errarg.rel = &rel->remoterel; errarg.rel = rel;
errarg.attnum = -1; errarg.local_attnum = -1;
errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback; errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg; errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack; errcallback.previous = error_context_stack;
...@@ -404,14 +420,17 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ...@@ -404,14 +420,17 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput; Oid typinput;
Oid typioparam; Oid typioparam;
errarg.attnum = remoteattnum; errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam); getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput, slot->tts_values[i] =
values[remoteattnum], OidInputFunctionCall(typinput, values[remoteattnum],
typioparam, typioparam, att->atttypmod);
att->atttypmod);
slot->tts_isnull[i] = false; slot->tts_isnull[i] = false;
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
} }
else else
{ {
......
...@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation ...@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */ /* Type mapping info */
typedef struct LogicalRepTyp typedef struct LogicalRepTyp
{ {
Oid remoteid; /* unique id of the type */ Oid remoteid; /* unique id of the remote type */
char *nspname; /* schema name */ char *nspname; /* schema name of remote type */
char *typname; /* name of the type */ char *typname; /* name of the remote type */
Oid typoid; /* local type Oid */
} LogicalRepTyp; } LogicalRepTyp;
/* Transaction info */ /* Transaction info */
......
...@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, ...@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode); LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp); extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
extern Oid logicalrep_typmap_getid(Oid remoteid); extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */ #endif /* LOGICALRELATION_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment