Commit 6b787d9e authored by Tom Lane's avatar Tom Lane

Improve SQLSTATE reporting in some replication-related code.

I started out with the goal of reporting ERRCODE_CONNECTION_FAILURE
when walrcv_connect() fails, but as I looked around I realized that
whoever wrote this code was of the opinion that errcodes are purely
optional.  That's not my understanding of our project policy.  Hence,
make sure that an errcode is provided in each ereport that (a) is
ERROR or higher level and (b) isn't arguably an internal logic error.
Also fix some very dubious existing errcode assignments.

While this is not per policy, it's also largely cosmetic, since few
of these cases could get reported to applications.  So I don't
feel a need to back-patch.

Discussion: https://postgr.es/m/2189704.1623512522@sss.pgh.pa.us
parent d0303bc8
...@@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ...@@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
if (!wrconn) if (!wrconn)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));
PG_TRY(); PG_TRY();
{ {
...@@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) ...@@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (!wrconn) if (!wrconn)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));
PG_TRY(); PG_TRY();
{ {
...@@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ...@@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{ {
if (sub->enabled && !slotname) if (sub->enabled && !slotname)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription", errmsg("cannot set %s for enabled subscription",
"slot_name = NONE"))); "slot_name = NONE")));
...@@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ...@@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if (!sub->slotname && enabled) if (!sub->slotname && enabled)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name"))); errmsg("cannot enable subscription that does not have a slot name")));
values[Anum_pg_subscription_subenabled - 1] = values[Anum_pg_subscription_subenabled - 1] =
...@@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ...@@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{ {
if (!sub->enabled) if (!sub->enabled)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
...@@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ...@@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{ {
if (!sub->enabled) if (!sub->enabled)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
...@@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ...@@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if (!sub->enabled) if (!sub->enabled)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
parse_subscription_options(stmt->options, parse_subscription_options(stmt->options,
...@@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi ...@@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi
{ {
/* ERROR. */ /* ERROR. */
ereport(ERROR, ereport(ERROR,
(errmsg("could not drop replication slot \"%s\" on publisher: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not drop replication slot \"%s\" on publisher: %s",
slotname, res->err))); slotname, res->err)));
} }
...@@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) ...@@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
if (res->status != WALRCV_OK_TUPLES) if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive list of replicated tables from the publisher: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not receive list of replicated tables from the publisher: %s",
res->err))); res->err)));
/* Process tables. */ /* Process tables. */
...@@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) ...@@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
} }
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to publisher when attempting to " (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to publisher when attempting to "
"drop replication slot \"%s\": %s", slotname, err), "drop replication slot \"%s\": %s", slotname, err),
/* translator: %s is an SQL ALTER command */ /* translator: %s is an SQL ALTER command */
errhint("Use %s to disassociate the subscription from the slot.", errhint("Use %s to disassociate the subscription from the slot.",
...@@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums) ...@@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums)
if (strcmp(name, pname) == 0) if (strcmp(name, pname) == 0)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("publication name \"%s\" used more than once", errmsg("publication name \"%s\" used more than once",
pname))); pname)));
} }
...@@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char * ...@@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
oldpublist = lappend(oldpublist, makeString(name)); oldpublist = lappend(oldpublist, makeString(name));
else if (!addpub && !found) else if (!addpub && !found)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("publication \"%s\" is not in subscription \"%s\"", errmsg("publication \"%s\" is not in subscription \"%s\"",
name, subname))); name, subname)));
} }
......
...@@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn) ...@@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
if (conn_opts == NULL) if (conn_opts == NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("could not parse connection string: %s", (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("could not parse connection string: %s",
_("out of memory")))); _("out of memory"))));
/* build a clean connection string from pieces */ /* build a clean connection string from pieces */
...@@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) ...@@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive database system identifier and timeline ID from " (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s", "the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
} }
...@@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) ...@@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("invalid response from primary server"), (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 3, 1))); ntuples, nfields, 3, 1)));
} }
...@@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn, ...@@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str) if (!pubnames_str)
ereport(ERROR, ereport(ERROR,
(errmsg("could not start WAL streaming: %s", (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str, pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
strlen(pubnames_str)); strlen(pubnames_str));
if (!pubnames_literal) if (!pubnames_literal)
ereport(ERROR, ereport(ERROR,
(errmsg("could not start WAL streaming: %s", (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
PQfreemem(pubnames_literal); PQfreemem(pubnames_literal);
...@@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn, ...@@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not start WAL streaming: %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
} }
PQclear(res); PQclear(res);
...@@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn)) PQflush(conn->streamConn))
ereport(ERROR, ereport(ERROR,
(errmsg("could not send end-of-streaming message to primary: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send end-of-streaming message to primary: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
*next_tli = 0; *next_tli = 0;
...@@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
*/ */
if (PQnfields(res) < 2 || PQntuples(res) != 1) if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR, ereport(ERROR,
(errmsg("unexpected result set after end-of-streaming"))); (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0)); *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
PQclear(res); PQclear(res);
...@@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
/* End the copy */ /* End the copy */
if (PQendcopy(conn->streamConn)) if (PQendcopy(conn->streamConn))
ereport(ERROR, ereport(ERROR,
(errmsg("error while shutting down streaming COPY: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("error while shutting down streaming COPY: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
/* CommandComplete should follow */ /* CommandComplete should follow */
...@@ -540,7 +549,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -540,7 +549,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR, ereport(ERROR,
(errmsg("error reading result of streaming command: %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("error reading result of streaming command: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res); PQclear(res);
...@@ -548,7 +558,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -548,7 +558,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
res = libpqrcv_PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL) if (res != NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected result after CommandComplete: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
} }
...@@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, ...@@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive timeline history file from " (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive timeline history file from "
"the primary server: %s", "the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
} }
...@@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, ...@@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("invalid response from primary server"), (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
ntuples, nfields))); ntuples, nfields)));
} }
...@@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, ...@@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
/* Try consuming some data. */ /* Try consuming some data. */
if (PQconsumeInput(conn->streamConn) == 0) if (PQconsumeInput(conn->streamConn) == 0)
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
/* Now that we've consumed some input, try again */ /* Now that we've consumed some input, try again */
...@@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, ...@@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
return -1; return -1;
ereport(ERROR, ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(conn->streamConn)))); PQerrorMessage(conn->streamConn))));
} }
...@@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, ...@@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
} }
} }
if (rawlen < -1) if (rawlen < -1)
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
/* Return received messages to caller */ /* Return received messages to caller */
...@@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) ...@@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 || if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
PQflush(conn->streamConn)) PQflush(conn->streamConn))
ereport(ERROR, ereport(ERROR,
(errmsg("could not send data to WAL stream: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send data to WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
} }
...@@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, ...@@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
{ {
PQclear(res); PQclear(res);
ereport(ERROR, ereport(ERROR,
(errmsg("could not create replication slot \"%s\": %s", (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not create replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn))))); slotname, pchomp(PQerrorMessage(conn->streamConn)))));
} }
...@@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, ...@@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
/* Make sure we got expected number of fields. */ /* Make sure we got expected number of fields. */
if (nfields != nRetTypes) if (nfields != nRetTypes)
ereport(ERROR, ereport(ERROR,
(errmsg("invalid query response"), (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid query response"),
errdetail("Expected %d fields, got %d fields.", errdetail("Expected %d fields, got %d fields.",
nRetTypes, nfields))); nRetTypes, nfields)));
......
...@@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname, ...@@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname,
if (res->status != WALRCV_OK_TUPLES) if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR, ereport(ERROR,
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err))); nspname, relname, res->err)));
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
ereport(ERROR, ereport(ERROR,
(errmsg("table \"%s.%s\" not found on publisher", (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("table \"%s.%s\" not found on publisher",
nspname, relname))); nspname, relname)));
lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
...@@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname, ...@@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname,
if (res->status != WALRCV_OK_TUPLES) if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR, ereport(ERROR,
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err))); nspname, relname, res->err)));
/* We don't know the number of rows coming, so allocate enough space. */ /* We don't know the number of rows coming, so allocate enough space. */
...@@ -851,7 +854,8 @@ copy_table(Relation rel) ...@@ -851,7 +854,8 @@ copy_table(Relation rel)
pfree(cmd.data); pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT) if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR, ereport(ERROR,
(errmsg("could not start initial contents copy for table \"%s.%s\": %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not start initial contents copy for table \"%s.%s\": %s",
lrel.nspname, lrel.relname, res->err))); lrel.nspname, lrel.relname, res->err)));
walrcv_clear_result(res); walrcv_clear_result(res);
...@@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
walrcv_connect(MySubscription->conninfo, true, slotname, &err); walrcv_connect(MySubscription->conninfo, true, slotname, &err);
if (LogRepWorkerWalRcvConn == NULL) if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
...@@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
0, NULL); 0, NULL);
if (res->status != WALRCV_OK_COMMAND) if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR, ereport(ERROR,
(errmsg("table copy could not start transaction on publisher: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("table copy could not start transaction on publisher: %s",
res->err))); res->err)));
walrcv_clear_result(res); walrcv_clear_result(res);
...@@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND) if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR, ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s", (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("table copy could not finish transaction on publisher: %s",
res->err))); res->err)));
walrcv_clear_result(res); walrcv_clear_result(res);
......
...@@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (now >= timeout) if (now >= timeout)
ereport(ERROR, ereport(ERROR,
(errmsg("terminating logical replication worker due to timeout"))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("terminating logical replication worker due to timeout")));
/* Check to see if it's time for a ping. */ /* Check to see if it's time for a ping. */
if (!ping_sent) if (!ping_sent)
...@@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg) ...@@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg)
MySubscription->name, &err); MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL) if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));
/* /*
* We don't really use the output identify_system for anything but it * We don't really use the output identify_system for anything but it
......
...@@ -279,10 +279,13 @@ WalReceiverMain(void) ...@@ -279,10 +279,13 @@ WalReceiverMain(void)
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */ /* Establish the connection to the primary for XLOG streaming */
wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err); wrconn = walrcv_connect(conninfo, false,
cluster_name[0] ? cluster_name : "walreceiver",
&err);
if (!wrconn) if (!wrconn)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the primary server: %s", err))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the primary server: %s", err)));
/* /*
* Save user-visible connection string. This clobbers the original * Save user-visible connection string. This clobbers the original
...@@ -328,7 +331,8 @@ WalReceiverMain(void) ...@@ -328,7 +331,8 @@ WalReceiverMain(void)
if (strcmp(primary_sysid, standby_sysid) != 0) if (strcmp(primary_sysid, standby_sysid) != 0)
{ {
ereport(ERROR, ereport(ERROR,
(errmsg("database system identifier differs between the primary and standby"), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.", errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid))); primary_sysid, standby_sysid)));
} }
...@@ -339,7 +343,8 @@ WalReceiverMain(void) ...@@ -339,7 +343,8 @@ WalReceiverMain(void)
*/ */
if (primaryTLI < startpointTLI) if (primaryTLI < startpointTLI)
ereport(ERROR, ereport(ERROR,
(errmsg("highest timeline %u of the primary is behind recovery timeline %u", (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("highest timeline %u of the primary is behind recovery timeline %u",
primaryTLI, startpointTLI))); primaryTLI, startpointTLI)));
/* /*
...@@ -425,7 +430,8 @@ WalReceiverMain(void) ...@@ -425,7 +430,8 @@ WalReceiverMain(void)
*/ */
if (!RecoveryInProgress()) if (!RecoveryInProgress())
ereport(FATAL, ereport(FATAL,
(errmsg("cannot continue WAL streaming, recovery has already ended"))); (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */ /* Process any requests or signals received recently */
ProcessWalRcvInterrupts(); ProcessWalRcvInterrupts();
...@@ -551,7 +557,8 @@ WalReceiverMain(void) ...@@ -551,7 +557,8 @@ WalReceiverMain(void)
if (now >= timeout) if (now >= timeout)
ereport(ERROR, ereport(ERROR,
(errmsg("terminating walreceiver due to timeout"))); (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("terminating walreceiver due to timeout")));
/* /*
* We didn't receive anything new, for half of * We didn't receive anything new, for half of
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment