Commit 056a5a3f authored by Peter Eisentraut's avatar Peter Eisentraut

Allow committing inside cursor loop

Previously, committing or aborting inside a cursor loop was prohibited
because that would close and remove the cursor.  To allow that,
automatically convert such cursors to holdable cursors so they survive
commits or rollbacks.  Portals now have a new state "auto-held", which
means they have been converted automatically from pinned.  An auto-held
portal is kept on transaction commit or rollback, but is still removed
when returning to the main loop on error.

This supports all languages that have cursor loop constructs: PL/pgSQL,
PL/Python, PL/Perl.
Reviewed-by: default avatarIldus Kurbangaliev <i.kurbangaliev@postgrespro.ru>
parent a2894cce
...@@ -722,11 +722,6 @@ $$; ...@@ -722,11 +722,6 @@ $$;
CALL transaction_test1(); CALL transaction_test1();
</programlisting> </programlisting>
</para> </para>
<para>
Transactions cannot be ended when a cursor created by
<function>spi_query</function> is open.
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist> </variablelist>
......
...@@ -3510,8 +3510,41 @@ CALL transaction_test1(); ...@@ -3510,8 +3510,41 @@ CALL transaction_test1();
</para> </para>
<para> <para>
A transaction cannot be ended inside a loop over a query result, nor Special considerations apply to cursor loops. Consider this example:
inside a block with exception handlers. <programlisting>
CREATE PROCEDURE transaction_test2()
LANGUAGE plpgsql
AS $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT * FROM test2 ORDER BY x LOOP
INSERT INTO test1 (a) VALUES (r.x);
COMMIT;
END LOOP;
END;
$$;
CALL transaction_test2();
</programlisting>
Normally, cursors are automatically closed at transaction commit.
However, a cursor created as part of a loop like this is automatically
converted to a holdable cursor by the first <command>COMMIT</command> or
<command>ROLLBACK</command>. That means that the cursor is fully
evaluated at the first <command>COMMIT</command> or
<command>ROLLBACK</command> rather than row by row. The cursor is still
removed automatically after the loop, so this is mostly invisible to the
user.
</para>
<para>
Transaction commands are not allowed in cursor loops driven by commands
that are not read-only (for example <command>UPDATE
... RETURNING</command>).
</para>
<para>
A transaction cannot be ended inside a block with exception handlers.
</para> </para>
</sect1> </sect1>
......
...@@ -1416,9 +1416,7 @@ CALL transaction_test1(); ...@@ -1416,9 +1416,7 @@ CALL transaction_test1();
</para> </para>
<para> <para>
Transactions cannot be ended when a cursor created by Transactions cannot be ended when an explicit subtransaction is active.
<literal>plpy.cursor</literal> is open or when an explicit subtransaction
is active.
</para> </para>
</sect1> </sect1>
......
...@@ -3938,6 +3938,8 @@ PostgresMain(int argc, char *argv[], ...@@ -3938,6 +3938,8 @@ PostgresMain(int argc, char *argv[],
if (am_walsender) if (am_walsender)
WalSndErrorCleanup(); WalSndErrorCleanup();
PortalErrorCleanup();
/* /*
* We can't release replication slots inside AbortTransaction() as we * We can't release replication slots inside AbortTransaction() as we
* need to be able to start and abort transactions while having a slot * need to be able to start and abort transactions while having a slot
......
...@@ -620,6 +620,36 @@ PortalHashTableDeleteAll(void) ...@@ -620,6 +620,36 @@ PortalHashTableDeleteAll(void)
} }
} }
/*
* "Hold" a portal. Prepare it for access by later transactions.
*/
static void
HoldPortal(Portal portal)
{
/*
* Note that PersistHoldablePortal() must release all resources
* used by the portal that are local to the creating transaction.
*/
PortalCreateHoldStore(portal);
PersistHoldablePortal(portal);
/* drop cached plan reference, if any */
PortalReleaseCachedPlan(portal);
/*
* Any resources belonging to the portal will be released in the
* upcoming transaction-wide cleanup; the portal will no longer
* have its own resources.
*/
portal->resowner = NULL;
/*
* Having successfully exported the holdable cursor, mark it as
* not belonging to this transaction.
*/
portal->createSubid = InvalidSubTransactionId;
portal->activeSubid = InvalidSubTransactionId;
}
/* /*
* Pre-commit processing for portals. * Pre-commit processing for portals.
...@@ -648,9 +678,10 @@ PreCommit_Portals(bool isPrepare) ...@@ -648,9 +678,10 @@ PreCommit_Portals(bool isPrepare)
/* /*
* There should be no pinned portals anymore. Complain if someone * There should be no pinned portals anymore. Complain if someone
* leaked one. * leaked one. Auto-held portals are allowed; we assume that whoever
* pinned them is managing them.
*/ */
if (portal->portalPinned) if (portal->portalPinned && !portal->autoHeld)
elog(ERROR, "cannot commit while a portal is pinned"); elog(ERROR, "cannot commit while a portal is pinned");
/* /*
...@@ -684,29 +715,7 @@ PreCommit_Portals(bool isPrepare) ...@@ -684,29 +715,7 @@ PreCommit_Portals(bool isPrepare)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has created a cursor WITH HOLD"))); errmsg("cannot PREPARE a transaction that has created a cursor WITH HOLD")));
/* HoldPortal(portal);
* Note that PersistHoldablePortal() must release all resources
* used by the portal that are local to the creating transaction.
*/
PortalCreateHoldStore(portal);
PersistHoldablePortal(portal);
/* drop cached plan reference, if any */
PortalReleaseCachedPlan(portal);
/*
* Any resources belonging to the portal will be released in the
* upcoming transaction-wide cleanup; the portal will no longer
* have its own resources.
*/
portal->resowner = NULL;
/*
* Having successfully exported the holdable cursor, mark it as
* not belonging to this transaction.
*/
portal->createSubid = InvalidSubTransactionId;
portal->activeSubid = InvalidSubTransactionId;
/* Report we changed state */ /* Report we changed state */
result = true; result = true;
...@@ -771,6 +780,14 @@ AtAbort_Portals(void) ...@@ -771,6 +780,14 @@ AtAbort_Portals(void)
if (portal->createSubid == InvalidSubTransactionId) if (portal->createSubid == InvalidSubTransactionId)
continue; continue;
/*
* Do nothing to auto-held cursors. This is similar to the case of a
* cursor from a previous transaction, but it could also be that the
* cursor was auto-held in this transaction, so it wants to live on.
*/
if (portal->autoHeld)
continue;
/* /*
* If it was created in the current transaction, we can't do normal * If it was created in the current transaction, we can't do normal
* shutdown on a READY portal either; it might refer to objects * shutdown on a READY portal either; it might refer to objects
...@@ -834,8 +851,11 @@ AtCleanup_Portals(void) ...@@ -834,8 +851,11 @@ AtCleanup_Portals(void)
if (portal->status == PORTAL_ACTIVE) if (portal->status == PORTAL_ACTIVE)
continue; continue;
/* Do nothing to cursors held over from a previous transaction */ /*
if (portal->createSubid == InvalidSubTransactionId) * Do nothing to cursors held over from a previous transaction or
* auto-held ones.
*/
if (portal->createSubid == InvalidSubTransactionId || portal->autoHeld)
{ {
Assert(portal->status != PORTAL_ACTIVE); Assert(portal->status != PORTAL_ACTIVE);
Assert(portal->resowner == NULL); Assert(portal->resowner == NULL);
...@@ -865,6 +885,32 @@ AtCleanup_Portals(void) ...@@ -865,6 +885,32 @@ AtCleanup_Portals(void)
} }
} }
/*
* Portal-related cleanup when we return to the main loop on error.
*
* This is different from the cleanup at transaction abort. Auto-held portals
* are cleaned up on error but not on transaction abort.
*/
void
PortalErrorCleanup(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
hash_seq_init(&status, PortalHashTable);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
Portal portal = hentry->portal;
if (portal->autoHeld)
{
portal->portalPinned = false;
PortalDrop(portal, false);
}
}
}
/* /*
* Pre-subcommit processing for portals. * Pre-subcommit processing for portals.
* *
...@@ -1164,8 +1210,19 @@ ThereAreNoReadyPortals(void) ...@@ -1164,8 +1210,19 @@ ThereAreNoReadyPortals(void)
return true; return true;
} }
bool /*
ThereArePinnedPortals(void) * Hold all pinned portals.
*
* A procedural language implementation that uses pinned portals for its
* internally generated cursors can call this in its COMMIT command to convert
* those cursors to held cursors, so that they survive the transaction end.
* We mark those portals as "auto-held" so that exception exit knows to clean
* them up. (In normal, non-exception code paths, the PL needs to clean those
* portals itself, since transaction end won't do it anymore, but that should
* be normal practice anyway.)
*/
void
HoldPinnedPortals(void)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
PortalHashEnt *hentry; PortalHashEnt *hentry;
...@@ -1176,9 +1233,24 @@ ThereArePinnedPortals(void) ...@@ -1176,9 +1233,24 @@ ThereArePinnedPortals(void)
{ {
Portal portal = hentry->portal; Portal portal = hentry->portal;
if (portal->portalPinned) if (portal->portalPinned && !portal->autoHeld)
return true; {
} /*
* Doing transaction control, especially abort, inside a cursor
* loop that is not read-only, for example using UPDATE
* ... RETURNING, has weird semantics issues. Also, this
* implementation wouldn't work, because such portals cannot be
* held. (The core grammar enforces that only SELECT statements
* can drive a cursor, but for example PL/pgSQL does not restrict
* it.)
*/
if (portal->strategy != PORTAL_ONE_SELECT)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
errmsg("cannot perform transaction commands inside a cursor loop that is not read-only")));
return false; portal->autoHeld = true;
HoldPortal(portal);
}
}
} }
...@@ -147,6 +147,8 @@ typedef struct PortalData ...@@ -147,6 +147,8 @@ typedef struct PortalData
/* Status data */ /* Status data */
PortalStatus status; /* see above */ PortalStatus status; /* see above */
bool portalPinned; /* a pinned portal can't be dropped */ bool portalPinned; /* a pinned portal can't be dropped */
bool autoHeld; /* was automatically converted from pinned to
* held (see HoldPinnedPortals()) */
/* If not NULL, Executor is active; call ExecutorEnd eventually: */ /* If not NULL, Executor is active; call ExecutorEnd eventually: */
QueryDesc *queryDesc; /* info needed for executor invocation */ QueryDesc *queryDesc; /* info needed for executor invocation */
...@@ -204,6 +206,7 @@ extern void EnablePortalManager(void); ...@@ -204,6 +206,7 @@ extern void EnablePortalManager(void);
extern bool PreCommit_Portals(bool isPrepare); extern bool PreCommit_Portals(bool isPrepare);
extern void AtAbort_Portals(void); extern void AtAbort_Portals(void);
extern void AtCleanup_Portals(void); extern void AtCleanup_Portals(void);
extern void PortalErrorCleanup(void);
extern void AtSubCommit_Portals(SubTransactionId mySubid, extern void AtSubCommit_Portals(SubTransactionId mySubid,
SubTransactionId parentSubid, SubTransactionId parentSubid,
ResourceOwner parentXactOwner); ResourceOwner parentXactOwner);
...@@ -231,6 +234,6 @@ extern PlannedStmt *PortalGetPrimaryStmt(Portal portal); ...@@ -231,6 +234,6 @@ extern PlannedStmt *PortalGetPrimaryStmt(Portal portal);
extern void PortalCreateHoldStore(Portal portal); extern void PortalCreateHoldStore(Portal portal);
extern void PortalHashTableDeleteAll(void); extern void PortalHashTableDeleteAll(void);
extern bool ThereAreNoReadyPortals(void); extern bool ThereAreNoReadyPortals(void);
extern bool ThereArePinnedPortals(void); extern void HoldPinnedPortals(void);
#endif /* PORTAL_H */ #endif /* PORTAL_H */
...@@ -105,11 +105,44 @@ while (defined($row = spi_fetchrow($sth))) { ...@@ -105,11 +105,44 @@ while (defined($row = spi_fetchrow($sth))) {
spi_commit(); spi_commit();
} }
$$; $$;
ERROR: cannot commit transaction while a cursor is open at line 6.
CONTEXT: PL/Perl anonymous code block
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+---
0 |
1 |
2 |
3 |
4 |
(5 rows)
-- check that this doesn't leak a holdable portal
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- error in cursor loop with commit
TRUNCATE test1;
DO LANGUAGE plperl $$
my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
my $row;
while (defined($row = spi_fetchrow($sth))) {
spi_exec_query("INSERT INTO test1 (a) VALUES (12/(" . $row->{x} . "-2))");
spi_commit();
}
$$;
ERROR: division by zero at line 5.
CONTEXT: PL/Perl anonymous code block
SELECT * FROM test1;
a | b
-----+---
-6 |
-12 |
(2 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows) (0 rows)
-- rollback inside cursor loop -- rollback inside cursor loop
...@@ -122,12 +155,42 @@ while (defined($row = spi_fetchrow($sth))) { ...@@ -122,12 +155,42 @@ while (defined($row = spi_fetchrow($sth))) {
spi_rollback(); spi_rollback();
} }
$$; $$;
ERROR: cannot abort transaction while a cursor is open at line 6.
CONTEXT: PL/Perl anonymous code block
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+---
(0 rows) (0 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- first commit then rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plperl $$
my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
my $row;
while (defined($row = spi_fetchrow($sth))) {
spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")");
if ($row->{x} % 2 == 0) {
spi_commit();
} else {
spi_rollback();
}
}
$$;
SELECT * FROM test1;
a | b
---+---
0 |
2 |
4 |
(3 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
DROP TABLE test1; DROP TABLE test1;
DROP TABLE test2; DROP TABLE test2;
...@@ -3965,10 +3965,7 @@ plperl_spi_commit(void) ...@@ -3965,10 +3965,7 @@ plperl_spi_commit(void)
PG_TRY(); PG_TRY();
{ {
if (ThereArePinnedPortals()) HoldPinnedPortals();
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot commit transaction while a cursor is open")));
SPI_commit(); SPI_commit();
SPI_start_transaction(); SPI_start_transaction();
...@@ -3995,10 +3992,7 @@ plperl_spi_rollback(void) ...@@ -3995,10 +3992,7 @@ plperl_spi_rollback(void)
PG_TRY(); PG_TRY();
{ {
if (ThereArePinnedPortals()) HoldPinnedPortals();
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
errmsg("cannot abort transaction while a cursor is open")));
SPI_rollback(); SPI_rollback();
SPI_start_transaction(); SPI_start_transaction();
......
...@@ -100,6 +100,26 @@ $$; ...@@ -100,6 +100,26 @@ $$;
SELECT * FROM test1; SELECT * FROM test1;
-- check that this doesn't leak a holdable portal
SELECT * FROM pg_cursors;
-- error in cursor loop with commit
TRUNCATE test1;
DO LANGUAGE plperl $$
my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
my $row;
while (defined($row = spi_fetchrow($sth))) {
spi_exec_query("INSERT INTO test1 (a) VALUES (12/(" . $row->{x} . "-2))");
spi_commit();
}
$$;
SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- rollback inside cursor loop -- rollback inside cursor loop
TRUNCATE test1; TRUNCATE test1;
...@@ -115,6 +135,29 @@ $$; ...@@ -115,6 +135,29 @@ $$;
SELECT * FROM test1; SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- first commit then rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plperl $$
my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
my $row;
while (defined($row = spi_fetchrow($sth))) {
spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")");
if ($row->{x} % 2 == 0) {
spi_commit();
} else {
spi_rollback();
}
}
$$;
SELECT * FROM test1;
SELECT * FROM pg_cursors;
DROP TABLE test1; DROP TABLE test1;
DROP TABLE test2; DROP TABLE test2;
...@@ -195,11 +195,47 @@ BEGIN ...@@ -195,11 +195,47 @@ BEGIN
END LOOP; END LOOP;
END; END;
$$; $$;
ERROR: committing inside a cursor loop is not supported
CONTEXT: PL/pgSQL function inline_code_block line 7 at COMMIT
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+---
0 |
1 |
2 |
3 |
4 |
(5 rows)
-- check that this doesn't leak a holdable portal
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- error in cursor loop with commit
TRUNCATE test1;
DO LANGUAGE plpgsql $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT * FROM test2 ORDER BY x LOOP
INSERT INTO test1 (a) VALUES (12/(r.x-2));
COMMIT;
END LOOP;
END;
$$;
ERROR: division by zero
CONTEXT: SQL statement "INSERT INTO test1 (a) VALUES (12/(r.x-2))"
PL/pgSQL function inline_code_block line 6 at SQL statement
SELECT * FROM test1;
a | b
-----+---
-6 |
-12 |
(2 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows) (0 rows)
-- rollback inside cursor loop -- rollback inside cursor loop
...@@ -214,13 +250,79 @@ BEGIN ...@@ -214,13 +250,79 @@ BEGIN
END LOOP; END LOOP;
END; END;
$$; $$;
ERROR: cannot abort transaction inside a cursor loop SELECT * FROM test1;
a | b
---+---
(0 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- first commit then rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plpgsql $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT * FROM test2 ORDER BY x LOOP
INSERT INTO test1 (a) VALUES (r.x);
IF r.x % 2 = 0 THEN
COMMIT;
ELSE
ROLLBACK;
END IF;
END LOOP;
END;
$$;
SELECT * FROM test1;
a | b
---+---
0 |
2 |
4 |
(3 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plpgsql $$
DECLARE
r RECORD;
BEGIN
FOR r IN UPDATE test2 SET x = x * 2 RETURNING x LOOP
INSERT INTO test1 (a) VALUES (r.x);
ROLLBACK;
END LOOP;
END;
$$;
ERROR: cannot perform transaction commands inside a cursor loop that is not read-only
CONTEXT: PL/pgSQL function inline_code_block line 7 at ROLLBACK CONTEXT: PL/pgSQL function inline_code_block line 7 at ROLLBACK
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+---
(0 rows) (0 rows)
SELECT * FROM test2;
x
---
0
1
2
3
4
(5 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- commit inside block with exception handler -- commit inside block with exception handler
TRUNCATE test1; TRUNCATE test1;
DO LANGUAGE plpgsql $$ DO LANGUAGE plpgsql $$
......
...@@ -4677,14 +4677,7 @@ exec_stmt_close(PLpgSQL_execstate *estate, PLpgSQL_stmt_close *stmt) ...@@ -4677,14 +4677,7 @@ exec_stmt_close(PLpgSQL_execstate *estate, PLpgSQL_stmt_close *stmt)
static int static int
exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt) exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt)
{ {
/* HoldPinnedPortals();
* XXX This could be implemented by converting the pinned portals to
* holdable ones and organizing the cleanup separately.
*/
if (ThereArePinnedPortals())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("committing inside a cursor loop is not supported")));
SPI_commit(); SPI_commit();
SPI_start_transaction(); SPI_start_transaction();
...@@ -4703,14 +4696,7 @@ exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt) ...@@ -4703,14 +4696,7 @@ exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt)
static int static int
exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt) exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt)
{ {
/* HoldPinnedPortals();
* Unlike the COMMIT case above, this might not make sense at all,
* especially if the query driving the cursor loop has side effects.
*/
if (ThereArePinnedPortals())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
errmsg("cannot abort transaction inside a cursor loop")));
SPI_rollback(); SPI_rollback();
SPI_start_transaction(); SPI_start_transaction();
......
...@@ -175,6 +175,28 @@ $$; ...@@ -175,6 +175,28 @@ $$;
SELECT * FROM test1; SELECT * FROM test1;
-- check that this doesn't leak a holdable portal
SELECT * FROM pg_cursors;
-- error in cursor loop with commit
TRUNCATE test1;
DO LANGUAGE plpgsql $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT * FROM test2 ORDER BY x LOOP
INSERT INTO test1 (a) VALUES (12/(r.x-2));
COMMIT;
END LOOP;
END;
$$;
SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- rollback inside cursor loop -- rollback inside cursor loop
TRUNCATE test1; TRUNCATE test1;
...@@ -192,6 +214,51 @@ $$; ...@@ -192,6 +214,51 @@ $$;
SELECT * FROM test1; SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- first commit then rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plpgsql $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT * FROM test2 ORDER BY x LOOP
INSERT INTO test1 (a) VALUES (r.x);
IF r.x % 2 = 0 THEN
COMMIT;
ELSE
ROLLBACK;
END IF;
END LOOP;
END;
$$;
SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plpgsql $$
DECLARE
r RECORD;
BEGIN
FOR r IN UPDATE test2 SET x = x * 2 RETURNING x LOOP
INSERT INTO test1 (a) VALUES (r.x);
ROLLBACK;
END LOOP;
END;
$$;
SELECT * FROM test1;
SELECT * FROM test2;
SELECT * FROM pg_cursors;
-- commit inside block with exception handler -- commit inside block with exception handler
TRUNCATE test1; TRUNCATE test1;
......
...@@ -111,11 +111,44 @@ for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): ...@@ -111,11 +111,44 @@ for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
plpy.commit() plpy.commit()
$$; $$;
ERROR: cannot commit transaction while a cursor is open
CONTEXT: PL/Python anonymous code block
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+---
0 |
1 |
2 |
3 |
4 |
(5 rows)
-- check that this doesn't leak a holdable portal
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- error in cursor loop with commit
TRUNCATE test1;
DO LANGUAGE plpythonu $$
for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
plpy.execute("INSERT INTO test1 (a) VALUES (12/(%s-2))" % row['x'])
plpy.commit()
$$;
ERROR: spiexceptions.DivisionByZero: division by zero
CONTEXT: Traceback (most recent call last):
PL/Python anonymous code block, line 3, in <module>
plpy.execute("INSERT INTO test1 (a) VALUES (12/(%s-2))" % row['x'])
PL/Python anonymous code block
SELECT * FROM test1;
a | b
-----+---
-6 |
-12 |
(2 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows) (0 rows)
-- rollback inside cursor loop -- rollback inside cursor loop
...@@ -125,12 +158,38 @@ for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): ...@@ -125,12 +158,38 @@ for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
plpy.rollback() plpy.rollback()
$$; $$;
ERROR: cannot abort transaction while a cursor is open
CONTEXT: PL/Python anonymous code block
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+---
(0 rows) (0 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
-- first commit then rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plpythonu $$
for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
if row['x'] % 2 == 0:
plpy.commit()
else:
plpy.rollback()
$$;
SELECT * FROM test1;
a | b
---+---
0 |
2 |
4 |
(3 rows)
SELECT * FROM pg_cursors;
name | statement | is_holdable | is_binary | is_scrollable | creation_time
------+-----------+-------------+-----------+---------------+---------------
(0 rows)
DROP TABLE test1; DROP TABLE test1;
DROP TABLE test2; DROP TABLE test2;
...@@ -594,10 +594,7 @@ PLy_commit(PyObject *self, PyObject *args) ...@@ -594,10 +594,7 @@ PLy_commit(PyObject *self, PyObject *args)
{ {
PLyExecutionContext *exec_ctx = PLy_current_execution_context(); PLyExecutionContext *exec_ctx = PLy_current_execution_context();
if (ThereArePinnedPortals()) HoldPinnedPortals();
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot commit transaction while a cursor is open")));
SPI_commit(); SPI_commit();
SPI_start_transaction(); SPI_start_transaction();
...@@ -613,10 +610,7 @@ PLy_rollback(PyObject *self, PyObject *args) ...@@ -613,10 +610,7 @@ PLy_rollback(PyObject *self, PyObject *args)
{ {
PLyExecutionContext *exec_ctx = PLy_current_execution_context(); PLyExecutionContext *exec_ctx = PLy_current_execution_context();
if (ThereArePinnedPortals()) HoldPinnedPortals();
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
errmsg("cannot abort transaction while a cursor is open")));
SPI_rollback(); SPI_rollback();
SPI_start_transaction(); SPI_start_transaction();
......
...@@ -99,6 +99,23 @@ $$; ...@@ -99,6 +99,23 @@ $$;
SELECT * FROM test1; SELECT * FROM test1;
-- check that this doesn't leak a holdable portal
SELECT * FROM pg_cursors;
-- error in cursor loop with commit
TRUNCATE test1;
DO LANGUAGE plpythonu $$
for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
plpy.execute("INSERT INTO test1 (a) VALUES (12/(%s-2))" % row['x'])
plpy.commit()
$$;
SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- rollback inside cursor loop -- rollback inside cursor loop
TRUNCATE test1; TRUNCATE test1;
...@@ -111,6 +128,25 @@ $$; ...@@ -111,6 +128,25 @@ $$;
SELECT * FROM test1; SELECT * FROM test1;
SELECT * FROM pg_cursors;
-- first commit then rollback inside cursor loop
TRUNCATE test1;
DO LANGUAGE plpythonu $$
for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
if row['x'] % 2 == 0:
plpy.commit()
else:
plpy.rollback()
$$;
SELECT * FROM test1;
SELECT * FROM pg_cursors;
DROP TABLE test1; DROP TABLE test1;
DROP TABLE test2; DROP TABLE test2;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment