Commit 9de77b54 authored by Tom Lane's avatar Tom Lane

Allow logical replication to transfer data in binary format.

This patch adds a "binary" option to CREATE/ALTER SUBSCRIPTION.
When that's set, the publisher will send data using the data type's
typsend function if any, rather than typoutput.  This is generally
faster, if slightly less robust.

As committed, we won't try to transfer user-defined array or composite
types in binary, for fear that type OIDs won't match at the subscriber.
This might be changed later, but it seems like fit material for a
follow-on patch.

Dave Cramer, reviewed by Daniel Gustafsson, Petr Jelinek, and others;
adjusted some by me

Discussion: https://postgr.es/m/CADK3HH+R3xMn=8t3Ct+uD+qJ1KD=Hbif5NFMJ+d5DkoCzp6Vgw@mail.gmail.com
parent 9add4050
......@@ -7472,7 +7472,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
(references <link linkend="catalog-pg-database"><structname>pg_database</structname></link>.<structfield>oid</structfield>)
</para>
<para>
OID of the database which the subscription resides in
OID of the database that the subscription resides in
</para></entry>
</row>
......@@ -7500,7 +7500,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<structfield>subenabled</structfield> <type>bool</type>
</para>
<para>
If true, the subscription is enabled and should be replicating.
If true, the subscription is enabled and should be replicating
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subbinary</structfield> <type>bool</type>
</para>
<para>
If true, the subscription will request that the publisher send data
in binary format
</para></entry>
</row>
......@@ -7518,8 +7528,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<structfield>subslotname</structfield> <type>name</type>
</para>
<para>
Name of the replication slot in the upstream database. Also used
for local replication origin name.
Name of the replication slot in the upstream database (also used
for the local replication origin name)
</para></entry>
</row>
......@@ -7528,8 +7538,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<structfield>subsynccommit</structfield> <type>text</type>
</para>
<para>
Contains the value of the <varname>synchronous_commit</varname>
setting for the subscription workers.
The <varname>synchronous_commit</varname>
setting for the subscription's workers to use
</para></entry>
</row>
......@@ -7538,8 +7548,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<structfield>subpublications</structfield> <type>text[]</type>
</para>
<para>
Array of subscribed publication names. These reference the
publications on the publisher server. For more on publications
Array of subscribed publication names. These reference
publications defined in the upstream database. For more on publications
see <xref linkend="logical-replication-publication"/>.
</para></entry>
</row>
......
......@@ -163,8 +163,10 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<para>
This clause alters parameters originally set by
<xref linkend="sql-createsubscription"/>. See there for more
information. The allowed options are <literal>slot_name</literal> and
<literal>synchronous_commit</literal>
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>, and
<literal>binary</literal>.
</para>
</listitem>
</varlistentry>
......
......@@ -152,8 +152,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
The value of this parameter overrides the
<xref linkend="guc-synchronous-commit"/> setting. The default
value is <literal>off</literal>.
<xref linkend="guc-synchronous-commit"/> setting within this
subscription's apply worker processes. The default value
is <literal>off</literal>.
</para>
<para>
......@@ -178,6 +179,27 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
<varlistentry>
<term><literal>binary</literal> (<type>boolean</type>)</term>
<listitem>
<para>
Specifies whether the subscription will request the publisher to
send the data in binary format (as opposed to text).
The default is <literal>false</literal>.
Even when this option is enabled, only data types that have
binary send and receive functions will be transferred in binary.
</para>
<para>
When doing cross-version replication, it could happen that the
publisher has a binary send function for some data type, but the
subscriber lacks a binary receive function for the type. In
such a case, data transfer will fail, and
the <literal>binary</literal> option cannot be used.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>connect</literal> (<type>boolean</type>)</term>
<listitem>
......
......@@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->name = pstrdup(NameStr(subform->subname));
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
......
......@@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
ON pg_subscription TO public;
......
......@@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
* accommodate that.
*/
static void
parse_subscription_options(List *options, bool *connect, bool *enabled_given,
bool *enabled, bool *create_slot,
parse_subscription_options(List *options,
bool *connect,
bool *enabled_given, bool *enabled,
bool *create_slot,
bool *slot_name_given, char **slot_name,
bool *copy_data, char **synchronous_commit,
bool *refresh)
bool *copy_data,
char **synchronous_commit,
bool *refresh,
bool *binary_given, bool *binary)
{
ListCell *lc;
bool connect_given = false;
......@@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
*synchronous_commit = NULL;
if (refresh)
*refresh = true;
if (binary)
{
*binary_given = false;
*binary = false;
}
/* Parse options */
foreach(lc, options)
......@@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
refresh_given = true;
*refresh = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "binary") == 0 && binary)
{
if (*binary_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*binary_given = true;
*binary = defGetBoolean(defel);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
......@@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char *conninfo;
char *slotname;
bool slotname_given;
bool binary;
bool binary_given;
char originname[NAMEDATALEN];
bool create_slot;
List *publications;
......@@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, &connect, &enabled_given,
&enabled, &create_slot, &slotname_given,
&slotname, &copy_data, &synchronous_commit,
NULL);
parse_subscription_options(stmt->options,
&connect,
&enabled_given, &enabled,
&create_slot,
&slotname_given, &slotname,
&copy_data,
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary);
/*
* Since creating a replication slot is not transactional, rolling back
......@@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
......@@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
char *slotname;
bool slotname_given;
char *synchronous_commit;
parse_subscription_options(stmt->options, NULL, NULL, NULL,
NULL, &slotname_given, &slotname,
NULL, &synchronous_commit, NULL);
bool binary_given;
bool binary;
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
&slotname_given, &slotname,
NULL, /* no "copy_data" */
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary);
if (slotname_given)
{
......@@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
}
if (binary_given)
{
values[Anum_pg_subscription_subbinary - 1] =
BoolGetDatum(binary);
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
update_tuple = true;
break;
}
......@@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
bool enabled,
enabled_given;
parse_subscription_options(stmt->options, NULL,
&enabled_given, &enabled, NULL,
NULL, NULL, NULL, NULL, NULL);
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
&enabled_given, &enabled,
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
NULL, /* no "copy_data" */
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL); /* no "binary" */
Assert(enabled_given);
if (!sub->slotname && enabled)
......@@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
bool copy_data;
bool refresh;
parse_subscription_options(stmt->options, NULL, NULL, NULL,
NULL, NULL, NULL, &copy_data,
NULL, &refresh);
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
&copy_data,
NULL, /* no "synchronous_commit" */
&refresh,
NULL, NULL); /* no "binary" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
......@@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
parse_subscription_options(stmt->options, NULL, NULL, NULL,
NULL, NULL, NULL, &copy_data,
NULL, NULL);
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
&copy_data,
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL); /* no "binary" */
AlterSubscription_refresh(sub, copy_data);
......
......@@ -424,6 +424,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
PQfreemem(pubnames_literal);
pfree(pubnames_str);
if (options->proto.logical.binary &&
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfoString(&cmd, ", binary 'true'");
appendStringInfoChar(&cmd, ')');
}
else
......
......@@ -17,7 +17,6 @@
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
......@@ -31,7 +30,7 @@
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple);
HeapTuple tuple, bool binary);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
......@@ -139,7 +138,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
* Write INSERT to the output stream.
*/
void
logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'I'); /* action INSERT */
......@@ -147,7 +146,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple);
logicalrep_write_tuple(out, rel, newtuple, binary);
}
/*
......@@ -179,7 +178,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
HeapTuple newtuple)
HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
......@@ -196,11 +195,11 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
logicalrep_write_tuple(out, rel, oldtuple, binary);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple);
logicalrep_write_tuple(out, rel, newtuple, binary);
}
/*
......@@ -248,7 +247,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
......@@ -264,7 +263,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
logicalrep_write_tuple(out, rel, oldtuple, binary);
}
/*
......@@ -437,7 +436,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
{
TupleDesc desc;
Datum values[MaxTupleAttributeNumber];
......@@ -474,12 +473,18 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
if (isnull[i])
{
pq_sendbyte(out, 'n'); /* null column */
pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
continue;
}
else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
{
pq_sendbyte(out, 'u'); /* unchanged toast column */
/*
* Unchanged toasted datum. (Note that we don't promise to detect
* unchanged data in general; this is just a cheap check to avoid
* sending large values unnecessarily.)
*/
pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
continue;
}
......@@ -488,20 +493,48 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
typclass = (Form_pg_type) GETSTRUCT(typtup);
pq_sendbyte(out, 't'); /* 'text' data follows */
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
pfree(outputstr);
/*
* Choose whether to send in binary. Obviously, the option must be
* requested and the type must have a send function. Also, if the
* type is not built-in then it must not be a composite or array type.
* Such types contain type OIDs, which will likely not match at the
* receiver if it's not a built-in type.
*
* XXX this could be relaxed if we changed record_recv and array_recv
* to be less picky.
*
* XXX this fails to apply the restriction to domains over such types.
*/
if (binary &&
OidIsValid(typclass->typsend) &&
(att->atttypid < FirstGenbkiObjectId ||
(typclass->typtype != TYPTYPE_COMPOSITE &&
typclass->typelem == InvalidOid)))
{
bytea *outputbytes;
int len;
pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
len = VARSIZE(outputbytes) - VARHDRSZ;
pq_sendint(out, len, 4); /* length */
pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
pfree(outputbytes);
}
else
{
pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
pfree(outputstr);
}
ReleaseSysCache(typtup);
}
}
/*
* Read tuple in remote format from stream.
*
* The returned tuple points into the input stringinfo.
* Read tuple in logical replication format from stream.
*/
static void
logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
......@@ -512,38 +545,52 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
/* Get number of attributes */
natts = pq_getmsgint(in, 2);
memset(tuple->changed, 0, sizeof(tuple->changed));
/* Allocate space for per-column values; zero out unused StringInfoDatas */
tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
tuple->colstatus = (char *) palloc(natts * sizeof(char));
/* Read the data */
for (i = 0; i < natts; i++)
{
char kind;
int len;
StringInfo value = &tuple->colvalues[i];
kind = pq_getmsgbyte(in);
tuple->colstatus[i] = kind;
switch (kind)
{
case 'n': /* null */
tuple->values[i] = NULL;
tuple->changed[i] = true;
case LOGICALREP_COLUMN_NULL:
/* nothing more to do */
break;
case 'u': /* unchanged column */
case LOGICALREP_COLUMN_UNCHANGED:
/* we don't receive the value of an unchanged column */
tuple->values[i] = NULL;
break;
case 't': /* text formatted value */
{
int len;
tuple->changed[i] = true;
len = pq_getmsgint(in, 4); /* read length */
/* and data */
tuple->values[i] = palloc(len + 1);
pq_copymsgbytes(in, tuple->values[i], len);
tuple->values[i][len] = '\0';
}
case LOGICALREP_COLUMN_TEXT:
len = pq_getmsgint(in, 4); /* read length */
/* and data */
value->data = palloc(len + 1);
pq_copymsgbytes(in, value->data, len);
value->data[len] = '\0';
/* make StringInfo fully valid */
value->len = len;
value->cursor = 0;
value->maxlen = len;
break;
case LOGICALREP_COLUMN_BINARY:
len = pq_getmsgint(in, 4); /* read length */
/* and data */
value->data = palloc(len + 1);
pq_copymsgbytes(in, value->data, len);
/* not strictly necessary but per StringInfo practice */
value->data[len] = '\0';
/* make StringInfo fully valid */
value->len = len;
value->cursor = 0;
value->maxlen = len;
break;
default:
elog(ERROR, "unrecognized data representation type '%c'", kind);
......@@ -552,7 +599,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
}
/*
* Write relation attributes to the stream.
* Write relation attribute metadata to the stream.
*/
static void
logicalrep_write_attrs(StringInfo out, Relation rel)
......@@ -611,7 +658,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
}
/*
* Read relation attribute names from the stream.
* Read relation attribute metadata from the stream.
*/
static void
logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
......
......@@ -319,13 +319,13 @@ slot_store_error_callback(void *arg)
}
/*
* Store data in C string form into slot.
* This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
* use better.
* Store tuple data into slot.
*
* Incoming data can be either text or binary format.
*/
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
char **values)
slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
LogicalRepTupleData *tupleData)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
......@@ -343,27 +343,65 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* Call the "in" function for each non-dropped attribute */
/* Call the "in" function for each non-dropped, non-null attribute */
Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++)
{
Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
int remoteattnum = rel->attrmap->attnums[i];
if (!att->attisdropped && remoteattnum >= 0 &&
values[remoteattnum] != NULL)
if (!att->attisdropped && remoteattnum >= 0)
{
Oid typinput;
Oid typioparam;
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] =
OidInputFunctionCall(typinput, values[remoteattnum],
typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
{
Oid typinput;
Oid typioparam;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] =
OidInputFunctionCall(typinput, colvalue->data,
typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
}
else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
{
Oid typreceive;
Oid typioparam;
/*
* In some code paths we may be asked to re-parse the same
* tuple data. Reset the StringInfo's cursor so that works.
*/
colvalue->cursor = 0;
getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
slot->tts_values[i] =
OidReceiveFunctionCall(typreceive, colvalue,
typioparam, att->atttypmod);
/* Trouble if it didn't eat the whole buffer */
if (colvalue->cursor != colvalue->len)
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format in logical replication column %d",
remoteattnum + 1)));
slot->tts_isnull[i] = false;
}
else
{
/*
* NULL value from remote. (We don't expect to see
* LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
* NULL.)
*/
slot->tts_values[i] = (Datum) 0;
slot->tts_isnull[i] = true;
}
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
......@@ -371,8 +409,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
else
{
/*
* We assign NULL to dropped attributes, NULL values, and missing
* values (missing values should be later filled using
* We assign NULL to dropped attributes and missing values
* (missing values should be later filled using
* slot_fill_defaults).
*/
slot->tts_values[i] = (Datum) 0;
......@@ -387,20 +425,21 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
}
/*
* Replace selected columns with user data provided as C strings.
* Replace updated columns with data from the LogicalRepTupleData struct.
* This is somewhat similar to heap_modify_tuple but also calls the type
* input functions on the user data.
* "slot" is filled with a copy of the tuple in "srcslot", with
* columns selected by the "replaces" array replaced with data values
* from "values".
*
* "slot" is filled with a copy of the tuple in "srcslot", replacing
* columns provided in "tupleData" and leaving others as-is.
*
* Caution: unreplaced pass-by-ref columns in "slot" will point into the
* storage for "srcslot". This is OK for current usage, but someday we may
* need to materialize "slot" at the end to make it independent of "srcslot".
*/
static void
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
LogicalRepRelMapEntry *rel,
char **values, bool *replaces)
slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
LogicalRepRelMapEntry *rel,
LogicalRepTupleData *tupleData)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
......@@ -438,31 +477,58 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
if (remoteattnum < 0)
continue;
if (!replaces[remoteattnum])
continue;
if (values[remoteattnum] != NULL)
if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
{
Oid typinput;
Oid typioparam;
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] =
OidInputFunctionCall(typinput, values[remoteattnum],
typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
{
Oid typinput;
Oid typioparam;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] =
OidInputFunctionCall(typinput, colvalue->data,
typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
}
else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
{
Oid typreceive;
Oid typioparam;
/*
* In some code paths we may be asked to re-parse the same
* tuple data. Reset the StringInfo's cursor so that works.
*/
colvalue->cursor = 0;
getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
slot->tts_values[i] =
OidReceiveFunctionCall(typreceive, colvalue,
typioparam, att->atttypmod);
/* Trouble if it didn't eat the whole buffer */
if (colvalue->cursor != colvalue->len)
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format in logical replication column %d",
remoteattnum + 1)));
slot->tts_isnull[i] = false;
}
else
{
/* must be LOGICALREP_COLUMN_NULL */
slot->tts_values[i] = (Datum) 0;
slot->tts_isnull[i] = true;
}
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
}
else
{
slot->tts_values[i] = (Datum) 0;
slot->tts_isnull[i] = true;
}
}
/* Pop the error context stack */
......@@ -641,7 +707,7 @@ apply_handle_insert(StringInfo s)
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_store_data(remoteslot, rel, &newtup);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
......@@ -765,7 +831,7 @@ apply_handle_update(StringInfo s)
target_rte = list_nth(estate->es_range_table, 0);
for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
{
if (newtup.changed[i])
if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED)
target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
i + 1 - FirstLowInvalidHeapAttributeNumber);
}
......@@ -776,8 +842,8 @@ apply_handle_update(StringInfo s)
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel,
has_oldtup ? oldtup.values : newtup.values);
slot_store_data(remoteslot, rel,
has_oldtup ? &oldtup : &newtup);
MemoryContextSwitchTo(oldctx);
/* For a partitioned table, apply update to correct partition. */
......@@ -831,8 +897,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
{
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_modify_cstrings(remoteslot, localslot, relmapentry,
newtup->values, newtup->changed);
slot_modify_data(remoteslot, localslot, relmapentry, newtup);
MemoryContextSwitchTo(oldctx);
EvalPlanQualSetSlot(&epqstate, remoteslot);
......@@ -900,7 +965,7 @@ apply_handle_delete(StringInfo s)
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, oldtup.values);
slot_store_data(remoteslot, rel, &oldtup);
MemoryContextSwitchTo(oldctx);
/* For a partitioned table, apply delete to correct partition. */
......@@ -1096,9 +1161,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
if (found)
{
/* Apply the update. */
slot_modify_cstrings(remoteslot_part, localslot,
part_entry,
newtup->values, newtup->changed);
slot_modify_data(remoteslot_part, localslot,
part_entry,
newtup);
MemoryContextSwitchTo(oldctx);
}
else
......@@ -1312,8 +1377,8 @@ apply_handle_truncate(StringInfo s)
}
/*
* Even if we used CASCADE on the upstream primary we explicitly default to
* replaying changes without further cascading. This might be later
* Even if we used CASCADE on the upstream primary we explicitly default
* to replaying changes without further cascading. This might be later
* changeable with a user specified option.
*/
ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
......@@ -1850,60 +1915,21 @@ maybe_reread_subscription(void)
proc_exit(0);
}
/*
* Exit if connection string was changed. The launcher will start new
* worker.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
{
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" will "
"restart because the connection information was changed",
MySubscription->name)));
proc_exit(0);
}
/*
* Exit if subscription name was changed (it's used for
* fallback_application_name). The launcher will start new worker.
*/
if (strcmp(newsub->name, MySubscription->name) != 0)
{
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" will "
"restart because subscription was renamed",
MySubscription->name)));
proc_exit(0);
}
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
/*
* We need to make new connection to new slot if slot name has changed so
* exit here as well if that's the case.
*/
if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
{
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" will "
"restart because the replication slot name was changed",
MySubscription->name)));
proc_exit(0);
}
/*
* Exit if publication list was changed. The launcher will start new
* worker.
* Exit if any parameter that affects the remote connection was changed.
* The launcher will start a new worker.
*/
if (!equal(newsub->publications, MySubscription->publications))
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
strcmp(newsub->name, MySubscription->name) != 0 ||
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" will "
"restart because subscription's publications were changed",
(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
MySubscription->name)));
proc_exit(0);
......@@ -2106,6 +2132,7 @@ ApplyWorkerMain(Datum main_arg)
options.slotname = myslotname;
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
/* Start normal logical streaming replication. */
walrcv_startstreaming(wrconn, &options);
......
......@@ -15,6 +15,7 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
......@@ -118,11 +119,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void
parse_output_parameters(List *options, uint32 *protocol_version,
List **publication_names)
List **publication_names, bool *binary)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
*binary = false;
foreach(lc, options)
{
......@@ -168,6 +172,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
(errcode(ERRCODE_INVALID_NAME),
errmsg("invalid publication_names syntax")));
}
else if (strcmp(defel->defname, "binary") == 0)
{
if (binary_option_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
binary_option_given = true;
*binary = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
......@@ -202,7 +216,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Parse the params and ERROR if we see any we don't recognize */
parse_output_parameters(ctx->output_plugin_options,
&data->protocol_version,
&data->publication_names);
&data->publication_names,
&data->binary);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
......@@ -411,7 +426,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, tuple);
logicalrep_write_insert(ctx->out, relation, tuple,
data->binary);
OutputPluginWrite(ctx, true);
break;
}
......@@ -435,7 +451,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
data->binary);
OutputPluginWrite(ctx, true);
break;
}
......@@ -455,7 +472,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, relation, oldtuple);
logicalrep_write_delete(ctx->out, relation, oldtuple,
data->binary);
OutputPluginWrite(ctx, true);
}
else
......
......@@ -4205,6 +4205,7 @@ getSubscriptions(Archive *fout)
int i_subslotname;
int i_subsynccommit;
int i_subpublications;
int i_subbinary;
int i,
ntups;
......@@ -4229,18 +4230,26 @@ getSubscriptions(Archive *fout)
query = createPQExpBuffer();
resetPQExpBuffer(query);
/* Get the subscriptions in current database. */
appendPQExpBuffer(query,
"SELECT s.tableoid, s.oid, s.subname,"
"(%s s.subowner) AS rolname, "
" s.subconninfo, s.subslotname, s.subsynccommit, "
" s.subpublications "
"FROM pg_subscription s "
"WHERE s.subdbid = (SELECT oid FROM pg_database"
" WHERE datname = current_database())",
"SELECT s.tableoid, s.oid, s.subname,\n"
" (%s s.subowner) AS rolname,\n"
" s.subconninfo, s.subslotname, s.subsynccommit,\n"
" s.subpublications,\n",
username_subquery);
if (fout->remoteVersion >= 140000)
appendPQExpBuffer(query,
" s.subbinary\n");
else
appendPQExpBuffer(query,
" false AS subbinary\n");
appendPQExpBuffer(query,
"FROM pg_subscription s\n"
"WHERE s.subdbid = (SELECT oid FROM pg_database\n"
" WHERE datname = current_database())");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
......@@ -4253,6 +4262,7 @@ getSubscriptions(Archive *fout)
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_subbinary = PQfnumber(res, "subbinary");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
......@@ -4274,6 +4284,8 @@ getSubscriptions(Archive *fout)
pg_strdup(PQgetvalue(res, i, i_subsynccommit));
subinfo[i].subpublications =
pg_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].subbinary =
pg_strdup(PQgetvalue(res, i, i_subbinary));
if (strlen(subinfo[i].rolname) == 0)
pg_log_warning("owner of subscription \"%s\" appears to be invalid",
......@@ -4342,6 +4354,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
else
appendPQExpBufferStr(query, "NONE");
if (strcmp(subinfo->subbinary, "t") == 0)
appendPQExpBuffer(query, ", binary = true");
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
......
......@@ -625,6 +625,7 @@ typedef struct _SubscriptionInfo
char *rolname;
char *subconninfo;
char *subslotname;
char *subbinary;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
......
......@@ -5963,7 +5963,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false};
false, false, false};
if (pset.sversion < 100000)
{
......@@ -5989,6 +5989,12 @@ describeSubscriptions(const char *pattern, bool verbose)
if (verbose)
{
/* Binary mode is only supported in v14 and higher */
if (pset.sversion >= 140000)
appendPQExpBuffer(&buf,
", subbinary AS \"%s\"\n",
gettext_noop("Binary"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
", subconninfo AS \"%s\"\n",
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202007131
#define CATALOG_VERSION_NO 202007181
#endif
......@@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subenabled; /* True if the subscription is enabled (the
* worker should be running) */
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
......@@ -73,6 +76,8 @@ typedef struct Subscription
char *name; /* Name of the subscription */
Oid owner; /* Oid of the subscription owner */
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
......
......@@ -30,12 +30,19 @@
/* Tuple coming via logical replication. */
typedef struct LogicalRepTupleData
{
/* column values in text format, or NULL for a null value: */
char *values[MaxTupleAttributeNumber];
/* markers for changed/unchanged column values: */
bool changed[MaxTupleAttributeNumber];
/* Array of StringInfos, one per column; some may be unused */
StringInfoData *colvalues;
/* Array of markers for null/unchanged/text/binary, one per column */
char *colstatus;
} LogicalRepTupleData;
/* Possible values for LogicalRepTupleData.colstatus[colnum] */
/* These values are also used in the on-the-wire protocol */
#define LOGICALREP_COLUMN_NULL 'n'
#define LOGICALREP_COLUMN_UNCHANGED 'u'
#define LOGICALREP_COLUMN_TEXT 't'
#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */
typedef uint32 LogicalRepRelId;
/* Relation information */
......@@ -87,15 +94,15 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, Relation rel,
HeapTuple newtuple);
HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
HeapTuple newtuple);
HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, Relation rel,
HeapTuple oldtuple);
HeapTuple oldtuple, bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
......
......@@ -20,11 +20,11 @@ typedef struct PGOutputData
MemoryContext context; /* private memory context for transient
* allocations */
/* client info */
/* client-supplied info: */
uint32 protocol_version;
List *publication_names;
List *publications;
bool binary;
} PGOutputData;
#endif /* PGOUTPUT_H */
......@@ -177,6 +177,7 @@ typedef struct
{
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
} logical;
} proto;
} WalRcvStreamOptions;
......
......@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | off | dbname=regress_doesnotexist
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
......@@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
-----------------+---------------------------+---------+---------------------+--------------------+------------------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=regress_doesnotexist2
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
......@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
---------------------+---------------------------+---------+---------------------+--------------------+------------------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | local | dbname=regress_doesnotexist2
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
......@@ -155,6 +155,29 @@ DROP SUBSCRIPTION IF EXISTS regress_testsub;
NOTICE: subscription "regress_testsub" does not exist, skipping
DROP SUBSCRIPTION regress_testsub; -- fail
ERROR: subscription "regress_testsub" does not exist
-- fail - binary must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo);
ERROR: binary requires a Boolean value
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
......
......@@ -117,6 +117,21 @@ COMMIT;
DROP SUBSCRIPTION IF EXISTS regress_testsub;
DROP SUBSCRIPTION regress_testsub; -- fail
-- fail - binary must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo);
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
DROP SUBSCRIPTION regress_testsub;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
......
# Binary mode logical replication test
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 5;
# Create and initialize a publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
# Create and initialize subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create tables on both sides of the replication
my $ddl = qq(
CREATE TABLE public.test_numerical (
a INTEGER PRIMARY KEY,
b NUMERIC,
c FLOAT,
d BIGINT
);
CREATE TABLE public.test_arrays (
a INTEGER[] PRIMARY KEY,
b NUMERIC[],
c TEXT[]
););
$node_publisher->safe_psql('postgres', $ddl);
$node_subscriber->safe_psql('postgres', $ddl);
# Configure logical replication
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tpub FOR ALL TABLES");
my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
# Ensure nodes are in sync with each other
$node_publisher->wait_for_catchup('tsub');
$node_subscriber->poll_query_until('postgres',
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
) or die "Timed out while waiting for subscriber to synchronize data";
# Insert some content and make sure it's replicated across
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_arrays (a, b, c) VALUES
('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
INSERT INTO public.test_numerical (a, b, c, d) VALUES
(1, 1.2, 1.3, 10),
(2, 2.2, 2.3, 20),
(3, 3.2, 3.3, 30);
));
$node_publisher->wait_for_catchup('tsub');
my $result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c, d FROM test_numerical ORDER BY a");
is( $result, '1|1.2|1.3|10
2|2.2|2.3|20
3|3.2|3.3|30', 'check replicated data on subscriber');
# Test updates as well
$node_publisher->safe_psql(
'postgres', qq(
UPDATE public.test_arrays SET b[1] = 42, c = NULL;
UPDATE public.test_numerical SET b = 42, c = NULL;
));
$node_publisher->wait_for_catchup('tsub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c FROM test_arrays ORDER BY a");
is( $result, '{1,2,3}|{42,1.2,1.3}|
{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber');
$result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c, d FROM test_numerical ORDER BY a");
is( $result, '1|42||10
2|42||20
3|42||30', 'check updated replicated data on subscriber');
# Test to reset back to text formatting, and then to binary again
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tsub SET (binary = false);");
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_numerical (a, b, c, d) VALUES
(4, 4.2, 4.3, 40);
));
$node_publisher->wait_for_catchup('tsub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c, d FROM test_numerical ORDER BY a");
is( $result, '1|42||10
2|42||20
3|42||30
4|4.2|4.3|40', 'check replicated data on subscriber');
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tsub SET (binary = true);");
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_arrays (a, b, c) VALUES
('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}');
));
$node_publisher->wait_for_catchup('tsub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c FROM test_arrays ORDER BY a");
is( $result, '{1,2,3}|{42,1.2,1.3}|
{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
$node_subscriber->stop('fast');
$node_publisher->stop('fast');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment