Commit 6f1b9aaa authored by Peter Eisentraut's avatar Peter Eisentraut

Fix logical replication between different encodings

When sending a tuple attribute, the previous coding erroneously sent the
length byte before encoding conversion, which would lead to protocol
failures on the receiving side if the length did not match the following
string.

To fix that, use pq_sendcountedtext() for sending tuple attributes,
which takes care of all of that internally.  To match the API of
pq_sendcountedtext(), send even text values without a trailing zero byte
and have the receiving end put it in place instead.  This matches how
the standard FE/BE protocol behaves.
Reported-by: default avatarKyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
parent 5f21f529
...@@ -6107,11 +6107,14 @@ TupleData ...@@ -6107,11 +6107,14 @@ TupleData
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term> <term>
String Byte<replaceable>n</replaceable>
</term> </term>
<listitem> <listitem>
<para> <para>
The text value. The value of the column, in text format. (A future release
might support additional formats.)
<replaceable>n</replaceable> is the above length.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -417,7 +417,6 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) ...@@ -417,7 +417,6 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
Form_pg_type typclass; Form_pg_type typclass;
Form_pg_attribute att = desc->attrs[i]; Form_pg_attribute att = desc->attrs[i];
char *outputstr; char *outputstr;
int len;
/* skip dropped columns */ /* skip dropped columns */
if (att->attisdropped) if (att->attisdropped)
...@@ -442,10 +441,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) ...@@ -442,10 +441,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
pq_sendbyte(out, 't'); /* 'text' data follows */ pq_sendbyte(out, 't'); /* 'text' data follows */
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
len = strlen(outputstr) + 1; /* null terminated */ pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
pq_sendint(out, len, 4); /* length */
pq_sendstring(out, outputstr); /* data */
pfree(outputstr); pfree(outputstr);
ReleaseSysCache(typtup); ReleaseSysCache(typtup);
...@@ -492,7 +488,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) ...@@ -492,7 +488,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
len = pq_getmsgint(in, 4); /* read length */ len = pq_getmsgint(in, 4); /* read length */
/* and data */ /* and data */
tuple->values[i] = (char *) pq_getmsgbytes(in, len); tuple->values[i] = palloc(len + 1);
pq_copymsgbytes(in, tuple->values[i], len);
tuple->values[i][len] = '\0';
} }
break; break;
default: default:
......
# Test replication between databases with different encodings
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 1;
sub wait_for_caught_up
{
my ($node, $appname) = @_;
$node->poll_query_until('postgres',
"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';")
or die "Timed out while waiting for subscriber to catch up";
}
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical', extra => ['--locale=C', '--encoding=UTF8']);
$node_publisher->start;
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical', extra => ['--locale=C', '--encoding=LATIN1']);
$node_subscriber->start;
my $ddl = "CREATE TABLE test1 (a int, b text);";
$node_publisher->safe_psql('postgres', $ddl);
$node_subscriber->safe_psql('postgres', $ddl);
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $appname = 'encoding_test';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR ALL TABLES;");
$node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;");
wait_for_caught_up($node_publisher, $appname);
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
wait_for_caught_up($node_publisher, $appname);
is($node_subscriber->safe_psql('postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'}), # LATIN1
qq(1),
'data replicated to subscriber');
$node_subscriber->stop;
$node_publisher->stop;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment