Commit 8de72b66 authored by Simon Riggs's avatar Simon Riggs

COPY FREEZE and mark committed on fresh tables.

When a relfilenode is created in this subtransaction or
a committed child transaction and it cannot otherwise
be seen by our own process, mark tuples committed ahead
of transaction commit for all COPY commands in same
transaction. If FREEZE specified on COPY
and pre-conditions met then rows will also be frozen.
Both options designed to avoid revisiting rows after commit,
increasing performance of subsequent commands after
data load and upgrade. pg_restore changes later.

Simon Riggs, review comments from Heikki Linnakangas, Noah Misch and design
input from Tom Lane, Robert Haas and Kevin Grittner
parent 44c03efe
......@@ -34,6 +34,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
FORMAT <replaceable class="parameter">format_name</replaceable>
OIDS [ <replaceable class="parameter">boolean</replaceable> ]
FREEZE [ <replaceable class="parameter">boolean</replaceable> ]
DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>'
NULL '<replaceable class="parameter">null_string</replaceable>'
HEADER [ <replaceable class="parameter">boolean</replaceable> ]
......@@ -181,6 +182,28 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
<varlistentry>
<term><literal>FREEZE</literal></term>
<listitem>
<para>
Specifies copying the data with rows already frozen, just as they
would be after running the <command>VACUUM FREEZE</> command.
This is intended as a performance option for initial data loading.
Rows will be frozen only if the table being loaded has been created
in the current subtransaction, there are no cursors open and there
are no older snapshots held by this transaction. If those conditions
are not met the command will continue without error though will not
freeze rows.
</para>
<para>
Note that all sessions will immediately be able to see the data
once it has been successfully loaded. This violates the normal rules
of MVCC visibility and by specifying this option the user acknowledges
explicitly that this is understood.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>DELIMITER</literal></term>
<listitem>
......
......@@ -1875,6 +1875,14 @@ FreeBulkInsertState(BulkInsertState bistate)
* The HEAP_INSERT_SKIP_FSM option is passed directly to
* RelationGetBufferForTuple, which see for more info.
*
* HEAP_INSERT_COMMITTED should only be specified for inserts into
* relfilenodes created during the current subtransaction and when
* there are no prior snapshots or pre-existing portals open.
*
* HEAP_INSERT_FROZEN only has meaning when HEAP_INSERT_COMMITTED is
* also set. This causes rows to be frozen, which is an MVCC violation
* and requires explicit options chosen by user.
*
* Note that these options will be applied when inserting into the heap's
* TOAST table, too, if the tuple requires any out-of-line data.
*
......@@ -2078,6 +2086,13 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
if (options & HEAP_INSERT_COMMITTED)
{
tup->t_data->t_infomask |= HEAP_XMIN_COMMITTED;
if (options & HEAP_INSERT_FROZEN)
HeapTupleHeaderSetXmin(tup->t_data, FrozenTransactionId);
}
else
HeapTupleHeaderSetXmin(tup->t_data, xid);
HeapTupleHeaderSetCmin(tup->t_data, cid);
HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
......
......@@ -44,6 +44,7 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
......@@ -109,6 +110,7 @@ typedef struct CopyStateData
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool freeze; /* freeze rows on loading? */
bool csv_mode; /* Comma Separated Value format? */
bool header_line; /* CSV header line? */
char *null_print; /* NULL marker string (server encoding!) */
......@@ -895,6 +897,14 @@ ProcessCopyOptions(CopyState cstate,
errmsg("conflicting or redundant options")));
cstate->oids = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "freeze") == 0)
{
if (cstate->freeze)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->freeze = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "delimiter") == 0)
{
if (cstate->delim)
......@@ -1974,7 +1984,30 @@ CopyFrom(CopyState cstate)
hi_options |= HEAP_INSERT_SKIP_FSM;
if (!XLogIsNeeded())
hi_options |= HEAP_INSERT_SKIP_WAL;
/*
* Optimize if new relfilenode was created in this subxact or
* one of its committed children and we won't see those rows later
* as part of an earlier scan or command. This ensures that if this
* subtransaction aborts then the frozen rows won't be visible
* after xact cleanup. Note that the stronger test of exactly
* which subtransaction created it is crucial for correctness
* of this optimisation.
*/
if (ThereAreNoPriorRegisteredSnapshots() &&
ThereAreNoReadyPortals() &&
cstate->rel->rd_newRelfilenodeSubid == GetCurrentSubTransactionId())
{
hi_options |= HEAP_INSERT_COMMITTED;
if (cstate->freeze)
hi_options |= HEAP_INSERT_FROZEN;
}
}
if (cstate->freeze && (hi_options & HEAP_INSERT_FROZEN) == 0)
ereport(NOTICE,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("FREEZE option specified but pre-conditions not met")));
/*
* We need a ResultRelInfo so we can use the regular executor's
......
......@@ -2383,6 +2383,10 @@ copy_opt_item:
{
$$ = makeDefElem("oids", (Node *)makeInteger(TRUE));
}
| FREEZE
{
$$ = makeDefElem("freeze", (Node *)makeInteger(TRUE));
}
| DELIMITER opt_as Sconst
{
$$ = makeDefElem("delimiter", (Node *)makeString($3));
......
......@@ -1055,3 +1055,22 @@ pg_cursor(PG_FUNCTION_ARGS)
return (Datum) 0;
}
bool
ThereAreNoReadyPortals(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
hash_seq_init(&status, PortalHashTable);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
Portal portal = hentry->portal;
if (portal->status == PORTAL_READY)
return false;
}
return true;
}
......@@ -1184,3 +1184,12 @@ DeleteAllExportedSnapshotFiles(void)
FreeDir(s_dir);
}
bool
ThereAreNoPriorRegisteredSnapshots(void)
{
if (RegisteredSnapshots <= 1)
return true;
return false;
}
......@@ -26,6 +26,8 @@
/* "options" flag bits for heap_insert */
#define HEAP_INSERT_SKIP_WAL 0x0001
#define HEAP_INSERT_SKIP_FSM 0x0002
#define HEAP_INSERT_COMMITTED 0x0004
#define HEAP_INSERT_FROZEN 0x0008
typedef struct BulkInsertStateData *BulkInsertState;
......
......@@ -220,5 +220,6 @@ extern void PortalDefineQuery(Portal portal,
extern Node *PortalListGetPrimaryStmt(List *stmts);
extern void PortalCreateHoldStore(Portal portal);
extern void PortalHashTableDeleteAll(void);
extern bool ThereAreNoReadyPortals(void);
#endif /* PORTAL_H */
......@@ -48,5 +48,6 @@ extern Datum pg_export_snapshot(PG_FUNCTION_ARGS);
extern void ImportSnapshot(const char *idstr);
extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
extern bool ThereAreNoPriorRegisteredSnapshots(void);
#endif /* SNAPMGR_H */
......@@ -254,6 +254,112 @@ SELECT * FROM testnull;
|
(4 rows)
CREATE TABLE vistest (LIKE testeoc);
BEGIN;
TRUNCATE vistest;
COPY vistest FROM stdin CSV;
SELECT * FROM vistest;
a
---
a
b
(2 rows)
SAVEPOINT s1;
TRUNCATE vistest;
COPY vistest FROM stdin CSV;
SELECT * FROM vistest;
a
---
d
e
(2 rows)
COMMIT;
BEGIN;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
SELECT * FROM vistest;
a
---
a
b
(2 rows)
SAVEPOINT s1;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
SELECT * FROM vistest;
a
---
d
e
(2 rows)
COMMIT;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
NOTICE: FREEZE option specified but pre-conditions not met
SELECT * FROM vistest;
a
---
a
b
(2 rows)
BEGIN;
INSERT INTO vistest VALUES ('z');
SAVEPOINT s1;
TRUNCATE vistest;
ROLLBACK TO SAVEPOINT s1;
-- FREEZE should be silently ignored here
COPY vistest FROM stdin CSV FREEZE;
NOTICE: FREEZE option specified but pre-conditions not met
SELECT * FROM vistest;
a
---
a
b
z
d
e
(5 rows)
COMMIT;
CREATE FUNCTION truncate_in_subxact() RETURNS VOID AS
$$
BEGIN
SELECT * FROM nonexistent;
EXCEPTION
WHEN OTHERS THEN
TRUNCATE vistest;
END;
$$ language plpgsql;
BEGIN;
INSERT INTO vistest VALUES ('z');
SELECT truncate_in_subxact();
truncate_in_subxact
---------------------
(1 row)
COPY vistest FROM stdin CSV FREEZE;
SELECT * FROM vistest;
a
---
d
e
(2 rows)
COMMIT;
SELECT * FROM vistest;
a
---
d
e
(2 rows)
DROP TABLE vistest;
DROP TABLE x, y;
DROP FUNCTION fn_x_before();
DROP FUNCTION fn_x_after();
......@@ -179,6 +179,84 @@ COPY testnull FROM stdin WITH NULL AS E'\\0';
SELECT * FROM testnull;
CREATE TABLE vistest (LIKE testeoc);
BEGIN;
TRUNCATE vistest;
COPY vistest FROM stdin CSV;
a
b
\.
SELECT * FROM vistest;
SAVEPOINT s1;
TRUNCATE vistest;
COPY vistest FROM stdin CSV;
d
e
\.
SELECT * FROM vistest;
COMMIT;
BEGIN;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
a
b
\.
SELECT * FROM vistest;
SAVEPOINT s1;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
d
e
\.
SELECT * FROM vistest;
COMMIT;
BEGIN;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
x
y
\.
SELECT * FROM vistest;
COMMIT;
TRUNCATE vistest;
COPY vistest FROM stdin CSV FREEZE;
p
g
\.
BEGIN;
INSERT INTO vistest VALUES ('z');
SAVEPOINT s1;
TRUNCATE vistest;
ROLLBACK TO SAVEPOINT s1;
-- FREEZE should be silently ignored here
COPY vistest FROM stdin CSV FREEZE;
d
e
\.
SELECT * FROM vistest;
COMMIT;
CREATE FUNCTION truncate_in_subxact() RETURNS VOID AS
$$
BEGIN
SELECT * FROM nonexistent;
EXCEPTION
WHEN OTHERS THEN
TRUNCATE vistest;
END;
$$ language plpgsql;
BEGIN;
INSERT INTO vistest VALUES ('z');
SELECT truncate_in_subxact();
COPY vistest FROM stdin CSV FREEZE;
d
e
\.
SELECT * FROM vistest;
COMMIT;
SELECT * FROM vistest;
DROP TABLE vistest;
DROP FUNCTION truncate_in_subxact();
DROP TABLE x, y;
DROP FUNCTION fn_x_before();
DROP FUNCTION fn_x_after();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment