Commit 6da9759a authored by Peter Eisentraut's avatar Peter Eisentraut

Add RENAME support for PUBLICATIONs and SUBSCRIPTIONs

From: Petr Jelinek <petr.jelinek@2ndquadrant.com>
parent 713f7c47
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "catalog/pg_opclass.h" #include "catalog/pg_opclass.h"
#include "catalog/pg_opfamily.h" #include "catalog/pg_opfamily.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_ts_config.h" #include "catalog/pg_ts_config.h"
#include "catalog/pg_ts_dict.h" #include "catalog/pg_ts_dict.h"
#include "catalog/pg_ts_parser.h" #include "catalog/pg_ts_parser.h"
...@@ -90,6 +91,12 @@ report_name_conflict(Oid classId, const char *name) ...@@ -90,6 +91,12 @@ report_name_conflict(Oid classId, const char *name)
case LanguageRelationId: case LanguageRelationId:
msgfmt = gettext_noop("language \"%s\" already exists"); msgfmt = gettext_noop("language \"%s\" already exists");
break; break;
case PublicationRelationId:
msgfmt = gettext_noop("publication \"%s\" already exists");
break;
case SubscriptionRelationId:
msgfmt = gettext_noop("subscription \"%s\" already exists");
break;
default: default:
elog(ERROR, "unsupported object class %u", classId); elog(ERROR, "unsupported object class %u", classId);
break; break;
...@@ -256,6 +263,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name) ...@@ -256,6 +263,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
IsThereOpFamilyInNamespace(new_name, opf->opfmethod, IsThereOpFamilyInNamespace(new_name, opf->opfmethod,
opf->opfnamespace); opf->opfnamespace);
} }
else if (classId == SubscriptionRelationId)
{
if (SearchSysCacheExists2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(new_name)))
report_name_conflict(classId, new_name);
}
else if (nameCacheId >= 0) else if (nameCacheId >= 0)
{ {
if (OidIsValid(namespaceId)) if (OidIsValid(namespaceId))
...@@ -364,6 +377,8 @@ ExecRenameStmt(RenameStmt *stmt) ...@@ -364,6 +377,8 @@ ExecRenameStmt(RenameStmt *stmt)
case OBJECT_TSDICTIONARY: case OBJECT_TSDICTIONARY:
case OBJECT_TSPARSER: case OBJECT_TSPARSER:
case OBJECT_TSTEMPLATE: case OBJECT_TSTEMPLATE:
case OBJECT_PUBLICATION:
case OBJECT_SUBSCRIPTION:
{ {
ObjectAddress address; ObjectAddress address;
Relation catalog; Relation catalog;
......
...@@ -8151,6 +8151,15 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name ...@@ -8151,6 +8151,15 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
n->missing_ok = true; n->missing_ok = true;
$$ = (Node *)n; $$ = (Node *)n;
} }
| ALTER PUBLICATION name RENAME TO name
{
RenameStmt *n = makeNode(RenameStmt);
n->renameType = OBJECT_PUBLICATION;
n->object = list_make1(makeString($3));
n->newname = $6;
n->missing_ok = false;
$$ = (Node *)n;
}
| ALTER SCHEMA name RENAME TO name | ALTER SCHEMA name RENAME TO name
{ {
RenameStmt *n = makeNode(RenameStmt); RenameStmt *n = makeNode(RenameStmt);
...@@ -8169,6 +8178,15 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name ...@@ -8169,6 +8178,15 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
n->missing_ok = false; n->missing_ok = false;
$$ = (Node *)n; $$ = (Node *)n;
} }
| ALTER SUBSCRIPTION name RENAME TO name
{
RenameStmt *n = makeNode(RenameStmt);
n->renameType = OBJECT_SUBSCRIPTION;
n->object = list_make1(makeString($3));
n->newname = $6;
n->missing_ok = false;
$$ = (Node *)n;
}
| ALTER TABLE relation_expr RENAME TO name | ALTER TABLE relation_expr RENAME TO name
{ {
RenameStmt *n = makeNode(RenameStmt); RenameStmt *n = makeNode(RenameStmt);
......
...@@ -1259,6 +1259,21 @@ reread_subscription(void) ...@@ -1259,6 +1259,21 @@ reread_subscription(void)
proc_exit(0); proc_exit(0);
} }
/*
* Exit if subscription name was changed (it's used for
* fallback_application_name). The launcher will start new worker.
*/
if (strcmp(newsub->name, MySubscription->name) != 0)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because subscription was renamed",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/* /*
* Exit if publication list was changed. The launcher will start * Exit if publication list was changed. The launcher will start
* new worker. * new worker.
...@@ -1292,7 +1307,6 @@ reread_subscription(void) ...@@ -1292,7 +1307,6 @@ reread_subscription(void)
/* Check for other changes that should never happen too. */ /* Check for other changes that should never happen too. */
if (newsub->dbid != MySubscription->dbid || if (newsub->dbid != MySubscription->dbid ||
strcmp(newsub->name, MySubscription->name) != 0 ||
strcmp(newsub->slotname, MySubscription->slotname) != 0) strcmp(newsub->slotname, MySubscription->slotname) != 0)
{ {
elog(ERROR, "subscription %u changed unexpectedly", elog(ERROR, "subscription %u changed unexpectedly",
......
...@@ -1463,7 +1463,8 @@ psql_completion(const char *text, int start, int end) ...@@ -1463,7 +1463,8 @@ psql_completion(const char *text, int start, int end)
/* ALTER PUBLICATION <name> ...*/ /* ALTER PUBLICATION <name> ...*/
else if (Matches3("ALTER","PUBLICATION",MatchAny)) else if (Matches3("ALTER","PUBLICATION",MatchAny))
{ {
COMPLETE_WITH_LIST5("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE", "OWNER TO"); COMPLETE_WITH_LIST6("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE",
"OWNER TO", "RENAME TO");
} }
/* ALTER PUBLICATION <name> .. WITH ( ... */ /* ALTER PUBLICATION <name> .. WITH ( ... */
else if (HeadMatches3("ALTER", "PUBLICATION",MatchAny) && TailMatches2("WITH", "(")) else if (HeadMatches3("ALTER", "PUBLICATION",MatchAny) && TailMatches2("WITH", "("))
...@@ -1474,7 +1475,8 @@ psql_completion(const char *text, int start, int end) ...@@ -1474,7 +1475,8 @@ psql_completion(const char *text, int start, int end)
/* ALTER SUBSCRIPTION <name> ... */ /* ALTER SUBSCRIPTION <name> ... */
else if (Matches3("ALTER","SUBSCRIPTION",MatchAny)) else if (Matches3("ALTER","SUBSCRIPTION",MatchAny))
{ {
COMPLETE_WITH_LIST6("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE", "DISABLE", "OWNER TO"); COMPLETE_WITH_LIST7("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE",
"DISABLE", "OWNER TO", "RENAME TO");
} }
else if (HeadMatches3("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches2("WITH", "(")) else if (HeadMatches3("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches2("WITH", "("))
{ {
......
...@@ -148,7 +148,15 @@ DROP TABLE testpub_tbl1; ...@@ -148,7 +148,15 @@ DROP TABLE testpub_tbl1;
t | t | t t | t | t
(1 row) (1 row)
DROP PUBLICATION testpub_default; ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
\dRp testpub_foo
List of publications
Name | Owner | Inserts | Updates | Deletes
-------------+--------------------------+---------+---------+---------
testpub_foo | regress_publication_user | t | t | t
(1 row)
DROP PUBLICATION testpub_foo;
DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpib_ins_trunct;
DROP PUBLICATION testpub_fortbl; DROP PUBLICATION testpub_fortbl;
DROP SCHEMA pub_test CASCADE; DROP SCHEMA pub_test CASCADE;
......
...@@ -61,6 +61,14 @@ ALTER SUBSCRIPTION testsub DISABLE; ...@@ -61,6 +61,14 @@ ALTER SUBSCRIPTION testsub DISABLE;
(1 row) (1 row)
COMMIT; COMMIT;
DROP SUBSCRIPTION testsub NODROP SLOT; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
\dRs
List of subscriptions
Name | Owner | Enabled | Publication
-------------+---------------------------+---------+--------------------
testsub_foo | regress_subscription_user | f | {testpub,testpub1}
(1 row)
DROP SUBSCRIPTION testsub_foo NODROP SLOT;
RESET SESSION AUTHORIZATION; RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user;
...@@ -73,7 +73,11 @@ DROP TABLE testpub_tbl1; ...@@ -73,7 +73,11 @@ DROP TABLE testpub_tbl1;
\dRp+ testpub_default \dRp+ testpub_default
DROP PUBLICATION testpub_default; ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
\dRp testpub_foo
DROP PUBLICATION testpub_foo;
DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpib_ins_trunct;
DROP PUBLICATION testpub_fortbl; DROP PUBLICATION testpub_fortbl;
......
...@@ -38,7 +38,11 @@ ALTER SUBSCRIPTION testsub DISABLE; ...@@ -38,7 +38,11 @@ ALTER SUBSCRIPTION testsub DISABLE;
COMMIT; COMMIT;
DROP SUBSCRIPTION testsub NODROP SLOT; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
\dRs
DROP SUBSCRIPTION testsub_foo NODROP SLOT;
RESET SESSION AUTHORIZATION; RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user;
...@@ -169,8 +169,17 @@ $result = ...@@ -169,8 +169,17 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full");
is($result, qq(11|0|100), 'check replicated insert after alter publication'); is($result, qq(11|0|100), 'check replicated insert after alter publication');
# check restart on rename
$oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';");
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';")
or die "Timed out while waiting for apply to restart";
# check all the cleanup # check all the cleanup
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
$result = $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment