Commit 944dc0f9 authored by Peter Eisentraut's avatar Peter Eisentraut

Check relkind of tables in CREATE/ALTER SUBSCRIPTION

We used to only check for a supported relkind on the subscriber during
replication, which is needed to ensure that the setup is valid and we
don't crash.  But it's also useful to tell the user immediately when
CREATE or ALTER SUBSCRIPTION is executed that the relation being added
to the subscription is not of a supported relkind.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: default avatartushar <tushar.ahuja@enterprisedb.com>
parent 0fbfb65d
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
#include "commands/event_trigger.h" #include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h" #include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "replication/logicallauncher.h" #include "replication/logicallauncher.h"
...@@ -417,6 +419,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ...@@ -417,6 +419,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
relid = RangeVarGetRelid(rv, AccessShareLock, false); relid = RangeVarGetRelid(rv, AccessShareLock, false);
/* Check for supported relkind. */
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
SetSubscriptionRelState(subid, relid, table_state, SetSubscriptionRelState(subid, relid, table_state,
InvalidXLogRecPtr); InvalidXLogRecPtr);
} }
...@@ -529,6 +535,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) ...@@ -529,6 +535,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
Oid relid; Oid relid;
relid = RangeVarGetRelid(rv, AccessShareLock, false); relid = RangeVarGetRelid(rv, AccessShareLock, false);
/* Check for supported relkind. */
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
pubrel_local_oids[off++] = relid; pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids, if (!bsearch(&relid, subrel_local_oids,
......
...@@ -551,3 +551,23 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) ...@@ -551,3 +551,23 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
RelationGetRelationName(rel)), RelationGetRelationName(rel)),
errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE."))); errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
} }
/*
* Check if we support writing into specific relkind.
*
* The nspname and relname are only needed for error reporting.
*/
void
CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
/*
* We currently only support writing to regular tables.
*/
if (relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("logical replication target relation \"%s.%s\" is not a table",
nspname, relname)));
}
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "access/sysattr.h" #include "access/sysattr.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_subscription_rel.h" #include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "replication/logicalrelation.h" #include "replication/logicalrelation.h"
#include "replication/worker_internal.h" #include "replication/worker_internal.h"
...@@ -258,15 +259,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) ...@@ -258,15 +259,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
remoterel->nspname, remoterel->relname))); remoterel->nspname, remoterel->relname)));
entry->localrel = heap_open(relid, NoLock); entry->localrel = heap_open(relid, NoLock);
/* /* Check for supported relkind. */
* We currently only support writing to regular and partitioned CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
* tables. remoterel->nspname, remoterel->relname);
*/
if (entry->localrel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("logical replication target relation \"%s.%s\" is not a table",
remoterel->nspname, remoterel->relname)));
/* /*
* Build the mapping of local attribute numbers to remote attribute * Build the mapping of local attribute numbers to remote attribute
......
...@@ -535,5 +535,7 @@ extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, ...@@ -535,5 +535,7 @@ extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
TupleTableSlot *searchslot); TupleTableSlot *searchslot);
extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd); extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname);
#endif /* EXECUTOR_H */ #endif /* EXECUTOR_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment