Commit 10c0558f authored by Robert Haas's avatar Robert Haas

Fix several mistakes around parallel workers and client_encoding.

Previously, workers sent data to the leader using the client encoding.
That mostly worked, but the leader the converted the data back to the
server encoding.  Since not all encoding conversions are reversible,
that could provoke failures.  Fix by using the database encoding for
all communication between worker and leader.

Also, while temporary changes to GUC settings, as from the SET clause
of a function, are in general OK for parallel query, changing
client_encoding this way inside of a parallel worker is not OK.
Previously, that would have confused the leader; with these changes,
it would not confuse the leader, but it wouldn't do anything either.
So refuse such changes in parallel workers.

Also, the previous code naively assumed that when it received a
NotifyResonse from the worker, it could pass that directly back to the
user.  But now that worker-to-leader communication always uses the
database encoding, that's clearly no longer correct - though,
actually, the old way was always broken for V2 clients.  So
disassemble and reconstitute the message instead.

Issues reported by Peter Eisentraut.  Patch by me, reviewed by
Peter Eisentraut.
parent f8c58554
...@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) ...@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
case 'A': /* NotifyResponse */ case 'A': /* NotifyResponse */
{ {
/* Propagate NotifyResponse. */ /* Propagate NotifyResponse. */
pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); int32 pid;
const char *channel;
const char *payload;
pid = pq_getmsgint(msg, 4);
channel = pq_getmsgrawstring(msg);
payload = pq_getmsgrawstring(msg);
pq_endmessage(msg);
NotifyMyFrontEnd(channel, payload, pid);
break; break;
} }
...@@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg) ...@@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg)
BackgroundWorkerInitializeConnectionByOid(fps->database_id, BackgroundWorkerInitializeConnectionByOid(fps->database_id,
fps->authenticated_user_id); fps->authenticated_user_id);
/*
* Set the client encoding to the database encoding, since that is what
* the leader will expect.
*/
SetClientEncoding(GetDatabaseEncoding());
/* Restore GUC values from launching backend. */ /* Restore GUC values from launching backend. */
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
Assert(gucspace != NULL); Assert(gucspace != NULL);
......
...@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, ...@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
char *page_buffer); char *page_buffer);
static void asyncQueueAdvanceTail(void); static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void); static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void); static void ClearPendingActionsAndNotifies(void);
...@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void) ...@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
/* /*
* Send NOTIFY message to my front end. * Send NOTIFY message to my front end.
*/ */
static void void
NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{ {
if (whereToSendOutput == DestRemote) if (whereToSendOutput == DestRemote)
......
...@@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra) ...@@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra)
{ {
int encoding = *((int *) extra); int encoding = *((int *) extra);
/*
* Parallel workers send data to the leader, not the client. They always
* send data using the database encoding.
*/
if (IsParallelWorker())
{
/*
* During parallel worker startup, we want to accept the leader's
* client_encoding setting so that anyone who looks at the value in
* the worker sees the same value that they would see in the leader.
*/
if (InitializingParallelWorker)
return;
/*
* A change other than during startup, for example due to a SET clause
* attached to a function definition, should be rejected, as there is
* nothing we can do inside the worker to make it take effect.
*/
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot change client_encoding in a parallel worker")));
}
/* We do not expect an error if PrepareClientEncoding succeeded */ /* We do not expect an error if PrepareClientEncoding succeeded */
if (SetClientEncoding(encoding) < 0) if (SetClientEncoding(encoding) < 0)
elog(LOG, "SetClientEncoding(%d) failed", encoding); elog(LOG, "SetClientEncoding(%d) failed", encoding);
......
...@@ -65,6 +65,7 @@ ...@@ -65,6 +65,7 @@
* pq_copymsgbytes - copy raw data from a message buffer * pq_copymsgbytes - copy raw data from a message buffer
* pq_getmsgtext - get a counted text string (with conversion) * pq_getmsgtext - get a counted text string (with conversion)
* pq_getmsgstring - get a null-terminated text string (with conversion) * pq_getmsgstring - get a null-terminated text string (with conversion)
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
* pq_getmsgend - verify message fully consumed * pq_getmsgend - verify message fully consumed
*/ */
...@@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg) ...@@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg)
return pg_client_to_server(str, slen); return pg_client_to_server(str, slen);
} }
/* --------------------------------
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
*
* Returns a pointer directly into the message buffer.
* --------------------------------
*/
const char *
pq_getmsgrawstring(StringInfo msg)
{
char *str;
int slen;
str = &msg->data[msg->cursor];
/*
* It's safe to use strlen() here because a StringInfo is guaranteed to
* have a trailing null byte. But check we found a null inside the
* message.
*/
slen = strlen(str);
if (msg->cursor + slen >= msg->len)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid string in message")));
msg->cursor += slen + 1;
return str;
}
/* -------------------------------- /* --------------------------------
* pq_getmsgend - verify message fully consumed * pq_getmsgend - verify message fully consumed
* -------------------------------- * --------------------------------
......
...@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata) ...@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata)
pq_getmsgend(msg); pq_getmsgend(msg);
break; break;
} }
value = pq_getmsgstring(msg); value = pq_getmsgrawstring(msg);
switch (code) switch (code)
{ {
......
...@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending; ...@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
extern Size AsyncShmemSize(void); extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void); extern void AsyncShmemInit(void);
extern void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
/* notify-related SQL statements */ /* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload); extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel); extern void Async_Listen(const char *channel);
......
...@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen); ...@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen);
extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen); extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen);
extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes); extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes);
extern const char *pq_getmsgstring(StringInfo msg); extern const char *pq_getmsgstring(StringInfo msg);
extern const char *pq_getmsgrawstring(StringInfo msg);
extern void pq_getmsgend(StringInfo msg); extern void pq_getmsgend(StringInfo msg);
#endif /* PQFORMAT_H */ #endif /* PQFORMAT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment