Commit db16c656 authored by Alvaro Herrera's avatar Alvaro Herrera

Rename the logical replication global "wrconn"

The worker.c global wrconn is only meant to be used by logical apply/
tablesync workers, but there are other variables with the same name. To
reduce future confusion rename the global from "wrconn" to
"LogRepWorkerWalRcvConn".

While this is just cosmetic, it seems better to backpatch it all the way
back to 10 where this code appeared, to avoid future backpatching
issues.

Author: Peter Smith <smithpb2250@gmail.com>
Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com
parent 7dde9872
...@@ -643,8 +643,8 @@ static void ...@@ -643,8 +643,8 @@ static void
logicalrep_worker_onexit(int code, Datum arg) logicalrep_worker_onexit(int code, Datum arg)
{ {
/* Disconnect gracefully from the remote side. */ /* Disconnect gracefully from the remote side. */
if (wrconn) if (LogRepWorkerWalRcvConn)
walrcv_disconnect(wrconn); walrcv_disconnect(LogRepWorkerWalRcvConn);
logicalrep_worker_detach(); logicalrep_worker_detach();
......
...@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) ...@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn); MyLogicalRepWorker->relstate_lsn);
/* End wal streaming so wrconn can be re-used to drop the slot. */ /*
walrcv_endstreaming(wrconn, &tli); * End streaming so that LogRepWorkerWalRcvConn can be used to drop
* the slot.
*/
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
/* /*
* Cleanup the tablesync slot. * Cleanup the tablesync slot.
...@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) ...@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* otherwise, it won't be dropped till the corresponding subscription * otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false. * is dropped. So passing missing_ok = false.
*/ */
ReplicationSlotDropAtPubNode(wrconn, syncslotname, false); ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
finish_sync_worker(); finish_sync_worker();
} }
...@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread) ...@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;) for (;;)
{ {
/* Try read the data. */ /* Try read the data. */
len = walrcv_receive(wrconn, &buf, &fd); len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
...@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname, ...@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s", " AND c.relname = %s",
quote_literal_cstr(nspname), quote_literal_cstr(nspname),
quote_literal_cstr(relname)); quote_literal_cstr(relname));
res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES) if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR, ereport(ERROR,
...@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname, ...@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u" " AND a.attrelid = %u"
" ORDER BY a.attnum", " ORDER BY a.attnum",
lrel->remoteid, lrel->remoteid,
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""), (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
"AND a.attgenerated = ''" : ""),
lrel->remoteid); lrel->remoteid);
res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES) if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR, ereport(ERROR,
...@@ -841,7 +847,7 @@ copy_table(Relation rel) ...@@ -841,7 +847,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT", appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname)); quote_qualified_identifier(lrel.nspname, lrel.relname));
} }
res = walrcv_exec(wrconn, cmd.data, 0, NULL); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data); pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT) if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR, ereport(ERROR,
...@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker, * application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them. * so that synchronous replication can distinguish them.
*/ */
wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); LogRepWorkerWalRcvConn =
if (wrconn == NULL) walrcv_connect(MySubscription->conninfo, true, slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errmsg("could not connect to the publisher: %s", err)));
...@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* breakdown then it wouldn't have succeeded so trying it next time * breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet. * seems like a better bet.
*/ */
ReplicationSlotDropAtPubNode(wrconn, slotname, true); ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
} }
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{ {
...@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* ensures that both the replication slot we create (see below) and the * ensures that both the replication slot we create (see below) and the
* COPY are consistent with each other. * COPY are consistent with each other.
*/ */
res = walrcv_exec(wrconn, res = walrcv_exec(LogRepWorkerWalRcvConn,
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
0, NULL); 0, NULL);
if (res->status != WALRCV_OK_COMMAND) if (res->status != WALRCV_OK_COMMAND)
...@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server. * slot leading to a dangling slot on the server.
*/ */
HOLD_INTERRUPTS(); HOLD_INTERRUPTS();
walrcv_create_slot(wrconn, slotname, false /* permanent */ , walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos); CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
...@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table(rel); copy_table(rel);
PopActiveSnapshot(); PopActiveSnapshot();
res = walrcv_exec(wrconn, "COMMIT", 0, NULL); res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND) if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR, ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s", (errmsg("table copy could not finish transaction on publisher: %s",
......
...@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL; ...@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */ /* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL; static MemoryContext LogicalStreamingContext = NULL;
WalReceiverConn *wrconn = NULL; WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL; Subscription *MySubscription = NULL;
bool MySubscriptionValid = false; bool MySubscriptionValid = false;
...@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextSwitchTo(ApplyMessageContext); MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd); len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
if (len != 0) if (len != 0)
{ {
...@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextReset(ApplyMessageContext); MemoryContextReset(ApplyMessageContext);
} }
len = walrcv_receive(wrconn, &buf, &fd); len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
} }
} }
...@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
} }
/* All done */ /* All done */
walrcv_endstreaming(wrconn, &tli); walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
} }
/* /*
...@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) ...@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
LSN_FORMAT_ARGS(writepos), LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos)); LSN_FORMAT_ARGS(flushpos));
walrcv_send(wrconn, reply_message->data, reply_message->len); walrcv_send(LogRepWorkerWalRcvConn,
reply_message->data, reply_message->len);
if (recvpos > last_recvpos) if (recvpos > last_recvpos)
last_recvpos = recvpos; last_recvpos = recvpos;
...@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg) ...@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg)
origin_startpos = replorigin_session_get_progress(false); origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand(); CommitTransactionCommand();
wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
&err); MySubscription->name, &err);
if (wrconn == NULL) if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err))); (errmsg("could not connect to the publisher: %s", err)));
...@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg) ...@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg)
* We don't really use the output identify_system for anything but it * We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it. * does some initializations on the upstream so let's still call it.
*/ */
(void) walrcv_identify_system(wrconn, &startpointTLI); (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
} }
/* /*
...@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg) ...@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
options.startpoint = origin_startpos; options.startpoint = origin_startpos;
options.slotname = myslotname; options.slotname = myslotname;
options.proto.logical.proto_version = options.proto.logical.proto_version =
walrcv_server_version(wrconn) >= 140000 ? walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary; options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream; options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */ /* Start normal logical streaming replication. */
walrcv_startstreaming(wrconn, &options); walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
/* Run the main loop. */ /* Run the main loop. */
LogicalRepApplyLoop(origin_startpos); LogicalRepApplyLoop(origin_startpos);
......
...@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker ...@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
extern MemoryContext ApplyContext; extern MemoryContext ApplyContext;
/* libpqreceiver connection */ /* libpqreceiver connection */
extern struct WalReceiverConn *wrconn; extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
/* Worker and subscription objects. */ /* Worker and subscription objects. */
extern Subscription *MySubscription; extern Subscription *MySubscription;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment