Commit 46482432 authored by Amit Kapila's avatar Amit Kapila

Add support for streaming to built-in logical replication.

To add support for streaming of in-progress transactions into the
built-in logical replication, we need to do three things:

* Extend the logical replication protocol to identify in-progress
transactions, and to allow adding additional fields of information
(e.g. the XID of subtransactions).

* Modify the output plugin (pgoutput) to implement the new stream
API callbacks, by leveraging the extended replication protocol.

* Modify the replication apply worker to properly handle streamed
in-progress transactions by spilling the data to disk and then
replaying it on commit.

However, we must explicitly disable streaming during replication
slot creation, even if the plugin supports it. We don't need to
replicate the changes accumulated during this phase, and moreover
we don't have a replication connection open, so there is nowhere
to send the data anyway.

Author: Tomas Vondra, Dilip Kumar and Amit Kapila
Reviewed-by: Amit Kapila, Kuntal Ghosh and Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent 66f16306
@@ -1509,6 +1509,22 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>WALWrite</literal></entry>
<entry>Waiting for a write to a WAL file.</entry>
</row>
<row>
<entry><literal>LogicalChangesRead</literal></entry>
<entry>Waiting for a read from a logical changes file.</entry>
</row>
<row>
<entry><literal>LogicalChangesWrite</literal></entry>
<entry>Waiting for a write to a logical changes file.</entry>
</row>
<row>
<entry><literal>LogicalSubxactRead</literal></entry>
<entry>Waiting for a read from a logical subxact file.</entry>
</row>
<row>
<entry><literal>LogicalSubxactWrite</literal></entry>
<entry>Waiting for a write to a logical subxact file.</entry>
</row>
</tbody>
</tgroup>
</table>
...
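The four wait events above surface the spill-file I/O added by this commit. As a hedged illustration (connection string and any matching sessions are assumed, not part of this commit), a monitoring client could poll pg_stat_activity for them via libpq:

/* monitor_logical_waits.c -- illustrative sketch only */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");
	PGresult   *res;
	int			i;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* wait_event names match the new entries documented above */
	res = PQexec(conn,
				 "SELECT pid, wait_event FROM pg_stat_activity "
				 "WHERE wait_event IN ('LogicalChangesRead', 'LogicalChangesWrite', "
				 "'LogicalSubxactRead', 'LogicalSubxactWrite')");
	for (i = 0; i < PQntuples(res); i++)
		printf("pid %s waiting on %s\n",
			   PQgetvalue(res, i, 0), PQgetvalue(res, i, 1));
	PQclear(res);
	PQfinish(conn);
	return 0;
}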
@@ -165,8 +165,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<xref linkend="sql-createsubscription"/>. See there for more
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>,
<literal>binary</literal>, and
<literal>streaming</literal>.
</para>
</listitem>
</varlistentry>
...
@@ -228,6 +228,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>streaming</literal> (<type>boolean</type>)</term>
<listitem>
<para>
Specifies whether streaming of in-progress transactions should
be enabled for this subscription. By default, all transactions
are fully decoded on the publisher, and only then sent to the
subscriber as a whole.
</para>
</listitem>
</varlistentry>
</variablelist></para>
</listitem>
</varlistentry>
...
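To make the new option concrete, here is a hedged usage sketch (subscription, publication, and connection names are made up) driving the DDL through libpq; plain psql would work equally well:

/* streaming_subscription.c -- illustrative sketch only */
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");

	if (PQstatus(conn) != CONNECTION_OK)
		return 1;

	/* enable streaming of in-progress transactions from the start */
	PQclear(PQexec(conn,
				   "CREATE SUBSCRIPTION mysub "
				   "CONNECTION 'host=publisher dbname=postgres' "
				   "PUBLICATION mypub WITH (streaming = on)"));

	/* the option can also be toggled later via ALTER SUBSCRIPTION */
	PQclear(PQexec(conn, "ALTER SUBSCRIPTION mysub SET (streaming = off)"));

	PQfinish(conn);
	return 0;
}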
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
sub->stream = subform->substream;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
...
@@ -1128,7 +1128,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications)
ON pg_subscription TO public;
...
@@ -63,7 +63,8 @@ parse_subscription_options(List *options,
bool *copy_data,
char **synchronous_commit,
bool *refresh,
bool *binary_given, bool *binary,
bool *streaming_given, bool *streaming)
{
ListCell *lc;
bool connect_given = false;
@@ -99,6 +100,11 @@ parse_subscription_options(List *options,
*binary_given = false;
*binary = false;
}
if (streaming)
{
*streaming_given = false;
*streaming = false;
}
/* Parse options */
foreach(lc, options)
@@ -194,6 +200,16 @@ parse_subscription_options(List *options,
*binary_given = true;
*binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0 && streaming)
{
if (*streaming_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*streaming_given = true;
*streaming = defGetBoolean(defel);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool enabled_given;
bool enabled;
bool copy_data;
bool streaming;
bool streaming_given;
char *synchronous_commit;
char *conninfo;
char *slotname;
@@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
&copy_data,
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary,
&streaming_given, &streaming);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
@@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
char *synchronous_commit;
bool binary_given;
bool binary;
bool streaming_given;
bool streaming;
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
@@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, /* no "copy_data" */
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary,
&streaming_given, &streaming);
if (slotname_given)
{
@@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
if (streaming_given)
{
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
update_tuple = true;
break;
}
@@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, /* no "copy_data" */
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no streaming */
Assert(enabled_given);
if (!sub->slotname && enabled)
@@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
&copy_data,
NULL, /* no "synchronous_commit" */
&refresh,
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
&copy_data,
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
AlterSubscription_refresh(sub, copy_data);
...
@@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w)
case WAIT_EVENT_WAL_WRITE:
event_name = "WALWrite";
break;
case WAIT_EVENT_LOGICAL_CHANGES_READ:
event_name = "LogicalChangesRead";
break;
case WAIT_EVENT_LOGICAL_CHANGES_WRITE:
event_name = "LogicalChangesWrite";
break;
case WAIT_EVENT_LOGICAL_SUBXACT_READ:
event_name = "LogicalSubxactRead";
break;
case WAIT_EVENT_LOGICAL_SUBXACT_WRITE:
event_name = "LogicalSubxactWrite";
break;
/* no default case, so that compiler will warn */
}
...
@@ -425,6 +425,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
if (options->proto.logical.streaming &&
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfo(&cmd, ", streaming 'on'");
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
...
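For context, a minimal sketch of how the streaming flag extends the option list of the START_REPLICATION command; the overall command shape, slot name, LSN, and publication name are assumptions for illustration, not taken from this diff:

/* build_startstreaming_cmd.c -- illustrative sketch only */
#include <stdio.h>

int
main(void)
{
	int			server_version = 140000;	/* assumed PQserverVersion() result */
	int			streaming = 1;				/* subscription's streaming flag */
	char		options[256];
	int			len;

	len = snprintf(options, sizeof(options), "proto_version '%u'", 2);
	if (streaming && server_version >= 140000)
		snprintf(options + len, sizeof(options) - len, ", streaming 'on'");

	/* hypothetical final command sent to the publisher */
	printf("START_REPLICATION SLOT \"mysub\" LOGICAL 0/0 (%s, "
		   "publication_names '\"mypub\"')\n", options);
	return 0;
}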
@@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
* Write INSERT to the output stream.
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'I'); /* action INSERT */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
@@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b
pq_sendbyte(out, 'D'); /* action DELETE */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
*/
void
logicalrep_write_truncate(StringInfo out,
TransactionId xid,
int nrelids,
Oid relids[],
bool cascade, bool restart_seqs)
@@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out,
pq_sendbyte(out, 'T'); /* action TRUNCATE */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
pq_sendint32(out, nrelids);
/* encode and send truncate flags */
@@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in,
* Write relation description to the output stream.
*/
void
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
{
char *relname;
pq_sendbyte(out, 'R'); /* sending RELATION */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in)
* This function will always write base type info.
*/
void
logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
{
Oid basetypoid = getBaseType(typoid);
HeapTuple tup;
@@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid)
pq_sendbyte(out, 'Y'); /* sending TYPE */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", basetypoid);
@@ -720,3 +747,126 @@ logicalrep_read_namespace(StringInfo in)
return nspname;
}
/*
* Write the information for the start stream message to the output stream.
*/
void
logicalrep_write_stream_start(StringInfo out,
TransactionId xid, bool first_segment)
{
pq_sendbyte(out, 'S'); /* action STREAM START */
Assert(TransactionIdIsValid(xid));
/* transaction ID (we're starting to stream, so must be valid) */
pq_sendint32(out, xid);
/* 1 if this is the first streaming segment for this xid */
pq_sendbyte(out, first_segment ? 1 : 0);
}
/*
* Read the information about the start stream message from the output stream.
*/
TransactionId
logicalrep_read_stream_start(StringInfo in, bool *first_segment)
{
TransactionId xid;
Assert(first_segment);
xid = pq_getmsgint(in, 4);
*first_segment = (pq_getmsgbyte(in) == 1);
return xid;
}
/*
* Write the stop stream message to the output stream.
*/
void
logicalrep_write_stream_stop(StringInfo out)
{
pq_sendbyte(out, 'E'); /* action STREAM END */
}
/*
* Write STREAM COMMIT to the output stream.
*/
void
logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
uint8 flags = 0;
pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
Assert(TransactionIdIsValid(txn->xid));
/* transaction ID */
pq_sendint32(out, txn->xid);
/* send the flags field (unused for now) */
pq_sendbyte(out, flags);
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->commit_time);
}
/*
* Read STREAM COMMIT from the output stream.
*/
TransactionId
logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
TransactionId xid;
uint8 flags;
xid = pq_getmsgint(in, 4);
/* read flags (unused for now) */
flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in commit message", flags);
/* read fields */
commit_data->commit_lsn = pq_getmsgint64(in);
commit_data->end_lsn = pq_getmsgint64(in);
commit_data->committime = pq_getmsgint64(in);
return xid;
}
/*
* Write STREAM ABORT to the output stream. Note that xid and subxid will be
* the same for a top-level transaction abort.
*/
void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid)
{
pq_sendbyte(out, 'A'); /* action STREAM ABORT */
Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
/* transaction ID */
pq_sendint32(out, xid);
pq_sendint32(out, subxid);
}
/*
* Read STREAM ABORT from the output stream.
*/
void
logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid)
{
Assert(xid && subxid);
*xid = pq_getmsgint(in, 4);
*subxid = pq_getmsgint(in, 4);
}
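To make the new wire formats tangible, here is a self-contained sketch that round-trips a STREAM START message using the layout defined above: one action byte 'S', the 32-bit XID in network byte order, and one first-segment byte. Buffer handling is simplified relative to the pq_send*/pq_getmsg* StringInfo routines:

/* stream_start_roundtrip.c -- illustrative sketch only */
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static size_t
write_stream_start(unsigned char *buf, uint32_t xid, int first_segment)
{
	uint32_t	nxid = htonl(xid);

	buf[0] = 'S';						/* action STREAM START */
	memcpy(buf + 1, &nxid, 4);			/* transaction ID */
	buf[5] = first_segment ? 1 : 0;		/* first streamed segment? */
	return 6;
}

static uint32_t
read_stream_start(const unsigned char *buf, int *first_segment)
{
	uint32_t	nxid;

	memcpy(&nxid, buf + 1, 4);
	*first_segment = (buf[5] == 1);
	return ntohl(nxid);
}

int
main(void)
{
	unsigned char buf[6];
	int			first;
	uint32_t	xid;

	write_stream_start(buf, 742, 1);
	xid = read_stream_start(buf, &first);
	printf("xid=%u first_segment=%d\n", (unsigned) xid, first);
	return 0;
}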
@@ -18,11 +18,45 @@
* This module includes server facing code and shares libpqwalreceiver
* module with walreceiver for providing the libpq specific functionality.
*
*
* STREAMED TRANSACTIONS
* ---------------------
* Streamed transactions (large transactions exceeding a memory limit on the
* upstream) are not applied immediately, but instead, the data is written
* to temporary files and then applied at once when the final commit arrives.
*
* Unlike the regular (non-streamed) case, handling streamed transactions
* requires handling aborts of both the toplevel transaction and of
* subtransactions. This is achieved by tracking offsets for
* subtransactions, which are then used to truncate the file with
* serialized changes.
*
* The files are placed in the temporary-files directory by default, and
* the filenames include both the XID of the toplevel transaction and the
* OID of the subscription. This is necessary so that different workers
* processing a remote transaction with the same XID don't interfere.
*
* We use BufFiles instead of plain temporary files because (a) the BufFile
* infrastructure supports temporary files that exceed the OS file size
* limit, (b) it provides automatic cleanup on error, and (c) it allows the
* files to survive across local transactions, so they can be opened at
* stream start and closed at stream stop. We use the SharedFileSet
* infrastructure because without it the files would be deleted as soon as
* they are closed, and keeping the stream files open across start/stop
* streams would consume a lot of memory (more than 8kB for each BufFile,
* and there could be multiple such BufFiles, as the subscriber could
* receive multiple start/stop streams for different transactions before
* getting the commit). Moreover, without SharedFileSet we would also need
* to invent a new way to pass filenames to the BufFile APIs, so that we
* could reopen the desired file across multiple stream-open calls for the
* same transaction.
*-------------------------------------------------------------------------
*/

#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h" #include "access/table.h"
#include "access/tableam.h" #include "access/tableam.h"
#include "access/xact.h" #include "access/xact.h"
...@@ -33,7 +67,9 @@ ...@@ -33,7 +67,9 @@
#include "catalog/pg_inherits.h" #include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h" #include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h" #include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h" #include "commands/trigger.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/execPartition.h" #include "executor/execPartition.h"
...@@ -63,7 +99,9 @@ ...@@ -63,7 +99,9 @@
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "replication/worker_internal.h" #include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/proc.h" #include "storage/proc.h"
...@@ -71,6 +109,7 @@ ...@@ -71,6 +109,7 @@
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/catcache.h" #include "utils/catcache.h"
#include "utils/dynahash.h"
#include "utils/datum.h" #include "utils/datum.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/guc.h" #include "utils/guc.h"
...@@ -99,9 +138,26 @@ typedef struct SlotErrCallbackArg ...@@ -99,9 +138,26 @@ typedef struct SlotErrCallbackArg
int remote_attnum; int remote_attnum;
} SlotErrCallbackArg; } SlotErrCallbackArg;
/*
* Stream xid hash entry. Whenever we see a new xid, we create this entry
* in the xidhash and along with it create the streaming file and store the
* fileset handle. The subxact file is created iff there is any subxact info
* under this xid. This entry is used on subsequent streams for the xid to
* get the corresponding fileset handles, so storing them in a hash makes
* the search faster.
*/
typedef struct StreamXidHash
{
TransactionId xid; /* xid is the hash key and must be first */
SharedFileSet *stream_fileset; /* shared file set for stream data */
SharedFileSet *subxact_fileset; /* shared file set for subxact info */
} StreamXidHash;
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
WalReceiverConn *wrconn = NULL;

Subscription *MySubscription = NULL;
@@ -110,12 +166,66 @@ bool MySubscriptionValid = false;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
/* fields valid only when processing streamed transaction */
bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
/*
* Hash table for storing the streaming xid information along with shared file
* set for streaming and subxact files.
*/
static HTAB *xidhash = NULL;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
int fileno; /* file number in the buffile */
off_t offset; /* offset in the file */
} SubXactInfo;
/* Sub-transaction data for the current streaming transaction */
typedef struct ApplySubXactData
{
uint32 nsubxacts; /* number of sub-transactions */
uint32 nsubxacts_max; /* current capacity of subxacts */
TransactionId subxact_last; /* xid of the last sub-transaction */
SubXactInfo *subxacts; /* sub-xact offset in changes file */
} ApplySubXactData;
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
static void subxact_filename(char *path, Oid subid, TransactionId xid);
static void changes_filename(char *path, Oid subid, TransactionId xid);
/*
* Information about subtransactions of a given toplevel transaction.
*/
static void subxact_info_write(Oid subid, TransactionId xid);
static void subxact_info_read(Oid subid, TransactionId xid);
static void subxact_info_add(TransactionId xid);
static inline void cleanup_subxact_info(void);
/*
* Serialize and deserialize changes for a toplevel transaction.
*/
static void stream_cleanup_files(Oid subid, TransactionId xid);
static void stream_open_file(Oid subid, TransactionId xid, bool first);
static void stream_write_change(char action, StringInfo s);
static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void);
/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
static void apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
@@ -187,6 +297,42 @@ ensure_transaction(void)
return true;
}
/*
* Handle streamed transactions.
*
* If in streaming mode (receiving a block of streamed transaction), we
* simply redirect it to a file for the proper toplevel transaction.
*
* Returns true for streamed transactions, false otherwise (regular mode).
*/
static bool
handle_streamed_transaction(const char action, StringInfo s)
{
TransactionId xid;
/* not in streaming mode */
if (!in_streamed_transaction)
return false;
Assert(stream_fd != NULL);
Assert(TransactionIdIsValid(stream_xid));
/*
* We should have received the XID of the subxact as the first part of the
* message, so extract it.
*/
xid = pq_getmsgint(s, 4);
Assert(TransactionIdIsValid(xid));
/* Add the new subxact to the array (unless already there). */
subxact_info_add(xid);
/* write the change to the current file */
stream_write_change(action, s);
return true;
}
/*
* Executor state preparation for evaluation of constraint expressions,
@@ -612,16 +758,335 @@ static void
apply_handle_origin(StringInfo s)
{
/*
* ORIGIN message can only come inside streaming transaction or inside
* remote transaction and before any actual writes.
*/
if (!in_streamed_transaction &&
(!in_remote_transaction ||
(IsTransactionState() && !am_tablesync_worker())))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("ORIGIN message sent out of order")));
}
/*
* Handle STREAM START message.
*/
static void
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
HASHCTL hash_ctl;
Assert(!in_streamed_transaction);
/*
* Start a transaction on stream start; this transaction will be committed
* on stream stop. We need the transaction for handling the BufFile,
* used for serializing the streaming data and subxact info.
*/
ensure_transaction();
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true;
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
/*
* Initialize the xidhash table if we haven't yet. This will be used for
* the entire duration of the apply worker, so create it in a permanent
* context.
*/
if (xidhash == NULL)
{
hash_ctl.keysize = sizeof(TransactionId);
hash_ctl.entrysize = sizeof(StreamXidHash);
hash_ctl.hcxt = ApplyContext;
xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
HASH_ELEM | HASH_CONTEXT);
}
/* open the spool file for this transaction */
stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
/* if this is not the first segment, open existing subxact file */
if (!first_segment)
subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
pgstat_report_activity(STATE_RUNNING, NULL);
}
/*
* Handle STREAM STOP message.
*/
static void
apply_handle_stream_stop(StringInfo s)
{
Assert(in_streamed_transaction);
/*
* Close the file with serialized changes, and serialize information about
* subxacts for the toplevel transaction.
*/
subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
stream_close_file();
/* We must be in a valid transaction state */
Assert(IsTransactionState());
/* Commit the per-stream transaction */
CommitTransactionCommand();
in_streamed_transaction = false;
/* Reset per-stream context */
MemoryContextReset(LogicalStreamingContext);
pgstat_report_activity(STATE_IDLE, NULL);
}
/*
* Handle STREAM abort message.
*/
static void
apply_handle_stream_abort(StringInfo s)
{
TransactionId xid;
TransactionId subxid;
Assert(!in_streamed_transaction);
logicalrep_read_stream_abort(s, &xid, &subxid);
/*
* If the two XIDs are the same, it's in fact an abort of the toplevel
* xact, so just delete the files with serialized info.
*/
if (xid == subxid)
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
else
{
/*
* OK, so it's a subxact. We need to read the subxact file for the
* toplevel transaction, determine the offset tracked for the subxact,
* and truncate the file with changes. We also remove the subxacts
* with higher offsets (or rather higher XIDs).
*
* We intentionally scan the array from the tail, because we're likely
* aborting a change for the most recent subtransactions.
*
* We can't use binary search here, as the subxact XIDs won't necessarily
* arrive in sorted order; consider the case where we have released the
* savepoint for multiple subtransactions and then performed a rollback
* to a savepoint for one of the earlier sub-transactions.
*/
int64 i;
int64 subidx;
BufFile *fd;
bool found = false;
char path[MAXPGPATH];
StreamXidHash *ent;
subidx = -1;
ensure_transaction();
subxact_info_read(MyLogicalRepWorker->subid, xid);
for (i = subxact_data.nsubxacts; i > 0; i--)
{
if (subxact_data.subxacts[i - 1].xid == subxid)
{
subidx = (i - 1);
found = true;
break;
}
}
/*
* If it's an empty sub-transaction then we will not find the subxid
* here, so just clean up the subxact info and return.
*/
if (!found)
{
/* Cleanup the subxact info */
cleanup_subxact_info();
CommitTransactionCommand();
return;
}
Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
&found);
Assert(found);
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
/* OK, truncate the file at the right offset */
BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
subxact_data.subxacts[subidx].offset);
BufFileClose(fd);
/* discard the subxacts added later */
subxact_data.nsubxacts = subidx;
/* write the updated subxact list */
subxact_info_write(MyLogicalRepWorker->subid, xid);
CommitTransactionCommand();
}
}
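The subxact-offset bookkeeping above is what makes partial aborts cheap: a rollback only truncates the spool file at the aborted subxact's first change. A standalone sketch of the same idea, with plain POSIX file APIs standing in for the BufFile layer (the struct, file name, and values are illustrative):

/* truncate_on_subxact_abort.c -- illustrative sketch only */
#include <stdint.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

typedef struct
{
	uint32_t	xid;		/* XID of the subxact */
	int64_t		offset;		/* start of its first change in the file */
} SubXactOffset;

/* Truncate the changes file at the aborted subxact's first change,
 * scanning from the tail as the worker does. Returns the remaining
 * number of subxacts, or -1 when the subxact is unknown (empty). */
static int
abort_subxact(const char *path, SubXactOffset *subs, int n, uint32_t subxid)
{
	int			i;

	for (i = n; i > 0; i--)
	{
		if (subs[i - 1].xid == subxid)
		{
			if (truncate(path, (off_t) subs[i - 1].offset) != 0)
				return -1;
			return i - 1;	/* later subxacts are discarded too */
		}
	}
	return -1;
}

int
main(void)
{
	SubXactOffset subs[] = {{743, 128}, {744, 512}};

	printf("remaining: %d\n", abort_subxact("16394-742.changes", subs, 2, 744));
	return 0;
}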
/*
* Handle STREAM COMMIT message.
*/
static void
apply_handle_stream_commit(StringInfo s)
{
TransactionId xid;
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
bool found;
LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
Assert(!in_streamed_transaction);
xid = logicalrep_read_stream_commit(s, &commit_data);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
ensure_transaction();
/*
* Allocate file handle and memory required to process all the messages in
* TopTransactionContext to avoid them getting reset after each message is
* processed.
*/
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
/* open the spool file for the committed transaction */
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
&found);
Assert(found);
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
MemoryContextSwitchTo(oldcxt);
remote_final_lsn = commit_data.commit_lsn;
/*
* Make sure the apply_dispatch handler methods are aware we're in a
* remote transaction.
*/
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
/*
* Read the entries one by one and pass them through the same logic as in
* apply_dispatch.
*/
nchanges = 0;
while (true)
{
int nbytes;
int len;
CHECK_FOR_INTERRUPTS();
/* read length of the on-disk record */
nbytes = BufFileRead(fd, &len, sizeof(len));
/* have we reached end of the file? */
if (nbytes == 0)
break;
/* do we have a correct length? */
if (nbytes != sizeof(len))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
Assert(len > 0);
/* make sure we have sufficiently large buffer */
buffer = repalloc(buffer, len);
/* and finally read the data into the buffer */
if (BufFileRead(fd, buffer, len) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
/* copy the buffer to the stringinfo and call apply_dispatch */
resetStringInfo(&s2);
appendBinaryStringInfo(&s2, buffer, len);
/* Ensure we are reading the data into our memory context. */
oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
apply_dispatch(&s2);
MemoryContextReset(ApplyMessageContext);
MemoryContextSwitchTo(oldcxt);
nchanges++;
if (nchanges % 1000 == 0)
elog(DEBUG1, "replayed %d changes from file '%s'",
nchanges, path);
}
BufFileClose(fd);
/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = commit_data.end_lsn;
replorigin_session_origin_timestamp = commit_data.committime;
pfree(buffer);
pfree(s2.data);
CommitTransactionCommand();
pgstat_report_stat(false);
store_flush_position(commit_data.end_lsn);
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
/* unlink the files with serialized changes and subxact info */
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
pgstat_report_activity(STATE_IDLE, NULL);
}
/*
* Handle RELATION message.
*
@@ -635,6 +1100,9 @@ apply_handle_relation(StringInfo s)
{
LogicalRepRelation *rel;
if (handle_streamed_transaction('R', s))
return;
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
}
@@ -650,6 +1118,9 @@ apply_handle_type(StringInfo s)
{
LogicalRepTyp typ;
if (handle_streamed_transaction('Y', s))
return;
logicalrep_read_typ(s, &typ);
logicalrep_typmap_update(&typ);
}
@@ -686,6 +1157,9 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
if (handle_streamed_transaction('I', s))
return;
ensure_transaction();
relid = logicalrep_read_insert(s, &newtup);
@@ -801,6 +1275,9 @@ apply_handle_update(StringInfo s)
RangeTblEntry *target_rte;
MemoryContext oldctx;
if (handle_streamed_transaction('U', s))
return;
ensure_transaction();
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
@@ -950,6 +1427,9 @@ apply_handle_delete(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
if (handle_streamed_transaction('D', s))
return;
ensure_transaction();
relid = logicalrep_read_delete(s, &oldtup);
@@ -1320,6 +1800,9 @@ apply_handle_truncate(StringInfo s)
List *relids_logged = NIL;
ListCell *lc;
if (handle_streamed_transaction('T', s))
return;
ensure_transaction();
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -1458,6 +1941,22 @@ apply_dispatch(StringInfo s)
case 'O':
apply_handle_origin(s);
break;
/* STREAM START */
case 'S':
apply_handle_stream_start(s);
break;
/* STREAM END */
case 'E':
apply_handle_stream_stop(s);
break;
/* STREAM ABORT */
case 'A':
apply_handle_stream_abort(s);
break;
/* STREAM COMMIT */
case 'c':
apply_handle_stream_commit(s);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1570,6 +2069,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
/*
* This memory context is used for per-stream data when the streaming mode
* is enabled. This context is reset on each stream stop.
*/
LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1674,7 +2181,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
if (!in_remote_transaction && !in_streamed_transaction)
{
/*
* If we didn't get any transactions for a while there might be
@@ -1938,6 +2445,7 @@ maybe_reread_subscription(void)
strcmp(newsub->name, MySubscription->name) != 0 ||
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
@@ -1979,6 +2487,439 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false;
}
/*
* subxact_info_write
* Store information about subxacts for a toplevel transaction.
*
* For each subxact we store the offset of its first change in the main
* file. The file is always overwritten as a whole.
*
* XXX We should only store subxacts that were not aborted yet.
*/
static void
subxact_info_write(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
bool found;
Size len;
StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
/* find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
&found);
/* we must have found the entry for its top transaction by this time */
Assert(found);
/*
* If there are no subtransactions then there is nothing to do, but if a
* subxact file already exists then delete it.
*/
if (subxact_data.nsubxacts == 0)
{
if (ent->subxact_fileset)
{
cleanup_subxact_info();
SharedFileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
return;
}
subxact_filename(path, subid, xid);
/*
* Create the subxact file if it does not already exist, otherwise open
* the existing file.
*/
if (ent->subxact_fileset == NULL)
{
MemoryContext oldctx;
/*
* We need to maintain the shared fileset across multiple stream
* start/stop calls, so allocate it in a persistent context.
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
ent->subxact_fileset = palloc(sizeof(SharedFileSet));
SharedFileSetInit(ent->subxact_fileset, NULL);
MemoryContextSwitchTo(oldctx);
fd = BufFileCreateShared(ent->subxact_fileset, path);
}
else
fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
/* Write the subxact count and subxact info */
BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
BufFileWrite(fd, subxact_data.subxacts, len);
BufFileClose(fd);
/* free the memory allocated for subxact info */
cleanup_subxact_info();
}
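A hedged standalone sketch of the on-disk layout written above, a count followed by the packed SubXactInfo array, with stdio in place of the BufFile API (struct name and values are illustrative):

/* subxact_file_layout.c -- illustrative sketch only */
#include <stdint.h>
#include <stdio.h>

typedef struct
{
	uint32_t	xid;		/* XID of the subxact */
	int32_t		fileno;		/* file number in the buffile */
	int64_t		offset;		/* offset in that file */
} SubXactInfoSketch;

int
main(void)
{
	SubXactInfoSketch subs[2] = {{743, 0, 128}, {744, 0, 512}};
	uint32_t	nsubxacts = 2;
	FILE	   *f = fopen("16394-742.subxacts", "wb");

	if (f == NULL)
		return 1;

	/* the count comes first, then the packed array */
	fwrite(&nsubxacts, sizeof(nsubxacts), 1, f);
	fwrite(subs, sizeof(subs[0]), nsubxacts, f);
	fclose(f);
	return 0;
}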
/*
* subxact_info_read
* Restore information about subxacts of a streamed transaction.
*
* Read information about subxacts into the structure subxact_data that can be
* used later.
*/
static void
subxact_info_read(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
bool found;
Size len;
BufFile *fd;
StreamXidHash *ent;
MemoryContext oldctx;
Assert(TransactionIdIsValid(xid));
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
/* Find the stream xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
&found);
/*
* If subxact_fileset is not valid, that means we don't have any subxact
* info.
*/
if (ent->subxact_fileset == NULL)
return;
subxact_filename(path, subid, xid);
fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
sizeof(subxact_data.nsubxacts)) !=
sizeof(subxact_data.nsubxacts))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
path)));
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
/* we keep the maximum as a power of 2 */
subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
/*
* Allocate subxact information in the logical streaming context. We need
* this information for the duration of the stream, so that we can add
* subtransaction info to it. On stream stop we flush this information to
* the subxact file and reset the logical streaming context.
*/
oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
sizeof(SubXactInfo));
MemoryContextSwitchTo(oldctx);
if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
path)));
BufFileClose(fd);
}
/*
* subxact_info_add
* Add information about a subxact (offset in the main file).
*/
static void
subxact_info_add(TransactionId xid)
{
SubXactInfo *subxacts = subxact_data.subxacts;
int64 i;
/* We must have a valid top level stream xid and a stream fd. */
Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
/*
* If the XID matches the toplevel transaction, we don't want to add it.
*/
if (stream_xid == xid)
return;
/*
* In most cases we're checking the same subxact as we've already seen in
* the last call, so make sure to ignore it (this change comes later).
*/
if (subxact_data.subxact_last == xid)
return;
/* OK, remember we're processing this XID. */
subxact_data.subxact_last = xid;
/*
* Check if the transaction is already present in the array of subxacts. We
* intentionally scan the array from the tail, because we're likely adding
* a change for the most recent subtransactions.
*
* XXX Can we rely on the subxact XIDs arriving in sorted order? That
* would allow us to use binary search here.
*/
for (i = subxact_data.nsubxacts; i > 0; i--)
{
/* found, so we're done */
if (subxacts[i - 1].xid == xid)
return;
}
/* This is a new subxact, so we need to add it to the array. */
if (subxact_data.nsubxacts == 0)
{
MemoryContext oldctx;
subxact_data.nsubxacts_max = 128;
/*
* Allocate this memory for subxacts in per-stream context, see
* subxact_info_read.
*/
oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
MemoryContextSwitchTo(oldctx);
}
else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
{
subxact_data.nsubxacts_max *= 2;
subxacts = repalloc(subxacts,
subxact_data.nsubxacts_max * sizeof(SubXactInfo));
}
subxacts[subxact_data.nsubxacts].xid = xid;
/*
* Get the current offset of the stream file and store it as offset of
* this subxact.
*/
BufFileTell(stream_fd,
&subxacts[subxact_data.nsubxacts].fileno,
&subxacts[subxact_data.nsubxacts].offset);
subxact_data.nsubxacts++;
subxact_data.subxacts = subxacts;
}
/* format filename for file containing the info about subxacts */
static void
subxact_filename(char *path, Oid subid, TransactionId xid)
{
snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
}
/* format filename for file containing serialized changes */
static inline void
changes_filename(char *path, Oid subid, TransactionId xid)
{
snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
}
/*
* stream_cleanup_files
* Cleanup files for a subscription / toplevel transaction.
*
* Remove files with serialized changes and subxact info for a particular
* toplevel transaction. Each subscription has a separate set of files.
*/
static void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
StreamXidHash *ent;
/* Remove the xid entry from the stream xid hash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_REMOVE,
NULL);
/* By this time we must have created the transaction entry */
Assert(ent != NULL);
/* Delete the change file and release the stream fileset memory */
changes_filename(path, subid, xid);
SharedFileSetDeleteAll(ent->stream_fileset);
pfree(ent->stream_fileset);
ent->stream_fileset = NULL;
/* Delete the subxact file and release the memory, if it exists */
if (ent->subxact_fileset)
{
subxact_filename(path, subid, xid);
SharedFileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
}
/*
* stream_open_file
* Open a file that we'll use to serialize changes for a toplevel
* transaction.
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
* changes for this transaction, initialize the shared fileset and create the
* buffile, otherwise open the previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
*/
static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
char path[MAXPGPATH];
bool found;
MemoryContext oldcxt;
StreamXidHash *ent;
Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
/* create or find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_ENTER | HASH_FIND,
&found);
Assert(first_segment || found);
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
/*
* Create/open the buffiles under the logical streaming context so that we
* have those files until stream stop.
*/
oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
/*
* If this is the first streamed segment, the file must not exist, so make
* sure we're the ones creating it. Otherwise just open the file for
* writing, in append mode.
*/
if (first_segment)
{
MemoryContext savectx;
SharedFileSet *fileset;
/*
* We need to maintain the shared fileset across multiple stream
* start/stop calls, so allocate it in a persistent context.
*/
savectx = MemoryContextSwitchTo(ApplyContext);
fileset = palloc(sizeof(SharedFileSet));
SharedFileSetInit(fileset, NULL);
MemoryContextSwitchTo(savectx);
stream_fd = BufFileCreateShared(fileset, path);
/* Remember the fileset for the next stream of the same transaction */
ent->xid = xid;
ent->stream_fileset = fileset;
ent->subxact_fileset = NULL;
}
else
{
/*
* Open the file and seek to the end of it because we always
* append to the changes file.
*/
stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}
MemoryContextSwitchTo(oldcxt);
}
/*
* stream_close_file
* Close the currently open file with streamed changes.
*
* This can only be called at the end of a streaming block, i.e. at stream_stop
* message from the upstream.
*/
static void
stream_close_file(void)
{
Assert(in_streamed_transaction);
Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
BufFileClose(stream_fd);
stream_xid = InvalidTransactionId;
stream_fd = NULL;
}
/*
* stream_write_change
* Serialize a change to a file for the current toplevel transaction.
*
* The change is serialized in a simple format: the length (not including
* the length field itself), the action code (identifying the message
* type), and the message contents (without the subxact TransactionId
* value).
*/
static void
stream_write_change(char action, StringInfo s)
{
int len;
Assert(in_streamed_transaction);
Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
/* total on-disk size, including the action type character */
len = (s->len - s->cursor) + sizeof(char);
/* first write the size */
BufFileWrite(stream_fd, &len, sizeof(len));
/* then the action */
BufFileWrite(stream_fd, &action, sizeof(action));
/* and finally the remaining part of the buffer (after the XID) */
len = (s->len - s->cursor);
BufFileWrite(stream_fd, &s->data[s->cursor], len);
}
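A standalone sketch of the record framing described above: an int length covering the action byte plus payload (but not the length field itself), then the action, then the message body with the XID already consumed; stdio stands in for BufFile and the file name is illustrative:

/* change_record_framing.c -- illustrative sketch only */
#include <stdio.h>

static void
write_change(FILE *f, char action, const char *payload, int plen)
{
	int			len = plen + (int) sizeof(char);	/* action + payload */

	fwrite(&len, sizeof(len), 1, f);		/* record length */
	fwrite(&action, sizeof(action), 1, f);	/* message type, e.g. 'I' */
	fwrite(payload, 1, plen, f);			/* rest of the logical message */
}

int
main(void)
{
	FILE	   *f = fopen("16394-742.changes", "wb");

	if (f == NULL)
		return 1;
	write_change(f, 'I', "tuple-bytes", 11);
	fclose(f);
	return 0;
}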
/*
* Cleanup the memory for subxacts and reset the related variables.
*/
static inline void
cleanup_subxact_info(void)
{
if (subxact_data.subxacts)
pfree(subxact_data.subxacts);
subxact_data.subxacts = NULL;
subxact_data.subxact_last = InvalidTransactionId;
subxact_data.nsubxacts = 0;
subxact_data.nsubxacts_max = 0;
}
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -2151,6 +3092,7 @@ ApplyWorkerMain(Datum main_arg)
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
walrcv_startstreaming(wrconn, &options);
...
@@ -47,17 +47,40 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static bool publications_valid;
static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
/* /*
* Entry in the map used to remember which relation schemas we sent. * Entry in the map used to remember which relation schemas we sent.
* *
* The schema_sent flag determines if the current schema record was already
* sent to the subscriber (in which case we don't need to send it again).
*
* The schema cache on the downstream side is, however, updated only at
* commit time, and with streamed transactions the commit order may differ
* from the order in which the transactions are sent. Moreover, the (sub)
* transactions might get aborted, so we need to send the schema for each
* (sub) transaction so that we don't lose the schema information on abort.
* To handle this, we maintain a list of xids (streamed_txns) for which we
* have already sent the schema.
*
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
@@ -70,6 +93,8 @@ typedef struct RelationSyncEntry
* have been sent for this to be true.
*/
bool schema_sent;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
bool replicate_valid;
PublicationActions pubactions;
@@ -95,10 +120,15 @@ typedef struct RelationSyncEntry
static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
/*
* Specify output plugin callbacks
@@ -115,16 +145,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
/* transaction streaming */
cb->stream_start_cb = pgoutput_stream_start;
cb->stream_stop_cb = pgoutput_stream_stop;
cb->stream_abort_cb = pgoutput_stream_abort;
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_truncate_cb = pgoutput_truncate;
}
static void
parse_output_parameters(List *options, uint32 *protocol_version,
List **publication_names, bool *binary,
bool *enable_streaming)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
bool streaming_given = false;
*binary = false;
@@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
*binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
streaming_given = true;
*enable_streaming = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@@ -194,6 +244,7 @@ static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
@@ -217,7 +268,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
parse_output_parameters(ctx->output_plugin_options,
&data->protocol_version,
&data->publication_names,
&data->binary,
&enable_streaming);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -237,6 +289,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("publication_names parameter missing")));
/*
* Decide whether to enable streaming. It is disabled by default, in
* which case we just update the flag in the decoding context. Otherwise
* we allow it only with a sufficiently recent protocol version, and only
* when the output plugin supports it.
*/
if (!enable_streaming)
ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
else if (!ctx->streaming)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
/* Also remember we're currently not streaming any transaction. */
in_streaming = false;
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
@@ -247,6 +320,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Initialize relation schema cache. */
init_rel_sync_cache(CacheMemoryContext);
}
else
{
/* Disable streaming during slot initialization. */
ctx->streaming = false;
}
}
/*
@@ -305,9 +383,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
/*
* Remember the XID of the (sub)transaction for the change. We don't care
* whether it's a top-level transaction or not (we have already sent that
* XID at the start of the current streaming block).
*
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
if (in_streaming)
xid = change->txn->xid;
if (change->txn->toptxn)
topxid = change->txn->toptxn->xid;
else
topxid = xid;
/*
* Do we need to send the schema? We track streamed transactions
* separately, because those may be applied later (and the regular
* transactions won't see their effects until then) and in an order that
* we don't know at this point.
*
* XXX There is scope for optimization here. Currently, we always send
* the schema the first time in a streaming transaction, but we could
* probably avoid that by checking the 'relentry->schema_sent' flag.
* However, before doing that we need to study its impact on the case
* where we have a mix of streaming and non-streaming transactions.
*/
if (in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
if (schema_sent)
return;
/* If needed, send the ancestor's schema first. */
@@ -323,19 +439,24 @@ maybe_send_schema(LogicalDecodingContext *ctx,
relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
CreateTupleDescCopy(outdesc));
MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
send_relation_and_attrs(relation, xid, ctx);
if (in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
else
relentry->schema_sent = true;
}
/*
* Sends a relation
*/
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
@@ -359,17 +480,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, xid, relation);
OutputPluginWrite(ctx, false);
}
/*
* Sends the decoded DML over wire.
*
* This is called both in streaming and non-streaming modes.
*/
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -378,10 +501,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
if (!is_publishable_relation(relation))
return;
/*
* Remember the xid for the change in streaming mode. We need to send the
* xid with each change in streaming mode so that the subscriber can
* associate the change with the right (sub)transaction and, on abort,
* discard the corresponding changes.
*/
if (in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
/* First check the table filter */
@@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
maybe_send_schema(ctx, txn, change, relation, relentry);
/* Send the data */
switch (change->action)
@@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, relation, tuple,
data->binary);
OutputPluginWrite(ctx, true);
break;
@@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, xid, relation, oldtuple,
newtuple, data->binary);
OutputPluginWrite(ctx, true);
break;
}
@@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
data->binary);
OutputPluginWrite(ctx, true);
}
@@ -498,6 +631,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int i;
int nrelids;
Oid *relids;
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@@ -526,13 +664,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
relids[nrelids++] = relid;
maybe_send_schema(ctx, txn, change, relation, relentry);
}
if (nrelids > 0)
{
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_truncate(ctx->out,
xid,
nrelids,
relids,
change->data.truncate.cascade,
@@ -605,6 +744,118 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
/*
* START STREAM callback
*/
static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
Assert(!in_streaming);
/*
* If we already sent the first stream for this transaction then don't
* send the origin id in the subsequent streams.
*/
if (rbtxn_is_streamed(txn))
send_replication_origin = false;
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
if (send_replication_origin)
{
char *origin;
/* Message boundary */
OutputPluginWrite(ctx, false);
OutputPluginPrepareWrite(ctx, true);
if (replorigin_by_oid(txn->origin_id, true, &origin))
logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
}
OutputPluginWrite(ctx, true);
/* we're streaming a chunk of transaction now */
in_streaming = true;
}
/*
* STOP STREAM callback
*/
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
/* we should be streaming a transaction */
Assert(in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
in_streaming = false;
}
/*
* Notify downstream to discard the streamed transaction (along with all
* its subtransactions, if it's a toplevel transaction).
*/
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
/*
* The abort should happen outside a streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
/* determine the toplevel transaction */
toptxn = (txn->toptxn) ? txn->toptxn : txn;
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);
}
/*
* Notify downstream to apply the streamed transaction (along with all
* its subtransactions).
*/
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
/*
* The commit should happen outside a streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(txn->xid, true);
}
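Taken together, the four callbacks above mean that a single large transaction reaches the subscriber as a sequence of stream blocks, roughly as follows (illustrative trace only, not literal wire bytes):
/*
 * stream_start(xid, first_segment = true)
 *   ... relation/type/change/truncate messages, each tagged with the xid ...
 * stream_stop()
 * stream_start(xid, first_segment = false)
 *   ... more changes ...
 * stream_stop()
 * stream_commit(xid, commit_lsn)      -- or stream_abort(xid, subxid)
 */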
/*
* Initialize the relation schema sync cache for a decoding session.
*
@@ -641,6 +892,39 @@ init_rel_sync_cache(MemoryContext cachectx)
(Datum) 0);
}
/*
* We expect a relatively small number of streamed transactions.
*/
static bool
get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
ListCell *lc;
foreach(lc, entry->streamed_txns)
{
if (xid == (uint32) lfirst_int(lc))
return true;
}
return false;
}
/*
* Add the xid to the rel sync entry's list of streamed transactions for
* which we have already sent the schema of the relation.
*/
static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
MemoryContext oldctx;
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
MemoryContextSwitchTo(oldctx);
}
/*
* Find or create entry in the relation schema cache.
*
@@ -771,11 +1055,58 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
if (!found)
{
entry->schema_sent = false;
entry->streamed_txns = NULL;
}
return entry;
}
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
* When a streamed transaction commits or aborts, we need to remove the
* toplevel XID from the schema cache. If the transaction aborted, the
* subscriber will simply throw away the schema records we streamed, so
* we don't need to do anything else.
*
* If the transaction committed, the subscriber will update its relation
* cache, so tweak the schema_sent flag accordingly.
*/
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
ListCell *lc;
Assert(RelationSyncCache != NULL);
hash_seq_init(&hash_seq, RelationSyncCache);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
/*
* We can set the schema_sent flag for an entry that has the committed
* xid in its list, as that ensures that the subscriber already has the
* corresponding schema and we don't need to send it again unless there
* is an invalidation for that relation.
*/
foreach(lc, entry->streamed_txns)
{
if (xid == (uint32) lfirst_int(lc))
{
if (is_commit)
entry->schema_sent = true;
entry->streamed_txns =
foreach_delete_current(entry->streamed_txns, lc);
break;
}
}
}
}
/*
* Relcache invalidation callback
*/
@@ -811,7 +1142,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
* Reset schema sent status as the relation definition may have changed.
*/
if (entry != NULL)
{
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NULL;
}
}
/*
......
@@ -4202,6 +4202,7 @@ getSubscriptions(Archive *fout)
int i_oid;
int i_subname;
int i_rolname;
int i_substream;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@@ -4241,10 +4242,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 140000)
appendPQExpBuffer(query,
" s.subbinary,\n");
else
appendPQExpBuffer(query,
" false AS subbinary,\n");
if (fout->remoteVersion >= 140000)
appendPQExpBuffer(query,
" s.substream\n");
else
appendPQExpBuffer(query,
" false AS substream\n");
appendPQExpBuffer(query,
"FROM pg_subscription s\n"
@@ -4264,6 +4272,7 @@ getSubscriptions(Archive *fout)
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_subbinary = PQfnumber(res, "subbinary");
i_substream = PQfnumber(res, "substream");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -4287,6 +4296,8 @@ getSubscriptions(Archive *fout)
pg_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].subbinary =
pg_strdup(PQgetvalue(res, i, i_subbinary));
subinfo[i].substream =
pg_strdup(PQgetvalue(res, i, i_substream));
if (strlen(subinfo[i].rolname) == 0)
pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@@ -4358,6 +4369,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
if (strcmp(subinfo->subbinary, "t") == 0)
appendPQExpBuffer(query, ", binary = true");
if (strcmp(subinfo->substream, "f") != 0)
appendPQExpBuffer(query, ", streaming = on");
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
......
@@ -626,6 +626,7 @@ typedef struct _SubscriptionInfo
char *subconninfo;
char *subslotname;
char *subbinary;
char *substream;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
......
@@ -5979,7 +5979,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6005,11 +6005,13 @@ describeSubscriptions(const char *pattern, bool verbose)
if (verbose)
{
/* Binary mode and streaming are only supported in v14 and higher */
if (pset.sversion >= 140000)
appendPQExpBuffer(&buf,
", subbinary AS \"%s\"\n"
", substream AS \"%s\"\n",
gettext_noop("Binary"),
gettext_noop("Streaming"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
......
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202009031
#endif
@@ -51,6 +51,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
bool substream; /* Stream in-progress transactions. */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -78,6 +80,7 @@ typedef struct Subscription
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
......
@@ -982,7 +982,11 @@ typedef enum
WAIT_EVENT_WAL_READ,
WAIT_EVENT_WAL_SYNC,
WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN,
WAIT_EVENT_WAL_WRITE,
WAIT_EVENT_LOGICAL_CHANGES_READ,
WAIT_EVENT_LOGICAL_CHANGES_WRITE,
WAIT_EVENT_LOGICAL_SUBXACT_READ,
WAIT_EVENT_LOGICAL_SUBXACT_WRITE
} WaitEventIO;
/* ----------
......
@@ -23,9 +23,13 @@
* we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
* have backwards compatibility for. The client requests protocol version at
* connect time.
*
* LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
* support for streaming large transactions.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_VERSION_NUM 2
/*
* This struct stores a tuple received via logical replication.
@@ -98,25 +102,45 @@ extern void logicalrep_read_commit(StringInfo in,
extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel, HeapTuple newtuple,
bool binary);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
int nrelids, Oid relids[],
bool cascade, bool restart_seqs);
extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
bool first_segment);
extern TransactionId logicalrep_read_stream_start(StringInfo in,
bool *first_segment);
extern void logicalrep_write_stream_stop(StringInfo out);
extern TransactionId logicalrep_read_stream_stop(StringInfo in);
extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
extern TransactionId logicalrep_read_stream_commit(StringInfo out,
LogicalRepCommitData *commit_data);
extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid);
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid);
#endif /* LOGICAL_PROTO_H */
@@ -178,6 +178,7 @@ typedef struct
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
bool streaming; /* Streaming of large transactions */
} logical;
} proto;
} WalRcvStreamOptions;
......
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
@@ -162,19 +162,42 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
ERROR: streaming requires a Boolean value
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row) (1 row)
DROP SUBSCRIPTION regress_testsub; DROP SUBSCRIPTION regress_testsub;
......
@@ -132,6 +132,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
DROP SUBSCRIPTION regress_testsub;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
......
# Test streaming of simple large transaction
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 4;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
# Test the streaming in binary mode
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET (binary = on)"
);
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
# Change the local values of the extra columns on the subscriber,
# update publisher, and check that subscriber retains the expected
# values. This is to ensure that non-streaming transactions behave
# properly after a streaming transaction.
$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data');
$node_subscriber->stop;
$node_publisher->stop;
@@ -111,6 +111,7 @@ Append
AppendPath
AppendRelInfo
AppendState
ApplySubXactData
Archive
ArchiveEntryPtrType
ArchiveFormat
@@ -2370,6 +2371,7 @@ StopList
StopWorkersData
StrategyNumber
StreamCtl
StreamXidHash
StringInfo
StringInfoData
StripnullState
@@ -2380,6 +2382,7 @@ SubPlanState
SubTransactionId
SubXactCallback
SubXactCallbackItem
SubXactEvent
SubXactInfo
SubplanResultRelHashElem
SubqueryScan
......