Commit b663a413 authored by Tomas Vondra's avatar Tomas Vondra

Implement support for bulk inserts in postgres_fdw

Extends the FDW API to allow batching inserts into foreign tables. That
is usually much more efficient than inserting individual rows, due to
high latency for each round-trip to the foreign server.

It was possible to implement something similar in the regular FDW API,
but it was inconvenient and there were issues with reporting the number
of actually inserted rows etc. This extends the FDW API with two new
functions:

* GetForeignModifyBatchSize - allows the FDW picking optimal batch size

* ExecForeignBatchInsert - inserts a batch of rows at once

Currently, only INSERT queries support batching. Support for DELETE and
UPDATE may be added in the future.

This also implements batching for postgres_fdw. The batch size may be
specified using "batch_size" option both at the server and table level.

The initial patch version was written by me, but it was rewritten and
improved in many ways by Takayuki Tsunakawa.

Author: Takayuki Tsunakawa
Reviewed-by: Tomas Vondra, Amit Langote
Discussion: https://postgr.es/m/20200628151002.7x5laxwpgvkyiu3q@development
parent ad600bba
......@@ -1705,13 +1705,16 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
* The statement text is appended to buf, and we also create an integer List
* of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any),
* which is returned to *retrieved_attrs.
*
* This also stores end position of the VALUES clause, so that we can rebuild
* an INSERT for a batch of rows later.
*/
void
deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
List **retrieved_attrs)
List **retrieved_attrs, int *values_end_len)
{
AttrNumber pindex;
bool first;
......@@ -1754,6 +1757,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
}
else
appendStringInfoString(buf, " DEFAULT VALUES");
*values_end_len = buf->len;
if (doNothing)
appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
......@@ -1763,6 +1767,54 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
withCheckOptionList, returningList, retrieved_attrs);
}
/*
* rebuild remote INSERT statement
*
* Provided a number of rows in a batch, builds INSERT statement with the
* right number of parameters.
*/
void
rebuildInsertSql(StringInfo buf, char *orig_query,
int values_end_len, int num_cols,
int num_rows)
{
int i, j;
int pindex;
bool first;
/* Make sure the values_end_len is sensible */
Assert((values_end_len > 0) && (values_end_len <= strlen(orig_query)));
/* Copy up to the end of the first record from the original query */
appendBinaryStringInfo(buf, orig_query, values_end_len);
/*
* Add records to VALUES clause (we already have parameters for the
* first row, so start at the right offset).
*/
pindex = num_cols + 1;
for (i = 0; i < num_rows; i++)
{
appendStringInfoString(buf, ", (");
first = true;
for (j = 0; j < num_cols; j++)
{
if (!first)
appendStringInfoString(buf, ", ");
first = false;
appendStringInfo(buf, "$%d", pindex);
pindex++;
}
appendStringInfoChar(buf, ')');
}
/* Copy stuff after VALUES clause from the original query */
appendStringInfoString(buf, orig_query + values_end_len);
}
/*
* deparse remote UPDATE statement
*
......
......@@ -3887,9 +3887,10 @@ EXPLAIN (VERBOSE, COSTS OFF) EXECUTE st7;
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.ft1
Remote SQL: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
Batch Size: 1
-> Result
Output: NULL::integer, 1001, 101, 'foo'::text, NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft1 '::character(10), NULL::user_enum
(4 rows)
(5 rows)
ALTER TABLE "S 1"."T 1" RENAME TO "T 0";
ALTER FOREIGN TABLE ft1 OPTIONS (SET table_name 'T 0');
......@@ -3920,9 +3921,10 @@ EXPLAIN (VERBOSE, COSTS OFF) EXECUTE st7;
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.ft1
Remote SQL: INSERT INTO "S 1"."T 0"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
Batch Size: 1
-> Result
Output: NULL::integer, 1001, 101, 'foo'::text, NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft1 '::character(10), NULL::user_enum
(4 rows)
(5 rows)
ALTER TABLE "S 1"."T 0" RENAME TO "T 1";
ALTER FOREIGN TABLE ft1 OPTIONS (SET table_name 'T 1');
......@@ -4244,12 +4246,13 @@ INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.ft2
Remote SQL: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
Batch Size: 1
-> Subquery Scan on "*SELECT*"
Output: "*SELECT*"."?column?", "*SELECT*"."?column?_1", NULL::integer, "*SELECT*"."?column?_2", NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft2 '::character(10), NULL::user_enum
-> Foreign Scan on public.ft2 ft2_1
Output: (ft2_1.c1 + 1000), (ft2_1.c2 + 100), (ft2_1.c3 || ft2_1.c3)
Remote SQL: SELECT "C 1", c2, c3 FROM "S 1"."T 1" LIMIT 20::bigint
(7 rows)
(8 rows)
INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20;
INSERT INTO ft2 (c1,c2,c3)
......@@ -5360,9 +5363,10 @@ INSERT INTO ft2 (c1,c2,c3) VALUES (1200,999,'foo') RETURNING tableoid::regclass;
Insert on public.ft2
Output: (ft2.tableoid)::regclass
Remote SQL: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
Batch Size: 1
-> Result
Output: 1200, 999, NULL::integer, 'foo'::text, NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft2 '::character(10), NULL::user_enum
(5 rows)
(6 rows)
INSERT INTO ft2 (c1,c2,c3) VALUES (1200,999,'foo') RETURNING tableoid::regclass;
tableoid
......@@ -6212,9 +6216,10 @@ INSERT INTO rw_view VALUES (0, 5);
--------------------------------------------------------------------------------
Insert on public.foreign_tbl
Remote SQL: INSERT INTO public.base_tbl(a, b) VALUES ($1, $2) RETURNING a, b
Batch Size: 1
-> Result
Output: 0, 5
(4 rows)
(5 rows)
INSERT INTO rw_view VALUES (0, 5); -- should fail
ERROR: new row violates check option for view "rw_view"
......@@ -6225,9 +6230,10 @@ INSERT INTO rw_view VALUES (0, 15);
--------------------------------------------------------------------------------
Insert on public.foreign_tbl
Remote SQL: INSERT INTO public.base_tbl(a, b) VALUES ($1, $2) RETURNING a, b
Batch Size: 1
-> Result
Output: 0, 15
(4 rows)
(5 rows)
INSERT INTO rw_view VALUES (0, 15); -- ok
SELECT * FROM foreign_tbl;
......@@ -8923,7 +8929,7 @@ DO $d$
END;
$d$;
ERROR: invalid option "password"
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
PL/pgSQL function inline_code_block line 3 at EXECUTE
-- If we add a password for our user mapping instead, we should get a different
......@@ -9112,3 +9118,138 @@ SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
loopback2 | t
(1 row)
-- ===================================================================
-- batch insert
-- ===================================================================
BEGIN;
CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
SELECT count(*)
FROM pg_foreign_server
WHERE srvname = 'batch10'
AND srvoptions @> array['batch_size=10'];
count
-------
1
(1 row)
ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
SELECT count(*)
FROM pg_foreign_server
WHERE srvname = 'batch10'
AND srvoptions @> array['batch_size=10'];
count
-------
0
(1 row)
SELECT count(*)
FROM pg_foreign_server
WHERE srvname = 'batch10'
AND srvoptions @> array['batch_size=20'];
count
-------
1
(1 row)
CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
SELECT COUNT(*)
FROM pg_foreign_table
WHERE ftrelid = 'table30'::regclass
AND ftoptions @> array['batch_size=30'];
count
-------
1
(1 row)
ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
SELECT COUNT(*)
FROM pg_foreign_table
WHERE ftrelid = 'table30'::regclass
AND ftoptions @> array['batch_size=30'];
count
-------
0
(1 row)
SELECT COUNT(*)
FROM pg_foreign_table
WHERE ftrelid = 'table30'::regclass
AND ftoptions @> array['batch_size=40'];
count
-------
1
(1 row)
ROLLBACK;
CREATE TABLE batch_table ( x int );
CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
QUERY PLAN
-------------------------------------------------------------
Insert on public.ftable
Remote SQL: INSERT INTO public.batch_table(x) VALUES ($1)
Batch Size: 10
-> Function Scan on pg_catalog.generate_series i
Output: i.i
Function Call: generate_series(1, 10)
(6 rows)
INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
INSERT INTO ftable VALUES (32);
INSERT INTO ftable VALUES (33), (34);
SELECT COUNT(*) FROM ftable;
count
-------
34
(1 row)
TRUNCATE batch_table;
DROP FOREIGN TABLE ftable;
-- Disable batch insert
CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable VALUES (1), (2);
QUERY PLAN
-------------------------------------------------------------
Insert on public.ftable
Remote SQL: INSERT INTO public.batch_table(x) VALUES ($1)
Batch Size: 1
-> Values Scan on "*VALUES*"
Output: "*VALUES*".column1
(5 rows)
INSERT INTO ftable VALUES (1), (2);
SELECT COUNT(*) FROM ftable;
count
-------
2
(1 row)
DROP FOREIGN TABLE ftable;
DROP TABLE batch_table;
-- Use partitioning
CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
CREATE TABLE batch_table_p0 (LIKE batch_table);
CREATE FOREIGN TABLE batch_table_p0f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 3, REMAINDER 0)
SERVER loopback
OPTIONS (table_name 'batch_table_p0', batch_size '10');
CREATE TABLE batch_table_p1 (LIKE batch_table);
CREATE FOREIGN TABLE batch_table_p1f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 3, REMAINDER 1)
SERVER loopback
OPTIONS (table_name 'batch_table_p1', batch_size '1');
CREATE TABLE batch_table_p2
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 3, REMAINDER 2);
INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
SELECT COUNT(*) FROM batch_table;
count
-------
66
(1 row)
-- Clean up
DROP TABLE batch_table CASCADE;
......@@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
errmsg("%s requires a non-negative integer value",
def->defname)));
}
else if (strcmp(def->defname, "batch_size") == 0)
{
int batch_size;
batch_size = strtol(defGetString(def), NULL, 10);
if (batch_size <= 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s requires a non-negative integer value",
def->defname)));
}
else if (strcmp(def->defname, "password_required") == 0)
{
bool pw_required = defGetBoolean(def);
......@@ -203,6 +214,9 @@ InitPgFdwOptions(void)
/* fetch_size is available on both server and table */
{"fetch_size", ForeignServerRelationId, false},
{"fetch_size", ForeignTableRelationId, false},
/* batch_size is available on both server and table */
{"batch_size", ForeignServerRelationId, false},
{"batch_size", ForeignTableRelationId, false},
{"password_required", UserMappingRelationId, false},
/*
......
This diff is collapsed.
......@@ -161,7 +161,10 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
List **retrieved_attrs);
List **retrieved_attrs, int *values_end_len);
extern void rebuildInsertSql(StringInfo buf, char *orig_query,
int values_end_len, int num_cols,
int num_rows);
extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs,
......
......@@ -2738,3 +2738,96 @@ COMMIT;
-- should not be output because they should be closed at the end of
-- the above transaction.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- ===================================================================
-- batch insert
-- ===================================================================
BEGIN;
CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
SELECT count(*)
FROM pg_foreign_server
WHERE srvname = 'batch10'
AND srvoptions @> array['batch_size=10'];
ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
SELECT count(*)
FROM pg_foreign_server
WHERE srvname = 'batch10'
AND srvoptions @> array['batch_size=10'];
SELECT count(*)
FROM pg_foreign_server
WHERE srvname = 'batch10'
AND srvoptions @> array['batch_size=20'];
CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
SELECT COUNT(*)
FROM pg_foreign_table
WHERE ftrelid = 'table30'::regclass
AND ftoptions @> array['batch_size=30'];
ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
SELECT COUNT(*)
FROM pg_foreign_table
WHERE ftrelid = 'table30'::regclass
AND ftoptions @> array['batch_size=30'];
SELECT COUNT(*)
FROM pg_foreign_table
WHERE ftrelid = 'table30'::regclass
AND ftoptions @> array['batch_size=40'];
ROLLBACK;
CREATE TABLE batch_table ( x int );
CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
INSERT INTO ftable VALUES (32);
INSERT INTO ftable VALUES (33), (34);
SELECT COUNT(*) FROM ftable;
TRUNCATE batch_table;
DROP FOREIGN TABLE ftable;
-- Disable batch insert
CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable VALUES (1), (2);
INSERT INTO ftable VALUES (1), (2);
SELECT COUNT(*) FROM ftable;
DROP FOREIGN TABLE ftable;
DROP TABLE batch_table;
-- Use partitioning
CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
CREATE TABLE batch_table_p0 (LIKE batch_table);
CREATE FOREIGN TABLE batch_table_p0f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 3, REMAINDER 0)
SERVER loopback
OPTIONS (table_name 'batch_table_p0', batch_size '10');
CREATE TABLE batch_table_p1 (LIKE batch_table);
CREATE FOREIGN TABLE batch_table_p1f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 3, REMAINDER 1)
SERVER loopback
OPTIONS (table_name 'batch_table_p1', batch_size '1');
CREATE TABLE batch_table_p2
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 3, REMAINDER 2);
INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
SELECT COUNT(*) FROM batch_table;
-- Clean up
DROP TABLE batch_table CASCADE;
......@@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate,
Begin executing a foreign table modification operation. This routine is
called during executor startup. It should perform any initialization
needed prior to the actual table modifications. Subsequently,
<function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or
<function>ExecForeignDelete</function> will be called for each tuple to be
<function>ExecForeignInsert/ExecForeignBatchInsert</function>,
<function>ExecForeignUpdate</function> or
<function>ExecForeignDelete</function> will be called for tuple(s) to be
inserted, updated, or deleted.
</para>
......@@ -614,6 +615,81 @@ ExecForeignInsert(EState *estate,
<para>
<programlisting>
TupleTableSlot **
ExecForeignBatchInsert(EState *estate,
ResultRelInfo *rinfo,
TupleTableSlot **slots,
TupleTableSlot *planSlots,
int *numSlots);
</programlisting>
Insert multiple tuples in bulk into the foreign table.
The parameters are the same for <function>ExecForeignInsert</function>
except <literal>slots</literal> and <literal>planSlots</literal> contain
multiple tuples and <literal>*numSlots></literal> specifies the number of
tuples in those arrays.
</para>
<para>
The return value is an array of slots containing the data that was
actually inserted (this might differ from the data supplied, for
example as a result of trigger actions.)
The passed-in <literal>slots</literal> can be re-used for this purpose.
The number of successfully inserted tuples is returned in
<literal>*numSlots</literal>.
</para>
<para>
The data in the returned slot is used only if the <command>INSERT</command>
statement involves a view
<literal>WITH CHECK OPTION</literal>; or if the foreign table has
an <literal>AFTER ROW</literal> trigger. Triggers require all columns,
but the FDW could choose to optimize away returning some or all columns
depending on the contents of the
<literal>WITH CHECK OPTION</literal> constraints.
</para>
<para>
If the <function>ExecForeignBatchInsert</function> or
<function>GetForeignModifyBatchSize</function> pointer is set to
<literal>NULL</literal>, attempts to insert into the foreign table will
use <function>ExecForeignInsert</function>.
This function is not used if the <command>INSERT</command> has the
<literal>RETURNING></literal> clause.
</para>
<para>
Note that this function is also called when inserting routed tuples into
a foreign-table partition. See the callback functions
described below that allow the FDW to support that.
</para>
<para>
<programlisting>
int
GetForeignModifyBatchSize(ResultRelInfo *rinfo);
</programlisting>
Report the maximum number of tuples that a single
<function>ExecForeignBatchInsert</function> call can handle for
the specified foreign table. That is, The executor passes at most
the number of tuples that this function returns to
<function>ExecForeignBatchInsert</function>.
<literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
the target foreign table.
The FDW is expected to provide a foreign server and/or foreign
table option for the user to set this value, or some hard-coded value.
</para>
<para>
If the <function>ExecForeignBatchInsert</function> or
<function>GetForeignModifyBatchSize</function> pointer is set to
<literal>NULL</literal>, attempts to insert into the foreign table will
use <function>ExecForeignInsert</function>.
</para>
<para>
<programlisting>
TupleTableSlot *
ExecForeignUpdate(EState *estate,
ResultRelInfo *rinfo,
......@@ -741,8 +817,9 @@ BeginForeignInsert(ModifyTableState *mtstate,
in both cases when it is the partition chosen for tuple routing and the
target specified in a <command>COPY FROM</command> command. It should
perform any initialization needed prior to the actual insertion.
Subsequently, <function>ExecForeignInsert</function> will be called for
each tuple to be inserted into the foreign table.
Subsequently, <function>ExecForeignInsert</function> or
<function>ExecForeignBatchInsert</function> will be called for
tuple(s) to be inserted into the foreign table.
</para>
<para>
......@@ -773,8 +850,8 @@ BeginForeignInsert(ModifyTableState *mtstate,
<para>
Note that if the FDW does not support routable foreign-table partitions
and/or executing <command>COPY FROM</command> on foreign tables, this
function or <function>ExecForeignInsert</function> subsequently called
must throw error as needed.
function or <function>ExecForeignInsert/ExecForeignBatchInsert</function>
subsequently called must throw error as needed.
</para>
<para>
......
......@@ -354,6 +354,19 @@ OPTIONS (ADD password_required 'false');
</listitem>
</varlistentry>
<varlistentry>
<term><literal>batch_size</literal></term>
<listitem>
<para>
This option specifies the number of rows <filename>postgres_fdw</filename>
should insert in each insert operation. It can be specified for a
foreign table or a foreign server. The option specified on a table
overrides an option specified for the server.
The default is <literal>1</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect3>
......
......@@ -993,6 +993,23 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
/*
* Determine if the FDW supports batch insert and determine the batch
* size (a FDW may support batching, but it may be disabled for the
* server/table or for this particular query).
*
* If the FDW does not support batching, we set the batch size to 1.
*/
if (partRelInfo->ri_FdwRoutine != NULL &&
partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
partRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
partRelInfo->ri_BatchSize =
partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(partRelInfo);
else
partRelInfo->ri_BatchSize = 1;
Assert(partRelInfo->ri_BatchSize >= 1);
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
/*
......
......@@ -58,6 +58,13 @@
#include "utils/rel.h"
static void ExecBatchInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
TupleTableSlot **slots,
TupleTableSlot **planSlots,
int numSlots,
EState *estate,
bool canSetTag);
static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
ItemPointer conflictTid,
......@@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate,
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
OnConflictAction onconflict = node->onConflictAction;
PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
MemoryContext oldContext;
/*
* If the input result relation is a partitioned table, find the leaf
......@@ -441,6 +449,55 @@ ExecInsert(ModifyTableState *mtstate,
ExecComputeStoredGenerated(resultRelInfo, estate, slot,
CMD_INSERT);
/*
* If the FDW supports batching, and batching is requested, accumulate
* rows and insert them in batches. Otherwise use the per-row inserts.
*/
if (resultRelInfo->ri_BatchSize > 1)
{
/*
* If a certain number of tuples have already been accumulated,
* or a tuple has come for a different relation than that for
* the accumulated tuples, perform the batch insert
*/
if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
{
ExecBatchInsert(mtstate, resultRelInfo,
resultRelInfo->ri_Slots,
resultRelInfo->ri_PlanSlots,
resultRelInfo->ri_NumSlots,
estate, canSetTag);
resultRelInfo->ri_NumSlots = 0;
}
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
if (resultRelInfo->ri_Slots == NULL)
{
resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
resultRelInfo->ri_BatchSize);
resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
resultRelInfo->ri_BatchSize);
}
resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
slot->tts_ops);
ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
slot);
resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
planSlot->tts_ops);
ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
planSlot);
resultRelInfo->ri_NumSlots++;
MemoryContextSwitchTo(oldContext);
return NULL;
}
/*
* insert into foreign table: let the FDW do it
*/
......@@ -698,6 +755,70 @@ ExecInsert(ModifyTableState *mtstate,
return result;
}
/* ----------------------------------------------------------------
* ExecBatchInsert
*
* Insert multiple tuples in an efficient way.
* Currently, this handles inserting into a foreign table without
* RETURNING clause.
* ----------------------------------------------------------------
*/
static void
ExecBatchInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
TupleTableSlot **slots,
TupleTableSlot **planSlots,
int numSlots,
EState *estate,
bool canSetTag)
{
int i;
int numInserted = numSlots;
TupleTableSlot *slot = NULL;
TupleTableSlot **rslots;
/*
* insert into foreign table: let the FDW do it
*/
rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
resultRelInfo,
slots,
planSlots,
&numInserted);
for (i = 0; i < numInserted; i++)
{
slot = rslots[i];
/*
* AFTER ROW Triggers or RETURNING expressions might reference the
* tableoid column, so (re-)initialize tts_tableOid before evaluating
* them.
*/
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
mtstate->mt_transition_capture);
/*
* Check any WITH CHECK OPTION constraints from parent views. See the
* comment in ExecInsert.
*/
if (resultRelInfo->ri_WithCheckOptions != NIL)
ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
}
if (canSetTag && numInserted > 0)
estate->es_processed += numInserted;
for (i = 0; i < numSlots; i++)
{
ExecDropSingleTupleTableSlot(slots[i]);
ExecDropSingleTupleTableSlot(planSlots[i]);
}
}
/* ----------------------------------------------------------------
* ExecDelete
*
......@@ -1937,6 +2058,9 @@ ExecModifyTable(PlanState *pstate)
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
List *relinfos = NIL;
ListCell *lc;
CHECK_FOR_INTERRUPTS();
......@@ -2152,6 +2276,25 @@ ExecModifyTable(PlanState *pstate)
return slot;
}
/*
* Insert remaining tuples for batch insert.
*/
if (proute)
relinfos = estate->es_tuple_routing_result_relations;
else
relinfos = estate->es_opened_result_relations;
foreach(lc, relinfos)
{
resultRelInfo = lfirst(lc);
if (resultRelInfo->ri_NumSlots > 0)
ExecBatchInsert(node, resultRelInfo,
resultRelInfo->ri_Slots,
resultRelInfo->ri_PlanSlots,
resultRelInfo->ri_NumSlots,
estate, node->canSetTag);
}
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
*/
......@@ -2650,6 +2793,23 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
}
/*
* Determine if the FDW supports batch insert and determine the batch
* size (a FDW may support batching, but it may be disabled for the
* server/table).
*/
if (!resultRelInfo->ri_usesFdwDirectModify &&
operation == CMD_INSERT &&
resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
resultRelInfo->ri_BatchSize =
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
else
resultRelInfo->ri_BatchSize = 1;
Assert(resultRelInfo->ri_BatchSize >= 1);
/*
* Lastly, if this is not the primary (canSetTag) ModifyTable node, add it
* to estate->es_auxmodifytables so that it will be run to completion by
......
......@@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
return list;
}
List *
list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3, ListCell datum4, ListCell datum5)
{
List *list = new_list(t, 5);
list->elements[0] = datum1;
list->elements[1] = datum2;
list->elements[2] = datum3;
list->elements[3] = datum4;
list->elements[4] = datum5;
check_list_invariants(list);
return list;
}
/*
* Make room for a new head cell in the given (non-NIL) list.
*
......
......@@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate,
ResultRelInfo *rinfo,
TupleTableSlot **slots,
TupleTableSlot **planSlots,
int *numSlots);
typedef int (*GetForeignModifyBatchSize_function) (ResultRelInfo *rinfo);
typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
ResultRelInfo *rinfo,
TupleTableSlot *slot,
......@@ -209,6 +217,8 @@ typedef struct FdwRoutine
PlanForeignModify_function PlanForeignModify;
BeginForeignModify_function BeginForeignModify;
ExecForeignInsert_function ExecForeignInsert;
ExecForeignBatchInsert_function ExecForeignBatchInsert;
GetForeignModifyBatchSize_function GetForeignModifyBatchSize;
ExecForeignUpdate_function ExecForeignUpdate;
ExecForeignDelete_function ExecForeignDelete;
EndForeignModify_function EndForeignModify;
......
......@@ -446,6 +446,12 @@ typedef struct ResultRelInfo
/* true when modifying foreign table directly */
bool ri_usesFdwDirectModify;
/* batch insert stuff */
int ri_NumSlots; /* number of slots in the array */
int ri_BatchSize; /* max slots inserted in a single batch */
TupleTableSlot **ri_Slots; /* input tuples for batch insert */
TupleTableSlot **ri_PlanSlots;
/* list of WithCheckOption's to be checked */
List *ri_WithCheckOptions;
......
......@@ -213,6 +213,10 @@ list_length(const List *l)
#define list_make4(x1,x2,x3,x4) \
list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
list_make_ptr_cell(x3), list_make_ptr_cell(x4))
#define list_make5(x1,x2,x3,x4,x5) \
list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
list_make_ptr_cell(x5))
#define list_make1_int(x1) \
list_make1_impl(T_IntList, list_make_int_cell(x1))
......@@ -224,6 +228,10 @@ list_length(const List *l)
#define list_make4_int(x1,x2,x3,x4) \
list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
list_make_int_cell(x3), list_make_int_cell(x4))
#define list_make5_int(x1,x2,x3,x4,x5) \
list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
list_make_int_cell(x3), list_make_int_cell(x4), \
list_make_int_cell(x5))
#define list_make1_oid(x1) \
list_make1_impl(T_OidList, list_make_oid_cell(x1))
......@@ -235,6 +243,10 @@ list_length(const List *l)
#define list_make4_oid(x1,x2,x3,x4) \
list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
list_make_oid_cell(x3), list_make_oid_cell(x4))
#define list_make5_oid(x1,x2,x3,x4,x5) \
list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
list_make_oid_cell(x3), list_make_oid_cell(x4), \
list_make_oid_cell(x5))
/*
* Locate the n'th cell (counting from 0) of the list.
......@@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3);
extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3, ListCell datum4);
extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3, ListCell datum4,
ListCell datum5);
extern pg_nodiscard List *lappend(List *list, void *datum);
extern pg_nodiscard List *lappend_int(List *list, int datum);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment