Commit 272adf4f authored by Peter Eisentraut's avatar Peter Eisentraut

Disallow CREATE/DROP SUBSCRIPTION in transaction block

Disallow CREATE SUBSCRIPTION and DROP SUBSCRIPTION in a transaction
block when the replication slot is to be created or dropped, since that
cannot be rolled back.

based on patch by Masahiko Sawada <sawada.mshk@gmail.com>
parent 34730273
...@@ -51,6 +51,11 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl ...@@ -51,6 +51,11 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
subscription at the commit of the transaction where this command is run. subscription at the commit of the transaction where this command is run.
</para> </para>
<para>
<command>CREATE SUBSCRIPTION</command> cannot be executed inside a
transaction block when <literal>CREATE SLOT</literal> is specified.
</para>
<para> <para>
Additional info about subscriptions and logical replication as a whole Additional info about subscriptions and logical replication as a whole
can is available at <xref linkend="logical-replication-subscription"> and can is available at <xref linkend="logical-replication-subscription"> and
......
...@@ -38,8 +38,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable ...@@ -38,8 +38,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
</para> </para>
<para> <para>
The replication worker associated with the subscription will not stop until <command>DROP SUBSCRIPTION</command> cannot be executed inside a
after the transaction that issued this command has committed. transaction block when <literal>DROP SLOT</literal> is specified.
</para> </para>
</refsect1> </refsect1>
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/objectaccess.h" #include "catalog/objectaccess.h"
...@@ -204,7 +205,7 @@ publicationListToArray(List *publist) ...@@ -204,7 +205,7 @@ publicationListToArray(List *publist)
* Create new subscription. * Create new subscription.
*/ */
ObjectAddress ObjectAddress
CreateSubscription(CreateSubscriptionStmt *stmt) CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
{ {
Relation rel; Relation rel;
ObjectAddress myself; ObjectAddress myself;
...@@ -221,6 +222,23 @@ CreateSubscription(CreateSubscriptionStmt *stmt) ...@@ -221,6 +222,23 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
bool create_slot; bool create_slot;
List *publications; List *publications;
/*
* Parse and check options.
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, NULL, NULL,
&enabled_given, &enabled,
&create_slot, &slotname);
/*
* Since creating a replication slot is not transactional, rolling back
* the transaction leaves the created replication slot. So we cannot run
* CREATE SUBSCRIPTION inside a transaction block if creating a
* replication slot.
*/
if (create_slot)
PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... CREATE SLOT");
if (!superuser()) if (!superuser())
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
...@@ -239,13 +257,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt) ...@@ -239,13 +257,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
stmt->subname))); stmt->subname)));
} }
/*
* Parse and check options.
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, NULL, NULL,
&enabled_given, &enabled,
&create_slot, &slotname);
if (slotname == NULL) if (slotname == NULL)
slotname = stmt->subname; slotname = stmt->subname;
...@@ -424,7 +435,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ...@@ -424,7 +435,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
* Drop a subscription * Drop a subscription
*/ */
void void
DropSubscription(DropSubscriptionStmt *stmt) DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
{ {
Relation rel; Relation rel;
ObjectAddress myself; ObjectAddress myself;
...@@ -441,6 +452,15 @@ DropSubscription(DropSubscriptionStmt *stmt) ...@@ -441,6 +452,15 @@ DropSubscription(DropSubscriptionStmt *stmt)
WalReceiverConn *wrconn = NULL; WalReceiverConn *wrconn = NULL;
StringInfoData cmd; StringInfoData cmd;
/*
* Since dropping a replication slot is not transactional, the replication
* slot stays dropped even if the transaction rolls back. So we cannot
* run DROP SUBSCRIPTION inside a transaction block if dropping the
* replication slot.
*/
if (stmt->drop_slot)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
rel = heap_open(SubscriptionRelationId, RowExclusiveLock); rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
......
...@@ -1609,7 +1609,8 @@ ProcessUtilitySlow(ParseState *pstate, ...@@ -1609,7 +1609,8 @@ ProcessUtilitySlow(ParseState *pstate,
break; break;
case T_CreateSubscriptionStmt: case T_CreateSubscriptionStmt:
address = CreateSubscription((CreateSubscriptionStmt *) parsetree); address = CreateSubscription((CreateSubscriptionStmt *) parsetree,
isTopLevel);
break; break;
case T_AlterSubscriptionStmt: case T_AlterSubscriptionStmt:
...@@ -1617,7 +1618,7 @@ ProcessUtilitySlow(ParseState *pstate, ...@@ -1617,7 +1618,7 @@ ProcessUtilitySlow(ParseState *pstate,
break; break;
case T_DropSubscriptionStmt: case T_DropSubscriptionStmt:
DropSubscription((DropSubscriptionStmt *) parsetree); DropSubscription((DropSubscriptionStmt *) parsetree, isTopLevel);
/* no commands stashed for DROP */ /* no commands stashed for DROP */
commandCollected = true; commandCollected = true;
break; break;
......
...@@ -18,9 +18,10 @@ ...@@ -18,9 +18,10 @@
#include "catalog/objectaddress.h" #include "catalog/objectaddress.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt); extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt,
bool isTopLevel);
extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt); extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
extern void DropSubscription(DropSubscriptionStmt *stmt); extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
......
...@@ -14,6 +14,11 @@ ERROR: syntax error at or near "PUBLICATION" ...@@ -14,6 +14,11 @@ ERROR: syntax error at or near "PUBLICATION"
LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo; LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo;
^ ^
set client_min_messages to error; set client_min_messages to error;
-- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block
BEGIN;
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT);
ERROR: CREATE SUBSCRIPTION ... CREATE SLOT cannot run inside a transaction block
COMMIT;
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string
...@@ -69,6 +74,13 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; ...@@ -69,6 +74,13 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
testsub_foo | regress_subscription_user | f | {testpub,testpub1} testsub_foo | regress_subscription_user | f | {testpub,testpub1}
(1 row) (1 row)
-- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
BEGIN;
DROP SUBSCRIPTION testsub DROP SLOT;
ERROR: DROP SUBSCRIPTION ... DROP SLOT cannot run inside a transaction block
COMMIT;
BEGIN;
DROP SUBSCRIPTION testsub_foo NODROP SLOT; DROP SUBSCRIPTION testsub_foo NODROP SLOT;
COMMIT;
RESET SESSION AUTHORIZATION; RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user;
...@@ -12,6 +12,11 @@ CREATE SUBSCRIPTION testsub CONNECTION 'foo'; ...@@ -12,6 +12,11 @@ CREATE SUBSCRIPTION testsub CONNECTION 'foo';
CREATE SUBSCRIPTION testsub PUBLICATION foo; CREATE SUBSCRIPTION testsub PUBLICATION foo;
set client_min_messages to error; set client_min_messages to error;
-- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block
BEGIN;
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT);
COMMIT;
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT); CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);
reset client_min_messages; reset client_min_messages;
...@@ -42,7 +47,14 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; ...@@ -42,7 +47,14 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
\dRs \dRs
-- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
BEGIN;
DROP SUBSCRIPTION testsub DROP SLOT;
COMMIT;
BEGIN;
DROP SUBSCRIPTION testsub_foo NODROP SLOT; DROP SUBSCRIPTION testsub_foo NODROP SLOT;
COMMIT;
RESET SESSION AUTHORIZATION; RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment