Commit 99cea49d authored by Tomas Vondra's avatar Tomas Vondra

Fix copying data into slots with FDW batching

Commit b676ac44 optimized handling of tuple slots with bulk inserts
into foreign tables, so that the slots are initialized only once and
reused for all batches. The data was however copied into the slots only
after the initialization, inserting duplicate values when the slot gets
reused. Fixed by moving the ExecCopySlot outside the init branch.

The existing postgres_fdw tests failed to catch this due to inserting
data into foreign tables without unique indexes, and then checking only
the number of inserted rows. This adds a new test with both a unique
index and a check of inserted values.

Reported-by: Alexander Pyhalov
Discussion: https://postgr.es/m/7a8cf8d56b3d18e5c0bccd6cd42d04ac%40postgrespro.ru
parent 6b787d9e
...@@ -9761,7 +9761,87 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test; ...@@ -9761,7 +9761,87 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
(2 rows) (2 rows)
-- Clean up -- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE; DROP TABLE batch_table, batch_cp_upd_test, batch_table_p0, batch_table_p1 CASCADE;
-- Use partitioning
ALTER SERVER loopback OPTIONS (ADD batch_size '10');
CREATE TABLE batch_table ( x int, field1 text, field2 text) PARTITION BY HASH (x);
CREATE TABLE batch_table_p0 (LIKE batch_table);
ALTER TABLE batch_table_p0 ADD CONSTRAINT p0_pkey PRIMARY KEY (x);
CREATE FOREIGN TABLE batch_table_p0f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER loopback
OPTIONS (table_name 'batch_table_p0');
CREATE TABLE batch_table_p1 (LIKE batch_table);
ALTER TABLE batch_table_p1 ADD CONSTRAINT p1_pkey PRIMARY KEY (x);
CREATE FOREIGN TABLE batch_table_p1f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER loopback
OPTIONS (table_name 'batch_table_p1');
INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, 50) i;
SELECT COUNT(*) FROM batch_table;
count
-------
50
(1 row)
SELECT * FROM batch_table ORDER BY x;
x | field1 | field2
----+--------+--------
1 | test1 | test1
2 | test2 | test2
3 | test3 | test3
4 | test4 | test4
5 | test5 | test5
6 | test6 | test6
7 | test7 | test7
8 | test8 | test8
9 | test9 | test9
10 | test10 | test10
11 | test11 | test11
12 | test12 | test12
13 | test13 | test13
14 | test14 | test14
15 | test15 | test15
16 | test16 | test16
17 | test17 | test17
18 | test18 | test18
19 | test19 | test19
20 | test20 | test20
21 | test21 | test21
22 | test22 | test22
23 | test23 | test23
24 | test24 | test24
25 | test25 | test25
26 | test26 | test26
27 | test27 | test27
28 | test28 | test28
29 | test29 | test29
30 | test30 | test30
31 | test31 | test31
32 | test32 | test32
33 | test33 | test33
34 | test34 | test34
35 | test35 | test35
36 | test36 | test36
37 | test37 | test37
38 | test38 | test38
39 | test39 | test39
40 | test40 | test40
41 | test41 | test41
42 | test42 | test42
43 | test43 | test43
44 | test44 | test44
45 | test45 | test45
46 | test46 | test46
47 | test47 | test47
48 | test48 | test48
49 | test49 | test49
50 | test50 | test50
(50 rows)
ALTER SERVER loopback OPTIONS (DROP batch_size);
-- =================================================================== -- ===================================================================
-- test asynchronous execution -- test asynchronous execution
-- =================================================================== -- ===================================================================
......
...@@ -3083,7 +3083,34 @@ UPDATE batch_cp_upd_test t SET a = 1 FROM (VALUES (1), (2)) s(a) WHERE t.a = s.a ...@@ -3083,7 +3083,34 @@ UPDATE batch_cp_upd_test t SET a = 1 FROM (VALUES (1), (2)) s(a) WHERE t.a = s.a
SELECT tableoid::regclass, * FROM batch_cp_upd_test; SELECT tableoid::regclass, * FROM batch_cp_upd_test;
-- Clean up -- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE; DROP TABLE batch_table, batch_cp_upd_test, batch_table_p0, batch_table_p1 CASCADE;
-- Use partitioning
ALTER SERVER loopback OPTIONS (ADD batch_size '10');
CREATE TABLE batch_table ( x int, field1 text, field2 text) PARTITION BY HASH (x);
CREATE TABLE batch_table_p0 (LIKE batch_table);
ALTER TABLE batch_table_p0 ADD CONSTRAINT p0_pkey PRIMARY KEY (x);
CREATE FOREIGN TABLE batch_table_p0f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER loopback
OPTIONS (table_name 'batch_table_p0');
CREATE TABLE batch_table_p1 (LIKE batch_table);
ALTER TABLE batch_table_p1 ADD CONSTRAINT p1_pkey PRIMARY KEY (x);
CREATE FOREIGN TABLE batch_table_p1f
PARTITION OF batch_table
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER loopback
OPTIONS (table_name 'batch_table_p1');
INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, 50) i;
SELECT COUNT(*) FROM batch_table;
SELECT * FROM batch_table ORDER BY x;
ALTER SERVER loopback OPTIONS (DROP batch_size);
-- =================================================================== -- ===================================================================
-- test asynchronous execution -- test asynchronous execution
......
...@@ -717,18 +717,20 @@ ExecInsert(ModifyTableState *mtstate, ...@@ -717,18 +717,20 @@ ExecInsert(ModifyTableState *mtstate,
resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] = resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
MakeSingleTupleTableSlot(tdesc, slot->tts_ops); MakeSingleTupleTableSlot(tdesc, slot->tts_ops);
ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
slot);
resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] = resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
MakeSingleTupleTableSlot(tdesc, planSlot->tts_ops); MakeSingleTupleTableSlot(tdesc, planSlot->tts_ops);
ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
planSlot);
/* remember how many batch slots we initialized */ /* remember how many batch slots we initialized */
resultRelInfo->ri_NumSlotsInitialized++; resultRelInfo->ri_NumSlotsInitialized++;
} }
ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
slot);
ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
planSlot);
resultRelInfo->ri_NumSlots++; resultRelInfo->ri_NumSlots++;
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment