Commit 0ac5ad51 authored by Alvaro Herrera's avatar Alvaro Herrera

Improve concurrency of foreign key locking

This patch introduces two additional lock modes for tuples: "SELECT FOR
KEY SHARE" and "SELECT FOR NO KEY UPDATE".  These don't block each
other, in contrast with already existing "SELECT FOR SHARE" and "SELECT
FOR UPDATE".  UPDATE commands that do not modify the values stored in
the columns that are part of the key of the tuple now grab a SELECT FOR
NO KEY UPDATE lock on the tuple, allowing them to proceed concurrently
with tuple locks of the FOR KEY SHARE variety.

Foreign key triggers now use FOR KEY SHARE instead of FOR SHARE; this
means the concurrency improvement applies to them, which is the whole
point of this patch.

The added tuple lock semantics require some rejiggering of the multixact
module, so that the locking level that each transaction is holding can
be stored alongside its Xid.  Also, multixacts now need to persist
across server restarts and crashes, because they can now represent not
only tuple locks, but also tuple updates.  This means we need more
careful tracking of lifetime of pg_multixact SLRU files; since they now
persist longer, we require more infrastructure to figure out when they
can be removed.  pg_upgrade also needs to be careful to copy
pg_multixact files over from the old server to the new, or at least part
of multixact.c state, depending on the versions of the old and new
servers.

Tuple time qualification rules (HeapTupleSatisfies routines) need to be
careful not to consider tuples with the "is multi" infomask bit set as
being only locked; they might need to look up MultiXact values (i.e.
possibly do pg_multixact I/O) to find out the Xid that updated a tuple,
whereas they previously were assured to only use information readily
available from the tuple header.  This is considered acceptable, because
the extra I/O would involve cases that would previously cause some
commands to block waiting for concurrent transactions to finish.

Another important change is the fact that locking tuples that have
previously been updated causes the future versions to be marked as
locked, too; this is essential for correctness of foreign key checks.
This causes additional WAL-logging, also (there was previously a single
WAL record for a locked tuple; now there are as many as updated copies
of the tuple there exist.)

With all this in place, contention related to tuples being checked by
foreign key rules should be much reduced.

As a bonus, the old behavior that a subtransaction grabbing a stronger
tuple lock than the parent (sub)transaction held on a given tuple and
later aborting caused the weaker lock to be lost, has been fixed.

Many new spec files were added for isolation tester framework, to ensure
overall behavior is sane.  There's probably room for several more tests.

There were several reviewers of this patch; in particular, Noah Misch
and Andres Freund spent considerable time in it.  Original idea for the
patch came from Simon Riggs, after a problem report by Joel Jacobson.
Most code is from me, with contributions from Marti Raudsepp, Alexander
Shulgin, Noah Misch and Andres Freund.

This patch was discussed in several pgsql-hackers threads; the most
important start at the following message-ids:
	AANLkTimo9XVcEzfiBR-ut3KVNDkjm2Vxh+t8kAmWjPuv@mail.gmail.com
	1290721684-sup-3951@alvh.no-ip.org
	1294953201-sup-2099@alvh.no-ip.org
	1320343602-sup-2290@alvh.no-ip.org
	1339690386-sup-8927@alvh.no-ip.org
	4FE5FF020200002500048A3D@gw.wicourts.gov
	4FEAB90A0200002500048B7D@gw.wicourts.gov
parent f925c79b
...@@ -191,7 +191,7 @@ ERROR: cannot change foreign table "agg_csv" ...@@ -191,7 +191,7 @@ ERROR: cannot change foreign table "agg_csv"
DELETE FROM agg_csv WHERE a = 100; DELETE FROM agg_csv WHERE a = 100;
ERROR: cannot change foreign table "agg_csv" ERROR: cannot change foreign table "agg_csv"
SELECT * FROM agg_csv FOR UPDATE OF agg_csv; SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
ERROR: SELECT FOR UPDATE/SHARE cannot be used with foreign table "agg_csv" ERROR: SELECT FOR UPDATE/SHARE/KEY UPDATE/KEY SHARE cannot be used with foreign table "agg_csv"
LINE 1: SELECT * FROM agg_csv FOR UPDATE OF agg_csv; LINE 1: SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
^ ^
-- but this should be ignored -- but this should be ignored
......
...@@ -163,7 +163,7 @@ heap_page_items(PG_FUNCTION_ARGS) ...@@ -163,7 +163,7 @@ heap_page_items(PG_FUNCTION_ARGS)
tuphdr = (HeapTupleHeader) PageGetItem(page, id); tuphdr = (HeapTupleHeader) PageGetItem(page, id);
values[4] = UInt32GetDatum(HeapTupleHeaderGetXmin(tuphdr)); values[4] = UInt32GetDatum(HeapTupleHeaderGetXmin(tuphdr));
values[5] = UInt32GetDatum(HeapTupleHeaderGetXmax(tuphdr)); values[5] = UInt32GetDatum(HeapTupleHeaderGetRawXmax(tuphdr));
values[6] = UInt32GetDatum(HeapTupleHeaderGetRawCommandId(tuphdr)); /* shared with xvac */ values[6] = UInt32GetDatum(HeapTupleHeaderGetRawCommandId(tuphdr)); /* shared with xvac */
values[7] = PointerGetDatum(&tuphdr->t_ctid); values[7] = PointerGetDatum(&tuphdr->t_ctid);
values[8] = UInt32GetDatum(tuphdr->t_infomask2); values[8] = UInt32GetDatum(tuphdr->t_infomask2);
......
...@@ -40,6 +40,9 @@ get_control_data(ClusterInfo *cluster, bool live_check) ...@@ -40,6 +40,9 @@ get_control_data(ClusterInfo *cluster, bool live_check)
bool got_xid = false; bool got_xid = false;
bool got_oid = false; bool got_oid = false;
bool got_nextxlogfile = false; bool got_nextxlogfile = false;
bool got_multi = false;
bool got_mxoff = false;
bool got_oldestmulti = false;
bool got_log_id = false; bool got_log_id = false;
bool got_log_seg = false; bool got_log_seg = false;
bool got_tli = false; bool got_tli = false;
...@@ -246,6 +249,39 @@ get_control_data(ClusterInfo *cluster, bool live_check) ...@@ -246,6 +249,39 @@ get_control_data(ClusterInfo *cluster, bool live_check)
cluster->controldata.chkpnt_nxtoid = str2uint(p); cluster->controldata.chkpnt_nxtoid = str2uint(p);
got_oid = true; got_oid = true;
} }
else if ((p = strstr(bufin, "Latest checkpoint's NextMultiXactId:")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
p++; /* removing ':' char */
cluster->controldata.chkpnt_nxtmulti = str2uint(p);
got_multi = true;
}
else if ((p = strstr(bufin, "Latest checkpoint's oldestMultiXid:")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
p++; /* removing ':' char */
cluster->controldata.chkpnt_oldstMulti = str2uint(p);
got_oldestmulti = true;
}
else if ((p = strstr(bufin, "Latest checkpoint's NextMultiOffset:")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
p++; /* removing ':' char */
cluster->controldata.chkpnt_nxtmxoff = str2uint(p);
got_mxoff = true;
}
else if ((p = strstr(bufin, "Maximum data alignment:")) != NULL) else if ((p = strstr(bufin, "Maximum data alignment:")) != NULL)
{ {
p = strchr(p, ':'); p = strchr(p, ':');
...@@ -433,6 +469,7 @@ get_control_data(ClusterInfo *cluster, bool live_check) ...@@ -433,6 +469,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
/* verify that we got all the mandatory pg_control data */ /* verify that we got all the mandatory pg_control data */
if (!got_xid || !got_oid || if (!got_xid || !got_oid ||
!got_multi || !got_mxoff || !got_oldestmulti ||
(!live_check && !got_nextxlogfile) || (!live_check && !got_nextxlogfile) ||
!got_tli || !got_tli ||
!got_align || !got_blocksz || !got_largesz || !got_walsz || !got_align || !got_blocksz || !got_largesz || !got_walsz ||
...@@ -448,6 +485,15 @@ get_control_data(ClusterInfo *cluster, bool live_check) ...@@ -448,6 +485,15 @@ get_control_data(ClusterInfo *cluster, bool live_check)
if (!got_oid) if (!got_oid)
pg_log(PG_REPORT, " latest checkpoint next OID\n"); pg_log(PG_REPORT, " latest checkpoint next OID\n");
if (!got_multi)
pg_log(PG_REPORT, " latest checkpoint next MultiXactId\n");
if (!got_mxoff)
pg_log(PG_REPORT, " latest checkpoint next MultiXactOffset\n");
if (!got_oldestmulti)
pg_log(PG_REPORT, " latest checkpoint oldest MultiXactId\n");
if (!live_check && !got_nextxlogfile) if (!live_check && !got_nextxlogfile)
pg_log(PG_REPORT, " first WAL segment after reset\n"); pg_log(PG_REPORT, " first WAL segment after reset\n");
......
...@@ -382,6 +382,52 @@ copy_clog_xlog_xid(void) ...@@ -382,6 +382,52 @@ copy_clog_xlog_xid(void)
new_cluster.pgdata); new_cluster.pgdata);
check_ok(); check_ok();
/*
* If both new and old are after the pg_multixact change commit, copy those
* files too. If the old server is before that change and the new server
* is after, then we don't copy anything but we need to reset pg_control so
* that the new server doesn't attempt to read multis older than the cutoff
* value.
*/
if (old_cluster.controldata.cat_ver >= MULTIXACT_FORMATCHANGE_CAT_VER &&
new_cluster.controldata.cat_ver >= MULTIXACT_FORMATCHANGE_CAT_VER)
{
copy_subdir_files("pg_multixact/offsets");
copy_subdir_files("pg_multixact/members");
prep_status("Setting next multixact ID and offset for new cluster");
/*
* we preserve all files and contents, so we must preserve both "next"
* counters here and the oldest multi present on system.
*/
exec_prog(UTILITY_LOG_FILE, NULL, true,
"\"%s/pg_resetxlog\" -O %u -m %u,%u \"%s\"",
new_cluster.bindir,
old_cluster.controldata.chkpnt_nxtmxoff,
old_cluster.controldata.chkpnt_nxtmulti,
old_cluster.controldata.chkpnt_oldstMulti,
new_cluster.pgdata);
check_ok();
}
else if (new_cluster.controldata.cat_ver >= MULTIXACT_FORMATCHANGE_CAT_VER)
{
prep_status("Setting oldest multixact ID on new cluster");
/*
* We don't preserve files in this case, but it's important that the
* oldest multi is set to the latest value used by the old system, so
* that multixact.c returns the empty set for multis that might be
* present on disk. We set next multi to the value following that; it
* might end up wrapped around (i.e. 0) if the old cluster had
* next=MaxMultiXactId, but multixact.c can cope with that just fine.
*/
exec_prog(UTILITY_LOG_FILE, NULL, true,
"\"%s/pg_resetxlog\" -m %u,%u \"%s\"",
new_cluster.bindir,
old_cluster.controldata.chkpnt_nxtmulti + 1,
old_cluster.controldata.chkpnt_nxtmulti,
new_cluster.pgdata);
check_ok();
}
/* now reset the wal archives in the new cluster */ /* now reset the wal archives in the new cluster */
prep_status("Resetting WAL archives"); prep_status("Resetting WAL archives");
exec_prog(UTILITY_LOG_FILE, NULL, true, exec_prog(UTILITY_LOG_FILE, NULL, true,
......
...@@ -108,6 +108,10 @@ extern char *output_files[]; ...@@ -108,6 +108,10 @@ extern char *output_files[];
*/ */
#define VISIBILITY_MAP_CRASHSAFE_CAT_VER 201107031 #define VISIBILITY_MAP_CRASHSAFE_CAT_VER 201107031
/*
* pg_multixact format changed in this catversion:
*/
#define MULTIXACT_FORMATCHANGE_CAT_VER 201301231
/* /*
* Each relation is represented by a relinfo structure. * Each relation is represented by a relinfo structure.
...@@ -182,6 +186,9 @@ typedef struct ...@@ -182,6 +186,9 @@ typedef struct
uint32 chkpnt_tli; uint32 chkpnt_tli;
uint32 chkpnt_nxtxid; uint32 chkpnt_nxtxid;
uint32 chkpnt_nxtoid; uint32 chkpnt_nxtoid;
uint32 chkpnt_nxtmulti;
uint32 chkpnt_nxtmxoff;
uint32 chkpnt_oldstMulti;
uint32 align; uint32 align;
uint32 blocksz; uint32 blocksz;
uint32 largesz; uint32 largesz;
......
...@@ -4,7 +4,7 @@ MODULE_big = pgrowlocks ...@@ -4,7 +4,7 @@ MODULE_big = pgrowlocks
OBJS = pgrowlocks.o OBJS = pgrowlocks.o
EXTENSION = pgrowlocks EXTENSION = pgrowlocks
DATA = pgrowlocks--1.0.sql pgrowlocks--unpackaged--1.0.sql DATA = pgrowlocks--1.1.sql pgrowlocks--1.0--1.1.sql pgrowlocks--unpackaged--1.0.sql
ifdef USE_PGXS ifdef USE_PGXS
PG_CONFIG = pg_config PG_CONFIG = pg_config
......
/* contrib/pgrowlocks/pgrowlocks--1.0--1.1.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pgrowlocks" to load this file. \quit
ALTER EXTENSION pgrowlocks DROP FUNCTION pgrowlocks(text);
DROP FUNCTION pgrowlocks(text);
CREATE FUNCTION pgrowlocks(IN relname text,
OUT locked_row TID, -- row TID
OUT locker XID, -- locking XID
OUT multi bool, -- multi XID?
OUT xids xid[], -- multi XIDs
OUT modes text[], -- multi XID statuses
OUT pids INTEGER[]) -- locker's process id
RETURNS SETOF record
AS 'MODULE_PATHNAME', 'pgrowlocks'
LANGUAGE C STRICT;
/* contrib/pgrowlocks/pgrowlocks--1.0.sql */ /* contrib/pgrowlocks/pgrowlocks--1.1.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION -- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pgrowlocks" to load this file. \quit \echo Use "CREATE EXTENSION pgrowlocks" to load this file. \quit
CREATE FUNCTION pgrowlocks(IN relname text, CREATE FUNCTION pgrowlocks(IN relname text,
OUT locked_row TID, -- row TID OUT locked_row TID, -- row TID
OUT lock_type TEXT, -- lock type
OUT locker XID, -- locking XID OUT locker XID, -- locking XID
OUT multi bool, -- multi XID? OUT multi bool, -- multi XID?
OUT xids xid[], -- multi XIDs OUT xids xid[], -- multi XIDs
OUT modes text[], -- multi XID statuses
OUT pids INTEGER[]) -- locker's process id OUT pids INTEGER[]) -- locker's process id
RETURNS SETOF record RETURNS SETOF record
AS 'MODULE_PATHNAME', 'pgrowlocks' AS 'MODULE_PATHNAME', 'pgrowlocks'
......
...@@ -59,6 +59,13 @@ typedef struct ...@@ -59,6 +59,13 @@ typedef struct
int ncolumns; int ncolumns;
} MyData; } MyData;
#define Atnum_tid 0
#define Atnum_xmax 1
#define Atnum_ismulti 2
#define Atnum_xids 3
#define Atnum_modes 4
#define Atnum_pids 5
Datum Datum
pgrowlocks(PG_FUNCTION_ARGS) pgrowlocks(PG_FUNCTION_ARGS)
{ {
...@@ -117,79 +124,146 @@ pgrowlocks(PG_FUNCTION_ARGS) ...@@ -117,79 +124,146 @@ pgrowlocks(PG_FUNCTION_ARGS)
/* scan the relation */ /* scan the relation */
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{ {
HTSU_Result htsu;
TransactionId xmax;
uint16 infomask;
/* must hold a buffer lock to call HeapTupleSatisfiesUpdate */ /* must hold a buffer lock to call HeapTupleSatisfiesUpdate */
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
if (HeapTupleSatisfiesUpdate(tuple->t_data, htsu = HeapTupleSatisfiesUpdate(tuple->t_data,
GetCurrentCommandId(false), GetCurrentCommandId(false),
scan->rs_cbuf) == HeapTupleBeingUpdated) scan->rs_cbuf);
xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);
infomask = tuple->t_data->t_infomask;
/*
* a tuple is locked if HTSU returns BeingUpdated, and if it returns
* MayBeUpdated but the Xmax is valid and pointing at us.
*/
if (htsu == HeapTupleBeingUpdated ||
(htsu == HeapTupleMayBeUpdated &&
!(infomask & HEAP_XMAX_INVALID) &&
!(infomask & HEAP_XMAX_IS_MULTI) &&
(xmax == GetCurrentTransactionIdIfAny())))
{ {
char **values; char **values;
int i;
values = (char **) palloc(mydata->ncolumns * sizeof(char *)); values = (char **) palloc(mydata->ncolumns * sizeof(char *));
i = 0; values[Atnum_tid] = (char *) DirectFunctionCall1(tidout,
values[i++] = (char *) DirectFunctionCall1(tidout, PointerGetDatum(&tuple->t_self)); PointerGetDatum(&tuple->t_self));
if (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK) values[Atnum_xmax] = palloc(NCHARS * sizeof(char));
values[i++] = pstrdup("Shared"); snprintf(values[Atnum_xmax], NCHARS, "%d", xmax);
else if (infomask & HEAP_XMAX_IS_MULTI)
values[i++] = pstrdup("Exclusive");
values[i] = palloc(NCHARS * sizeof(char));
snprintf(values[i++], NCHARS, "%d", HeapTupleHeaderGetXmax(tuple->t_data));
if (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)
{ {
TransactionId *xids; MultiXactMember *members;
int nxids; int nmembers;
int j; bool first = true;
int isValidXid = 0; /* any valid xid ever exists? */ bool allow_old;
values[i++] = pstrdup("true"); values[Atnum_ismulti] = pstrdup("true");
nxids = GetMultiXactIdMembers(HeapTupleHeaderGetXmax(tuple->t_data), &xids);
if (nxids == -1) allow_old = !(infomask & HEAP_LOCK_MASK) &&
(infomask & HEAP_XMAX_LOCK_ONLY);
nmembers = GetMultiXactIdMembers(xmax, &members, allow_old);
if (nmembers == -1)
{ {
elog(ERROR, "GetMultiXactIdMembers returns error"); values[Atnum_xids] = "{0}";
values[Atnum_modes] = "{transient upgrade status}";
values[Atnum_pids] = "{0}";
} }
else
{
int j;
values[i] = palloc(NCHARS * nxids); values[Atnum_xids] = palloc(NCHARS * nmembers);
values[i + 1] = palloc(NCHARS * nxids); values[Atnum_modes] = palloc(NCHARS * nmembers);
strcpy(values[i], "{"); values[Atnum_pids] = palloc(NCHARS * nmembers);
strcpy(values[i + 1], "{");
for (j = 0; j < nxids; j++) strcpy(values[Atnum_xids], "{");
{ strcpy(values[Atnum_modes], "{");
char buf[NCHARS]; strcpy(values[Atnum_pids], "{");
if (TransactionIdIsInProgress(xids[j])) for (j = 0; j < nmembers; j++)
{ {
if (isValidXid) char buf[NCHARS];
if (!first)
{ {
strcat(values[i], ","); strcat(values[Atnum_xids], ",");
strcat(values[i + 1], ","); strcat(values[Atnum_modes], ",");
strcat(values[Atnum_pids], ",");
} }
snprintf(buf, NCHARS, "%d", xids[j]); snprintf(buf, NCHARS, "%d", members[j].xid);
strcat(values[i], buf); strcat(values[Atnum_xids], buf);
snprintf(buf, NCHARS, "%d", BackendXidGetPid(xids[j])); switch (members[j].status)
strcat(values[i + 1], buf); {
case MultiXactStatusUpdate:
snprintf(buf, NCHARS, "Update");
break;
case MultiXactStatusNoKeyUpdate:
snprintf(buf, NCHARS, "No Key Update");
break;
case MultiXactStatusForUpdate:
snprintf(buf, NCHARS, "For Update");
break;
case MultiXactStatusForNoKeyUpdate:
snprintf(buf, NCHARS, "For No Key Update");
break;
case MultiXactStatusForShare:
snprintf(buf, NCHARS, "Share");
break;
case MultiXactStatusForKeyShare:
snprintf(buf, NCHARS, "Key Share");
break;
}
strcat(values[Atnum_modes], buf);
snprintf(buf, NCHARS, "%d",
BackendXidGetPid(members[j].xid));
strcat(values[Atnum_pids], buf);
isValidXid = 1; first = false;
} }
}
strcat(values[i], "}"); strcat(values[Atnum_xids], "}");
strcat(values[i + 1], "}"); strcat(values[Atnum_modes], "}");
i++; strcat(values[Atnum_pids], "}");
}
} }
else else
{ {
values[i++] = pstrdup("false"); values[Atnum_ismulti] = pstrdup("false");
values[i] = palloc(NCHARS * sizeof(char));
snprintf(values[i++], NCHARS, "{%d}", HeapTupleHeaderGetXmax(tuple->t_data)); values[Atnum_xids] = palloc(NCHARS * sizeof(char));
snprintf(values[Atnum_xids], NCHARS, "{%d}", xmax);
values[Atnum_modes] = palloc(NCHARS);
if (infomask & HEAP_XMAX_LOCK_ONLY)
{
if (HEAP_XMAX_IS_SHR_LOCKED(infomask))
snprintf(values[Atnum_modes], NCHARS, "{For Share}");
else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
snprintf(values[Atnum_modes], NCHARS, "{For Key Share}");
else if (HEAP_XMAX_IS_EXCL_LOCKED(infomask))
snprintf(values[Atnum_modes], NCHARS, "{For Update}");
else
/* neither keyshare nor exclusive bit it set */
snprintf(values[Atnum_modes], NCHARS,
"{transient upgrade status}");
}
else
{
if (tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED)
snprintf(values[Atnum_modes], NCHARS, "{Key Update}");
else
snprintf(values[Atnum_modes], NCHARS, "{Update}");
}
values[i] = palloc(NCHARS * sizeof(char)); values[Atnum_pids] = palloc(NCHARS * sizeof(char));
snprintf(values[i++], NCHARS, "{%d}", BackendXidGetPid(HeapTupleHeaderGetXmax(tuple->t_data))); snprintf(values[Atnum_pids], NCHARS, "{%d}",
BackendXidGetPid(xmax));
} }
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
...@@ -200,10 +274,10 @@ pgrowlocks(PG_FUNCTION_ARGS) ...@@ -200,10 +274,10 @@ pgrowlocks(PG_FUNCTION_ARGS)
/* make the tuple into a datum */ /* make the tuple into a datum */
result = HeapTupleGetDatum(tuple); result = HeapTupleGetDatum(tuple);
/* Clean up */ /*
for (i = 0; i < mydata->ncolumns; i++) * no need to pfree what we allocated; it's on a short-lived memory
pfree(values[i]); * context anyway
pfree(values); */
SRF_RETURN_NEXT(funcctx, result); SRF_RETURN_NEXT(funcctx, result);
} }
......
# pgrowlocks extension # pgrowlocks extension
comment = 'show row-level locking information' comment = 'show row-level locking information'
default_version = '1.0' default_version = '1.1'
module_pathname = '$libdir/pgrowlocks' module_pathname = '$libdir/pgrowlocks'
relocatable = true relocatable = true
...@@ -43,12 +43,6 @@ pgrowlocks(text) returns setof record ...@@ -43,12 +43,6 @@ pgrowlocks(text) returns setof record
<entry><type>tid</type></entry> <entry><type>tid</type></entry>
<entry>Tuple ID (TID) of locked row</entry> <entry>Tuple ID (TID) of locked row</entry>
</row> </row>
<row>
<entry><structfield>lock_type</structfield></entry>
<entry><type>text</type></entry>
<entry><literal>Shared</> for shared lock, or
<literal>Exclusive</> for exclusive lock</entry>
</row>
<row> <row>
<entry><structfield>locker</structfield></entry> <entry><structfield>locker</structfield></entry>
<entry><type>xid</type></entry> <entry><type>xid</type></entry>
...@@ -64,6 +58,15 @@ pgrowlocks(text) returns setof record ...@@ -64,6 +58,15 @@ pgrowlocks(text) returns setof record
<entry><type>xid[]</type></entry> <entry><type>xid[]</type></entry>
<entry>Transaction IDs of lockers (more than one if multitransaction)</entry> <entry>Transaction IDs of lockers (more than one if multitransaction)</entry>
</row> </row>
<row>
<entry><structfield>lock_type</structfield></entry>
<entry><type>text[]</type></entry>
<entry>Lock mode of lockers (more than one if multitransaction),
an array of <literal>Key Share</>, <literal>Share</>,
<literal>For No Key Update</>, <literal>No Key Update</>,
<literal>For Update</>, <literal>Update</>.</entry>
</row>
<row> <row>
<entry><structfield>pids</structfield></entry> <entry><structfield>pids</structfield></entry>
<entry><type>integer[]</type></entry> <entry><type>integer[]</type></entry>
......
This diff is collapsed.
...@@ -542,7 +542,7 @@ heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull) ...@@ -542,7 +542,7 @@ heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
result = TransactionIdGetDatum(HeapTupleHeaderGetXmin(tup->t_data)); result = TransactionIdGetDatum(HeapTupleHeaderGetXmin(tup->t_data));
break; break;
case MaxTransactionIdAttributeNumber: case MaxTransactionIdAttributeNumber:
result = TransactionIdGetDatum(HeapTupleHeaderGetXmax(tup->t_data)); result = TransactionIdGetDatum(HeapTupleHeaderGetRawXmax(tup->t_data));
break; break;
case MinCommandIdAttributeNumber: case MinCommandIdAttributeNumber:
case MaxCommandIdAttributeNumber: case MaxCommandIdAttributeNumber:
......
Locking tuples
--------------
Locking tuples is not as easy as locking tables or other database objects.
The problem is that transactions might want to lock large numbers of tuples at
any one time, so it's not possible to keep the locks objects in shared memory.
To work around this limitation, we use a two-level mechanism. The first level
is implemented by storing locking information in the tuple header: a tuple is
marked as locked by setting the current transaction's XID as its XMAX, and
setting additional infomask bits to distinguish this case from the more normal
case of having deleted the tuple. When multiple transactions concurrently
lock a tuple, a MultiXact is used; see below. This mechanism can accomodate
arbitrarily large numbers of tuples being locked simultaneously.
When it is necessary to wait for a tuple-level lock to be released, the basic
delay is provided by XactLockTableWait or MultiXactIdWait on the contents of
the tuple's XMAX. However, that mechanism will release all waiters
concurrently, so there would be a race condition as to which waiter gets the
tuple, potentially leading to indefinite starvation of some waiters. The
possibility of share-locking makes the problem much worse --- a steady stream
of share-lockers can easily block an exclusive locker forever. To provide
more reliable semantics about who gets a tuple-level lock first, we use the
standard lock manager, which implements the second level mentioned above. The
protocol for waiting for a tuple-level lock is really
LockTuple()
XactLockTableWait()
mark tuple as locked by me
UnlockTuple()
When there are multiple waiters, arbitration of who is to get the lock next
is provided by LockTuple(). However, at most one tuple-level lock will
be held or awaited per backend at any time, so we don't risk overflow
of the lock table. Note that incoming share-lockers are required to
do LockTuple as well, if there is any conflict, to ensure that they don't
starve out waiting exclusive-lockers. However, if there is not any active
conflict for a tuple, we don't incur any extra overhead.
We provide four levels of tuple locking strength: SELECT FOR KEY UPDATE is
super-exclusive locking (used to delete tuples and more generally to update
tuples modifying the values of the columns that make up the key of the tuple);
SELECT FOR UPDATE is a standards-compliant exclusive lock; SELECT FOR SHARE
implements shared locks; and finally SELECT FOR KEY SHARE is a super-weak mode
that does not conflict with exclusive mode, but conflicts with SELECT FOR KEY
UPDATE. This last mode implements a mode just strong enough to implement RI
checks, i.e. it ensures that tuples do not go away from under a check, without
blocking when some other transaction that want to update the tuple without
changing its key.
The conflict table is:
KEY UPDATE UPDATE SHARE KEY SHARE
KEY UPDATE conflict conflict conflict conflict
UPDATE conflict conflict conflict
SHARE conflict conflict
KEY SHARE conflict
When there is a single locker in a tuple, we can just store the locking info
in the tuple itself. We do this by storing the locker's Xid in XMAX, and
setting infomask bits specifying the locking strength. There is one exception
here: since infomask space is limited, we do not provide a separate bit
for SELECT FOR SHARE, so we have to use the extended info in a MultiXact in
that case. (The other cases, SELECT FOR UPDATE and SELECT FOR KEY SHARE, are
presumably more commonly used due to being the standards-mandated locking
mechanism, or heavily used by the RI code, so we want to provide fast paths
for those.)
MultiXacts
----------
A tuple header provides very limited space for storing information about tuple
locking and updates: there is room only for a single Xid and a small number of
infomask bits. Whenever we need to store more than one lock, we replace the
first locker's Xid with a new MultiXactId. Each MultiXact provides extended
locking data; it comprises an array of Xids plus some flags bits for each one.
The flags are currently used to store the locking strength of each member
transaction. (The flags also distinguish a pure locker from an updater.)
In earlier PostgreSQL releases, a MultiXact always meant that the tuple was
locked in shared mode by multiple transactions. This is no longer the case; a
MultiXact may contain an update or delete Xid. (Keep in mind that tuple locks
in a transaction do not conflict with other tuple locks in the same
transaction, so it's possible to have otherwise conflicting locks in a
MultiXact if they belong to the same transaction).
Note that each lock is attributed to the subtransaction that acquires it.
This means that a subtransaction that aborts is seen as though it releases the
locks it acquired; concurrent transactions can then proceed without having to
wait for the main transaction to finish. It also means that a subtransaction
can upgrade to a stronger lock level than an earlier transaction had, and if
the subxact aborts, the earlier, weaker lock is kept.
The possibility of having an update within a MultiXact means that they must
persist across crashes and restarts: a future reader of the tuple needs to
figure out whether the update committed or aborted. So we have a requirement
that pg_multixact needs to retain pages of its data until we're certain that
the MultiXacts in them are no longer of interest.
VACUUM is in charge of removing old MultiXacts at the time of tuple freezing.
This works in the same way that pg_clog segments are removed: we have a
pg_class column that stores the earliest multixact that could possibly be
stored in the table; the minimum of all such values is stored in a pg_database
column. VACUUM computes the minimum across all pg_database values, and
removes pg_multixact segments older than the minimum.
Infomask Bits
-------------
The following infomask bits are applicable:
- HEAP_XMAX_INVALID
Any tuple with this bit set does not have a valid value stored in XMAX.
- HEAP_XMAX_IS_MULTI
This bit is set if the tuple's Xmax is a MultiXactId (as opposed to a
regular TransactionId).
- HEAP_XMAX_LOCK_ONLY
This bit is set when the XMAX is a locker only; that is, if it's a
multixact, it does not contain an update among its members. It's set when
the XMAX is a plain Xid that locked the tuple, as well.
- HEAP_XMAX_KEYSHR_LOCK
- HEAP_XMAX_EXCL_LOCK
These bits indicate the strength of the lock acquired; they are useful when
the XMAX is not a MultiXactId. If it's a multi, the info is to be found in
the member flags. If HEAP_XMAX_IS_MULTI is not set and HEAP_XMAX_LOCK_ONLY
is set, then one of these *must* be set as well.
Note there is no infomask bit for a SELECT FOR SHARE lock. Also there is no
separate bit for a SELECT FOR KEY UPDATE lock; this is implemented by the
HEAP_KEYS_UPDATED bit.
- HEAP_KEYS_UPDATED
This bit lives in t_infomask2. If set, indicates that the XMAX updated
this tuple and changed the key values, or it deleted the tuple.
It's set regardless of whether the XMAX is a TransactionId or a MultiXactId.
We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
is set.
This diff is collapsed.
...@@ -463,7 +463,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum, ...@@ -463,7 +463,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
* that the page is reconsidered for pruning in future. * that the page is reconsidered for pruning in future.
*/ */
heap_prune_record_prunable(prstate, heap_prune_record_prunable(prstate,
HeapTupleHeaderGetXmax(htup)); HeapTupleHeaderGetUpdateXid(htup));
break; break;
case HEAPTUPLE_DELETE_IN_PROGRESS: case HEAPTUPLE_DELETE_IN_PROGRESS:
...@@ -473,7 +473,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum, ...@@ -473,7 +473,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
* that the page is reconsidered for pruning in future. * that the page is reconsidered for pruning in future.
*/ */
heap_prune_record_prunable(prstate, heap_prune_record_prunable(prstate,
HeapTupleHeaderGetXmax(htup)); HeapTupleHeaderGetUpdateXid(htup));
break; break;
case HEAPTUPLE_LIVE: case HEAPTUPLE_LIVE:
...@@ -521,7 +521,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum, ...@@ -521,7 +521,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
Assert(ItemPointerGetBlockNumber(&htup->t_ctid) == Assert(ItemPointerGetBlockNumber(&htup->t_ctid) ==
BufferGetBlockNumber(buffer)); BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(&htup->t_ctid); offnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
priorXmax = HeapTupleHeaderGetXmax(htup); priorXmax = HeapTupleHeaderGetUpdateXid(htup);
} }
/* /*
...@@ -746,7 +746,7 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets) ...@@ -746,7 +746,7 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
/* Set up to scan the HOT-chain */ /* Set up to scan the HOT-chain */
nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid); nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
priorXmax = HeapTupleHeaderGetXmax(htup); priorXmax = HeapTupleHeaderGetUpdateXid(htup);
} }
else else
{ {
...@@ -787,7 +787,7 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets) ...@@ -787,7 +787,7 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
break; break;
nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid); nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
priorXmax = HeapTupleHeaderGetXmax(htup); priorXmax = HeapTupleHeaderGetUpdateXid(htup);
} }
} }
} }
...@@ -111,6 +111,7 @@ ...@@ -111,6 +111,7 @@
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/tqual.h"
/* /*
...@@ -128,6 +129,8 @@ typedef struct RewriteStateData ...@@ -128,6 +129,8 @@ typedef struct RewriteStateData
* determine tuple visibility */ * determine tuple visibility */
TransactionId rs_freeze_xid;/* Xid that will be used as freeze cutoff TransactionId rs_freeze_xid;/* Xid that will be used as freeze cutoff
* point */ * point */
MultiXactId rs_freeze_multi;/* MultiXactId that will be used as freeze
* cutoff point for multixacts */
MemoryContext rs_cxt; /* for hash tables and entries and tuples in MemoryContext rs_cxt; /* for hash tables and entries and tuples in
* them */ * them */
HTAB *rs_unresolved_tups; /* unmatched A tuples */ HTAB *rs_unresolved_tups; /* unmatched A tuples */
...@@ -177,6 +180,7 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup); ...@@ -177,6 +180,7 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
* new_heap new, locked heap relation to insert tuples to * new_heap new, locked heap relation to insert tuples to
* oldest_xmin xid used by the caller to determine which tuples are dead * oldest_xmin xid used by the caller to determine which tuples are dead
* freeze_xid xid before which tuples will be frozen * freeze_xid xid before which tuples will be frozen
* freeze_multi multixact before which multis will be frozen
* use_wal should the inserts to the new heap be WAL-logged? * use_wal should the inserts to the new heap be WAL-logged?
* *
* Returns an opaque RewriteState, allocated in current memory context, * Returns an opaque RewriteState, allocated in current memory context,
...@@ -184,7 +188,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup); ...@@ -184,7 +188,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
*/ */
RewriteState RewriteState
begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin, begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
TransactionId freeze_xid, bool use_wal) TransactionId freeze_xid, MultiXactId freeze_multi,
bool use_wal)
{ {
RewriteState state; RewriteState state;
MemoryContext rw_cxt; MemoryContext rw_cxt;
...@@ -213,6 +218,7 @@ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin, ...@@ -213,6 +218,7 @@ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
state->rs_use_wal = use_wal; state->rs_use_wal = use_wal;
state->rs_oldest_xmin = oldest_xmin; state->rs_oldest_xmin = oldest_xmin;
state->rs_freeze_xid = freeze_xid; state->rs_freeze_xid = freeze_xid;
state->rs_freeze_multi = freeze_multi;
state->rs_cxt = rw_cxt; state->rs_cxt = rw_cxt;
/* Initialize hash tables used to track update chains */ /* Initialize hash tables used to track update chains */
...@@ -337,7 +343,8 @@ rewrite_heap_tuple(RewriteState state, ...@@ -337,7 +343,8 @@ rewrite_heap_tuple(RewriteState state,
* While we have our hands on the tuple, we may as well freeze any * While we have our hands on the tuple, we may as well freeze any
* very-old xmin or xmax, so that future VACUUM effort can be saved. * very-old xmin or xmax, so that future VACUUM effort can be saved.
*/ */
heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid); heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid,
state->rs_freeze_multi);
/* /*
* Invalid ctid means that ctid should point to the tuple itself. We'll * Invalid ctid means that ctid should point to the tuple itself. We'll
...@@ -348,15 +355,15 @@ rewrite_heap_tuple(RewriteState state, ...@@ -348,15 +355,15 @@ rewrite_heap_tuple(RewriteState state,
/* /*
* If the tuple has been updated, check the old-to-new mapping hash table. * If the tuple has been updated, check the old-to-new mapping hash table.
*/ */
if (!(old_tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
HEAP_IS_LOCKED)) && HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
!(ItemPointerEquals(&(old_tuple->t_self), !(ItemPointerEquals(&(old_tuple->t_self),
&(old_tuple->t_data->t_ctid)))) &(old_tuple->t_data->t_ctid))))
{ {
OldToNewMapping mapping; OldToNewMapping mapping;
memset(&hashkey, 0, sizeof(hashkey)); memset(&hashkey, 0, sizeof(hashkey));
hashkey.xmin = HeapTupleHeaderGetXmax(old_tuple->t_data); hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
hashkey.tid = old_tuple->t_data->t_ctid; hashkey.tid = old_tuple->t_data->t_ctid;
mapping = (OldToNewMapping) mapping = (OldToNewMapping)
......
...@@ -25,6 +25,21 @@ out_target(StringInfo buf, xl_heaptid *target) ...@@ -25,6 +25,21 @@ out_target(StringInfo buf, xl_heaptid *target)
ItemPointerGetOffsetNumber(&(target->tid))); ItemPointerGetOffsetNumber(&(target->tid)));
} }
static void
out_infobits(StringInfo buf, uint8 infobits)
{
if (infobits & XLHL_XMAX_IS_MULTI)
appendStringInfo(buf, "IS_MULTI ");
if (infobits & XLHL_XMAX_LOCK_ONLY)
appendStringInfo(buf, "LOCK_ONLY ");
if (infobits & XLHL_XMAX_EXCL_LOCK)
appendStringInfo(buf, "EXCL_LOCK ");
if (infobits & XLHL_XMAX_KEYSHR_LOCK)
appendStringInfo(buf, "KEYSHR_LOCK ");
if (infobits & XLHL_KEYS_UPDATED)
appendStringInfo(buf, "KEYS_UPDATED ");
}
void void
heap_desc(StringInfo buf, uint8 xl_info, char *rec) heap_desc(StringInfo buf, uint8 xl_info, char *rec)
{ {
...@@ -47,6 +62,8 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -47,6 +62,8 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "delete: "); appendStringInfo(buf, "delete: ");
out_target(buf, &(xlrec->target)); out_target(buf, &(xlrec->target));
appendStringInfoChar(buf, ' ');
out_infobits(buf, xlrec->infobits_set);
} }
else if (info == XLOG_HEAP_UPDATE) else if (info == XLOG_HEAP_UPDATE)
{ {
...@@ -57,9 +74,12 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -57,9 +74,12 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
else else
appendStringInfo(buf, "update: "); appendStringInfo(buf, "update: ");
out_target(buf, &(xlrec->target)); out_target(buf, &(xlrec->target));
appendStringInfo(buf, "; new %u/%u", appendStringInfo(buf, " xmax %u ", xlrec->old_xmax);
out_infobits(buf, xlrec->old_infobits_set);
appendStringInfo(buf, "; new tid %u/%u xmax %u",
ItemPointerGetBlockNumber(&(xlrec->newtid)), ItemPointerGetBlockNumber(&(xlrec->newtid)),
ItemPointerGetOffsetNumber(&(xlrec->newtid))); ItemPointerGetOffsetNumber(&(xlrec->newtid)),
xlrec->new_xmax);
} }
else if (info == XLOG_HEAP_HOT_UPDATE) else if (info == XLOG_HEAP_HOT_UPDATE)
{ {
...@@ -70,9 +90,12 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -70,9 +90,12 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
else else
appendStringInfo(buf, "hot_update: "); appendStringInfo(buf, "hot_update: ");
out_target(buf, &(xlrec->target)); out_target(buf, &(xlrec->target));
appendStringInfo(buf, "; new %u/%u", appendStringInfo(buf, " xmax %u ", xlrec->old_xmax);
out_infobits(buf, xlrec->old_infobits_set);
appendStringInfo(buf, "; new tid %u/%u xmax %u",
ItemPointerGetBlockNumber(&(xlrec->newtid)), ItemPointerGetBlockNumber(&(xlrec->newtid)),
ItemPointerGetOffsetNumber(&(xlrec->newtid))); ItemPointerGetOffsetNumber(&(xlrec->newtid)),
xlrec->new_xmax);
} }
else if (info == XLOG_HEAP_NEWPAGE) else if (info == XLOG_HEAP_NEWPAGE)
{ {
...@@ -87,16 +110,10 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -87,16 +110,10 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
{ {
xl_heap_lock *xlrec = (xl_heap_lock *) rec; xl_heap_lock *xlrec = (xl_heap_lock *) rec;
if (xlrec->shared_lock) appendStringInfo(buf, "lock %u: ", xlrec->locking_xid);
appendStringInfo(buf, "shared_lock: ");
else
appendStringInfo(buf, "exclusive_lock: ");
if (xlrec->xid_is_mxact)
appendStringInfo(buf, "mxid ");
else
appendStringInfo(buf, "xid ");
appendStringInfo(buf, "%u ", xlrec->locking_xid);
out_target(buf, &(xlrec->target)); out_target(buf, &(xlrec->target));
appendStringInfoChar(buf, ' ');
out_infobits(buf, xlrec->infobits_set);
} }
else if (info == XLOG_HEAP_INPLACE) else if (info == XLOG_HEAP_INPLACE)
{ {
...@@ -108,7 +125,6 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -108,7 +125,6 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
else else
appendStringInfo(buf, "UNKNOWN"); appendStringInfo(buf, "UNKNOWN");
} }
void void
heap2_desc(StringInfo buf, uint8 xl_info, char *rec) heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
{ {
...@@ -119,10 +135,10 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -119,10 +135,10 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
{ {
xl_heap_freeze *xlrec = (xl_heap_freeze *) rec; xl_heap_freeze *xlrec = (xl_heap_freeze *) rec;
appendStringInfo(buf, "freeze: rel %u/%u/%u; blk %u; cutoff %u", appendStringInfo(buf, "freeze: rel %u/%u/%u; blk %u; cutoff xid %u multi %u",
xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->block, xlrec->node.relNode, xlrec->block,
xlrec->cutoff_xid); xlrec->cutoff_xid, xlrec->cutoff_multi);
} }
else if (info == XLOG_HEAP2_CLEAN) else if (info == XLOG_HEAP2_CLEAN)
{ {
...@@ -160,6 +176,14 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -160,6 +176,14 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
xlrec->blkno, xlrec->ntuples); xlrec->blkno, xlrec->ntuples);
} }
else if (info == XLOG_HEAP2_LOCK_UPDATED)
{
xl_heap_lock_updated *xlrec = (xl_heap_lock_updated *) rec;
appendStringInfo(buf, "lock updated: xmax %u msk %04x; ", xlrec->xmax,
xlrec->infobits_set);
out_target(buf, &(xlrec->target));
}
else else
appendStringInfo(buf, "UNKNOWN"); appendStringInfo(buf, "UNKNOWN");
} }
...@@ -16,6 +16,35 @@ ...@@ -16,6 +16,35 @@
#include "access/multixact.h" #include "access/multixact.h"
static void
out_member(StringInfo buf, MultiXactMember *member)
{
appendStringInfo(buf, "%u ", member->xid);
switch (member->status)
{
case MultiXactStatusForKeyShare:
appendStringInfoString(buf, "(keysh) ");
break;
case MultiXactStatusForShare:
appendStringInfoString(buf, "(sh) ");
break;
case MultiXactStatusForNoKeyUpdate:
appendStringInfoString(buf, "(fornokeyupd) ");
break;
case MultiXactStatusForUpdate:
appendStringInfoString(buf, "(forupd) ");
break;
case MultiXactStatusNoKeyUpdate:
appendStringInfoString(buf, "(nokeyupd) ");
break;
case MultiXactStatusUpdate:
appendStringInfoString(buf, "(upd) ");
break;
default:
appendStringInfoString(buf, "(unk) ");
break;
}
}
void void
multixact_desc(StringInfo buf, uint8 xl_info, char *rec) multixact_desc(StringInfo buf, uint8 xl_info, char *rec)
...@@ -41,10 +70,10 @@ multixact_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -41,10 +70,10 @@ multixact_desc(StringInfo buf, uint8 xl_info, char *rec)
xl_multixact_create *xlrec = (xl_multixact_create *) rec; xl_multixact_create *xlrec = (xl_multixact_create *) rec;
int i; int i;
appendStringInfo(buf, "create multixact %u offset %u:", appendStringInfo(buf, "create mxid %u offset %u nmembers %d: ", xlrec->mid,
xlrec->mid, xlrec->moff); xlrec->moff, xlrec->nmembers);
for (i = 0; i < xlrec->nxids; i++) for (i = 0; i < xlrec->nmembers; i++)
appendStringInfo(buf, " %u", xlrec->xids[i]); out_member(buf, &xlrec->members[i]);
} }
else else
appendStringInfo(buf, "UNKNOWN"); appendStringInfo(buf, "UNKNOWN");
......
...@@ -41,7 +41,8 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -41,7 +41,8 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "checkpoint: redo %X/%X; " appendStringInfo(buf, "checkpoint: redo %X/%X; "
"tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; " "tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
"oldest xid %u in DB %u; oldest running xid %u; %s", "oldest xid %u in DB %u; oldest multi %u in DB %u; "
"oldest running xid %u; %s",
(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo, (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
checkpoint->ThisTimeLineID, checkpoint->ThisTimeLineID,
checkpoint->fullPageWrites ? "true" : "false", checkpoint->fullPageWrites ? "true" : "false",
...@@ -51,6 +52,8 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -51,6 +52,8 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
checkpoint->nextMultiOffset, checkpoint->nextMultiOffset,
checkpoint->oldestXid, checkpoint->oldestXid,
checkpoint->oldestXidDB, checkpoint->oldestXidDB,
checkpoint->oldestMulti,
checkpoint->oldestMultiDB,
checkpoint->oldestActiveXid, checkpoint->oldestActiveXid,
(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online"); (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
} }
......
...@@ -791,10 +791,10 @@ parent transaction to complete. ...@@ -791,10 +791,10 @@ parent transaction to complete.
Not all transactional behaviour is emulated, for example we do not insert Not all transactional behaviour is emulated, for example we do not insert
a transaction entry into the lock table, nor do we maintain the transaction a transaction entry into the lock table, nor do we maintain the transaction
stack in memory. Clog entries are made normally. Multitrans is not maintained stack in memory. Clog entries are made normally. Multixact is not maintained
because its purpose is to record tuple level locks that an application has because its purpose is to record tuple level locks that an application has
requested to prevent write locks. Since write locks cannot be obtained at all, requested to prevent other tuple locks. Since tuple locks cannot be obtained at
there is never any conflict and so there is no reason to update multitrans. all, there is never any conflict and so there is no reason to update multixact.
Subtrans is maintained during recovery but the details of the transaction Subtrans is maintained during recovery but the details of the transaction
tree are ignored and all subtransactions reference the top-level TransactionId tree are ignored and all subtransactions reference the top-level TransactionId
directly. Since commit is atomic this provides correct lock wait behaviour directly. Since commit is atomic this provides correct lock wait behaviour
......
This diff is collapsed.
...@@ -75,6 +75,8 @@ GetNewTransactionId(bool isSubXact) ...@@ -75,6 +75,8 @@ GetNewTransactionId(bool isSubXact)
* If we're past xidStopLimit, refuse to execute transactions, unless * If we're past xidStopLimit, refuse to execute transactions, unless
* we are running in a standalone backend (which gives an escape hatch * we are running in a standalone backend (which gives an escape hatch
* to the DBA who somehow got past the earlier defenses). * to the DBA who somehow got past the earlier defenses).
*
* Note that this coding also appears in GetNewMultiXactId.
*---------- *----------
*/ */
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit)) if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
......
...@@ -3899,6 +3899,8 @@ BootStrapXLOG(void) ...@@ -3899,6 +3899,8 @@ BootStrapXLOG(void)
checkPoint.nextMultiOffset = 0; checkPoint.nextMultiOffset = 0;
checkPoint.oldestXid = FirstNormalTransactionId; checkPoint.oldestXid = FirstNormalTransactionId;
checkPoint.oldestXidDB = TemplateDbOid; checkPoint.oldestXidDB = TemplateDbOid;
checkPoint.oldestMulti = FirstMultiXactId;
checkPoint.oldestMultiDB = TemplateDbOid;
checkPoint.time = (pg_time_t) time(NULL); checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId; checkPoint.oldestActiveXid = InvalidTransactionId;
...@@ -3907,6 +3909,7 @@ BootStrapXLOG(void) ...@@ -3907,6 +3909,7 @@ BootStrapXLOG(void)
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
/* Set up the XLOG page header */ /* Set up the XLOG page header */
page->xlp_magic = XLOG_PAGE_MAGIC; page->xlp_magic = XLOG_PAGE_MAGIC;
...@@ -4979,6 +4982,9 @@ StartupXLOG(void) ...@@ -4979,6 +4982,9 @@ StartupXLOG(void)
ereport(DEBUG1, ereport(DEBUG1,
(errmsg("oldest unfrozen transaction ID: %u, in database %u", (errmsg("oldest unfrozen transaction ID: %u, in database %u",
checkPoint.oldestXid, checkPoint.oldestXidDB))); checkPoint.oldestXid, checkPoint.oldestXidDB)));
ereport(DEBUG1,
(errmsg("oldest MultiXactId: %u, in database %u",
checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
if (!TransactionIdIsNormal(checkPoint.nextXid)) if (!TransactionIdIsNormal(checkPoint.nextXid))
ereport(PANIC, ereport(PANIC,
(errmsg("invalid next transaction ID"))); (errmsg("invalid next transaction ID")));
...@@ -4989,6 +4995,7 @@ StartupXLOG(void) ...@@ -4989,6 +4995,7 @@ StartupXLOG(void)
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid; XLogCtl->ckptXid = checkPoint.nextXid;
...@@ -6724,7 +6731,9 @@ CreateCheckPoint(int flags) ...@@ -6724,7 +6731,9 @@ CreateCheckPoint(int flags)
MultiXactGetCheckptMulti(shutdown, MultiXactGetCheckptMulti(shutdown,
&checkPoint.nextMulti, &checkPoint.nextMulti,
&checkPoint.nextMultiOffset); &checkPoint.nextMultiOffset,
&checkPoint.oldestMulti,
&checkPoint.oldestMultiDB);
/* /*
* Having constructed the checkpoint record, ensure all shmem disk buffers * Having constructed the checkpoint record, ensure all shmem disk buffers
...@@ -7479,6 +7488,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -7479,6 +7488,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
MultiXactSetNextMXact(checkPoint.nextMulti, MultiXactSetNextMXact(checkPoint.nextMulti,
checkPoint.nextMultiOffset); checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
/* /*
* If we see a shutdown checkpoint while waiting for an end-of-backup * If we see a shutdown checkpoint while waiting for an end-of-backup
...@@ -7577,6 +7587,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -7577,6 +7587,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
checkPoint.oldestXid)) checkPoint.oldestXid))
SetTransactionIdLimit(checkPoint.oldestXid, SetTransactionIdLimit(checkPoint.oldestXid,
checkPoint.oldestXidDB); checkPoint.oldestXidDB);
MultiXactAdvanceOldest(checkPoint.oldestMulti,
checkPoint.oldestMultiDB);
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */ /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch; ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "postgres.h" #include "postgres.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/multixact.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/transam.h" #include "access/transam.h"
#include "access/xact.h" #include "access/xact.h"
...@@ -779,6 +780,7 @@ InsertPgClassTuple(Relation pg_class_desc, ...@@ -779,6 +780,7 @@ InsertPgClassTuple(Relation pg_class_desc,
values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers); values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass); values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid); values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
if (relacl != (Datum) 0) if (relacl != (Datum) 0)
values[Anum_pg_class_relacl - 1] = relacl; values[Anum_pg_class_relacl - 1] = relacl;
else else
...@@ -854,7 +856,7 @@ AddNewRelationTuple(Relation pg_class_desc, ...@@ -854,7 +856,7 @@ AddNewRelationTuple(Relation pg_class_desc,
break; break;
} }
/* Initialize relfrozenxid */ /* Initialize relfrozenxid and relminmxid */
if (relkind == RELKIND_RELATION || if (relkind == RELKIND_RELATION ||
relkind == RELKIND_TOASTVALUE) relkind == RELKIND_TOASTVALUE)
{ {
...@@ -864,6 +866,15 @@ AddNewRelationTuple(Relation pg_class_desc, ...@@ -864,6 +866,15 @@ AddNewRelationTuple(Relation pg_class_desc,
* that will do. * that will do.
*/ */
new_rel_reltup->relfrozenxid = RecentXmin; new_rel_reltup->relfrozenxid = RecentXmin;
/*
* Similarly, initialize the minimum Multixact to the first value that
* could possibly be stored in tuples in the table. Running
* transactions could reuse values from their local cache, so we are
* careful to consider all currently running multis.
*
* XXX this could be refined further, but is it worth the hassle?
*/
new_rel_reltup->relminmxid = GetOldestMultiXactId();
} }
else else
{ {
...@@ -874,6 +885,7 @@ AddNewRelationTuple(Relation pg_class_desc, ...@@ -874,6 +885,7 @@ AddNewRelationTuple(Relation pg_class_desc,
* commands/sequence.c.) * commands/sequence.c.)
*/ */
new_rel_reltup->relfrozenxid = InvalidTransactionId; new_rel_reltup->relfrozenxid = InvalidTransactionId;
new_rel_reltup->relfrozenxid = InvalidMultiXactId;
} }
new_rel_reltup->relowner = relowner; new_rel_reltup->relowner = relowner;
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <unistd.h> #include <unistd.h>
#include "access/multixact.h"
#include "access/relscan.h" #include "access/relscan.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/transam.h" #include "access/transam.h"
...@@ -2353,8 +2354,7 @@ IndexBuildHeapScan(Relation heapRelation, ...@@ -2353,8 +2354,7 @@ IndexBuildHeapScan(Relation heapRelation,
* As with INSERT_IN_PROGRESS case, this is unexpected * As with INSERT_IN_PROGRESS case, this is unexpected
* unless it's our own deletion or a system catalog. * unless it's our own deletion or a system catalog.
*/ */
Assert(!(heapTuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)); xwait = HeapTupleHeaderGetUpdateXid(heapTuple->t_data);
xwait = HeapTupleHeaderGetXmax(heapTuple->t_data);
if (!TransactionIdIsCurrentTransactionId(xwait)) if (!TransactionIdIsCurrentTransactionId(xwait))
{ {
if (!is_system_catalog) if (!is_system_catalog)
...@@ -3184,7 +3184,8 @@ reindex_index(Oid indexId, bool skip_constraint_checks) ...@@ -3184,7 +3184,8 @@ reindex_index(Oid indexId, bool skip_constraint_checks)
} }
/* We'll build a new physical relation for the index */ /* We'll build a new physical relation for the index */
RelationSetNewRelfilenode(iRel, InvalidTransactionId); RelationSetNewRelfilenode(iRel, InvalidTransactionId,
InvalidMultiXactId);
/* Initialize the index and rebuild */ /* Initialize the index and rebuild */
/* Note: we do not need to re-establish pkey setting */ /* Note: we do not need to re-establish pkey setting */
...@@ -3364,7 +3365,7 @@ reindex_relation(Oid relid, int flags) ...@@ -3364,7 +3365,7 @@ reindex_relation(Oid relid, int flags)
/* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */ /* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */
if (is_pg_class) if (is_pg_class)
(void) RelationGetIndexAttrBitmap(rel); (void) RelationGetIndexAttrBitmap(rel, false);
PG_TRY(); PG_TRY();
{ {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <math.h> #include <math.h>
#include "access/multixact.h"
#include "access/transam.h" #include "access/transam.h"
#include "access/tupconvert.h" #include "access/tupconvert.h"
#include "access/tuptoaster.h" #include "access/tuptoaster.h"
...@@ -580,7 +581,8 @@ do_analyze_rel(Relation onerel, VacuumStmt *vacstmt, ...@@ -580,7 +581,8 @@ do_analyze_rel(Relation onerel, VacuumStmt *vacstmt,
totalrows, totalrows,
visibilitymap_count(onerel), visibilitymap_count(onerel),
hasindex, hasindex,
InvalidTransactionId); InvalidTransactionId,
InvalidMultiXactId);
/* /*
* Same for indexes. Vacuum always scans all indexes, so if we're part of * Same for indexes. Vacuum always scans all indexes, so if we're part of
...@@ -600,7 +602,8 @@ do_analyze_rel(Relation onerel, VacuumStmt *vacstmt, ...@@ -600,7 +602,8 @@ do_analyze_rel(Relation onerel, VacuumStmt *vacstmt,
totalindexrows, totalindexrows,
0, 0,
false, false,
InvalidTransactionId); InvalidTransactionId,
InvalidMultiXactId);
} }
} }
...@@ -1193,7 +1196,7 @@ acquire_sample_rows(Relation onerel, int elevel, ...@@ -1193,7 +1196,7 @@ acquire_sample_rows(Relation onerel, int elevel,
* right. (Note: this works out properly when the row was * right. (Note: this works out properly when the row was
* both inserted and deleted in our xact.) * both inserted and deleted in our xact.)
*/ */
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(targtuple.t_data))) if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(targtuple.t_data)))
deadrows += 1; deadrows += 1;
else else
liverows += 1; liverows += 1;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "access/multixact.h"
#include "access/relscan.h" #include "access/relscan.h"
#include "access/rewriteheap.h" #include "access/rewriteheap.h"
#include "access/transam.h" #include "access/transam.h"
...@@ -65,7 +66,8 @@ static void rebuild_relation(Relation OldHeap, Oid indexOid, ...@@ -65,7 +66,8 @@ static void rebuild_relation(Relation OldHeap, Oid indexOid,
int freeze_min_age, int freeze_table_age, bool verbose); int freeze_min_age, int freeze_table_age, bool verbose);
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
int freeze_min_age, int freeze_table_age, bool verbose, int freeze_min_age, int freeze_table_age, bool verbose,
bool *pSwapToastByContent, TransactionId *pFreezeXid); bool *pSwapToastByContent, TransactionId *pFreezeXid,
MultiXactId *pFreezeMulti);
static List *get_tables_to_cluster(MemoryContext cluster_context); static List *get_tables_to_cluster(MemoryContext cluster_context);
static void reform_and_rewrite_tuple(HeapTuple tuple, static void reform_and_rewrite_tuple(HeapTuple tuple,
TupleDesc oldTupDesc, TupleDesc newTupDesc, TupleDesc oldTupDesc, TupleDesc newTupDesc,
...@@ -549,6 +551,7 @@ rebuild_relation(Relation OldHeap, Oid indexOid, ...@@ -549,6 +551,7 @@ rebuild_relation(Relation OldHeap, Oid indexOid,
bool is_system_catalog; bool is_system_catalog;
bool swap_toast_by_content; bool swap_toast_by_content;
TransactionId frozenXid; TransactionId frozenXid;
MultiXactId frozenMulti;
/* Mark the correct index as clustered */ /* Mark the correct index as clustered */
if (OidIsValid(indexOid)) if (OidIsValid(indexOid))
...@@ -566,14 +569,14 @@ rebuild_relation(Relation OldHeap, Oid indexOid, ...@@ -566,14 +569,14 @@ rebuild_relation(Relation OldHeap, Oid indexOid,
/* Copy the heap data into the new table in the desired order */ /* Copy the heap data into the new table in the desired order */
copy_heap_data(OIDNewHeap, tableOid, indexOid, copy_heap_data(OIDNewHeap, tableOid, indexOid,
freeze_min_age, freeze_table_age, verbose, freeze_min_age, freeze_table_age, verbose,
&swap_toast_by_content, &frozenXid); &swap_toast_by_content, &frozenXid, &frozenMulti);
/* /*
* Swap the physical files of the target and transient tables, then * Swap the physical files of the target and transient tables, then
* rebuild the target's indexes and throw away the transient table. * rebuild the target's indexes and throw away the transient table.
*/ */
finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
swap_toast_by_content, false, frozenXid); swap_toast_by_content, false, frozenXid, frozenMulti);
} }
...@@ -706,7 +709,8 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) ...@@ -706,7 +709,8 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
static void static void
copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
int freeze_min_age, int freeze_table_age, bool verbose, int freeze_min_age, int freeze_table_age, bool verbose,
bool *pSwapToastByContent, TransactionId *pFreezeXid) bool *pSwapToastByContent, TransactionId *pFreezeXid,
MultiXactId *pFreezeMulti)
{ {
Relation NewHeap, Relation NewHeap,
OldHeap, OldHeap,
...@@ -722,6 +726,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, ...@@ -722,6 +726,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
bool is_system_catalog; bool is_system_catalog;
TransactionId OldestXmin; TransactionId OldestXmin;
TransactionId FreezeXid; TransactionId FreezeXid;
MultiXactId MultiXactFrzLimit;
RewriteState rwstate; RewriteState rwstate;
bool use_sort; bool use_sort;
Tuplesortstate *tuplesort; Tuplesortstate *tuplesort;
...@@ -822,7 +827,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, ...@@ -822,7 +827,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
*/ */
vacuum_set_xid_limits(freeze_min_age, freeze_table_age, vacuum_set_xid_limits(freeze_min_age, freeze_table_age,
OldHeap->rd_rel->relisshared, OldHeap->rd_rel->relisshared,
&OldestXmin, &FreezeXid, NULL); &OldestXmin, &FreezeXid, NULL, &MultiXactFrzLimit);
/* /*
* FreezeXid will become the table's new relfrozenxid, and that mustn't go * FreezeXid will become the table's new relfrozenxid, and that mustn't go
...@@ -831,14 +836,16 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, ...@@ -831,14 +836,16 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
if (TransactionIdPrecedes(FreezeXid, OldHeap->rd_rel->relfrozenxid)) if (TransactionIdPrecedes(FreezeXid, OldHeap->rd_rel->relfrozenxid))
FreezeXid = OldHeap->rd_rel->relfrozenxid; FreezeXid = OldHeap->rd_rel->relfrozenxid;
/* return selected value to caller */ /* return selected values to caller */
*pFreezeXid = FreezeXid; *pFreezeXid = FreezeXid;
*pFreezeMulti = MultiXactFrzLimit;
/* Remember if it's a system catalog */ /* Remember if it's a system catalog */
is_system_catalog = IsSystemRelation(OldHeap); is_system_catalog = IsSystemRelation(OldHeap);
/* Initialize the rewrite operation */ /* Initialize the rewrite operation */
rwstate = begin_heap_rewrite(NewHeap, OldestXmin, FreezeXid, use_wal); rwstate = begin_heap_rewrite(NewHeap, OldestXmin, FreezeXid,
MultiXactFrzLimit, use_wal);
/* /*
* Decide whether to use an indexscan or seqscan-and-optional-sort to scan * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
...@@ -966,9 +973,8 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, ...@@ -966,9 +973,8 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
/* /*
* Similar situation to INSERT_IN_PROGRESS case. * Similar situation to INSERT_IN_PROGRESS case.
*/ */
Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
if (!is_system_catalog && if (!is_system_catalog &&
!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(tuple->t_data))) !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
elog(WARNING, "concurrent delete in progress within table \"%s\"", elog(WARNING, "concurrent delete in progress within table \"%s\"",
RelationGetRelationName(OldHeap)); RelationGetRelationName(OldHeap));
/* treat as recently dead */ /* treat as recently dead */
...@@ -1097,6 +1103,7 @@ static void ...@@ -1097,6 +1103,7 @@ static void
swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
bool swap_toast_by_content, bool swap_toast_by_content,
TransactionId frozenXid, TransactionId frozenXid,
MultiXactId frozenMulti,
Oid *mapped_tables) Oid *mapped_tables)
{ {
Relation relRelation; Relation relRelation;
...@@ -1204,11 +1211,13 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, ...@@ -1204,11 +1211,13 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
* and then fail to commit the pg_class update. * and then fail to commit the pg_class update.
*/ */
/* set rel1's frozen Xid */ /* set rel1's frozen Xid and minimum MultiXid */
if (relform1->relkind != RELKIND_INDEX) if (relform1->relkind != RELKIND_INDEX)
{ {
Assert(TransactionIdIsNormal(frozenXid)); Assert(TransactionIdIsNormal(frozenXid));
relform1->relfrozenxid = frozenXid; relform1->relfrozenxid = frozenXid;
Assert(MultiXactIdIsValid(frozenMulti));
relform1->relminmxid = frozenMulti;
} }
/* swap size statistics too, since new rel has freshly-updated stats */ /* swap size statistics too, since new rel has freshly-updated stats */
...@@ -1272,6 +1281,7 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, ...@@ -1272,6 +1281,7 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
target_is_pg_class, target_is_pg_class,
swap_toast_by_content, swap_toast_by_content,
frozenXid, frozenXid,
frozenMulti,
mapped_tables); mapped_tables);
} }
else else
...@@ -1361,6 +1371,7 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, ...@@ -1361,6 +1371,7 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
target_is_pg_class, target_is_pg_class,
swap_toast_by_content, swap_toast_by_content,
InvalidTransactionId, InvalidTransactionId,
InvalidMultiXactId,
mapped_tables); mapped_tables);
/* Clean up. */ /* Clean up. */
...@@ -1398,7 +1409,8 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, ...@@ -1398,7 +1409,8 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
bool is_system_catalog, bool is_system_catalog,
bool swap_toast_by_content, bool swap_toast_by_content,
bool check_constraints, bool check_constraints,
TransactionId frozenXid) TransactionId frozenXid,
MultiXactId frozenMulti)
{ {
ObjectAddress object; ObjectAddress object;
Oid mapped_tables[4]; Oid mapped_tables[4];
...@@ -1414,7 +1426,8 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, ...@@ -1414,7 +1426,8 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
*/ */
swap_relation_files(OIDOldHeap, OIDNewHeap, swap_relation_files(OIDOldHeap, OIDNewHeap,
(OIDOldHeap == RelationRelationId), (OIDOldHeap == RelationRelationId),
swap_toast_by_content, frozenXid, mapped_tables); swap_toast_by_content, frozenXid, frozenMulti,
mapped_tables);
/* /*
* If it's a system catalog, queue an sinval message to flush all * If it's a system catalog, queue an sinval message to flush all
......
...@@ -80,6 +80,7 @@ static bool get_db_info(const char *name, LOCKMODE lockmode, ...@@ -80,6 +80,7 @@ static bool get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP, Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
Oid *dbLastSysOidP, TransactionId *dbFrozenXidP, Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
MultiXactId *dbMinMultiP,
Oid *dbTablespace, char **dbCollate, char **dbCtype); Oid *dbTablespace, char **dbCollate, char **dbCtype);
static bool have_createdb_privilege(void); static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id); static void remove_dbtablespaces(Oid db_id);
...@@ -104,6 +105,7 @@ createdb(const CreatedbStmt *stmt) ...@@ -104,6 +105,7 @@ createdb(const CreatedbStmt *stmt)
bool src_allowconn; bool src_allowconn;
Oid src_lastsysoid; Oid src_lastsysoid;
TransactionId src_frozenxid; TransactionId src_frozenxid;
MultiXactId src_minmxid;
Oid src_deftablespace; Oid src_deftablespace;
volatile Oid dst_deftablespace; volatile Oid dst_deftablespace;
Relation pg_database_rel; Relation pg_database_rel;
...@@ -288,7 +290,7 @@ createdb(const CreatedbStmt *stmt) ...@@ -288,7 +290,7 @@ createdb(const CreatedbStmt *stmt)
if (!get_db_info(dbtemplate, ShareLock, if (!get_db_info(dbtemplate, ShareLock,
&src_dboid, &src_owner, &src_encoding, &src_dboid, &src_owner, &src_encoding,
&src_istemplate, &src_allowconn, &src_lastsysoid, &src_istemplate, &src_allowconn, &src_lastsysoid,
&src_frozenxid, &src_deftablespace, &src_frozenxid, &src_minmxid, &src_deftablespace,
&src_collate, &src_ctype)) &src_collate, &src_ctype))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
...@@ -491,6 +493,7 @@ createdb(const CreatedbStmt *stmt) ...@@ -491,6 +493,7 @@ createdb(const CreatedbStmt *stmt)
new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
/* /*
...@@ -786,7 +789,7 @@ dropdb(const char *dbname, bool missing_ok) ...@@ -786,7 +789,7 @@ dropdb(const char *dbname, bool missing_ok)
pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock); pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
&db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL)) &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
{ {
if (!missing_ok) if (!missing_ok)
{ {
...@@ -945,7 +948,7 @@ RenameDatabase(const char *oldname, const char *newname) ...@@ -945,7 +948,7 @@ RenameDatabase(const char *oldname, const char *newname)
rel = heap_open(DatabaseRelationId, RowExclusiveLock); rel = heap_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL, NULL)) NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", oldname))); errmsg("database \"%s\" does not exist", oldname)));
...@@ -1046,7 +1049,7 @@ movedb(const char *dbname, const char *tblspcname) ...@@ -1046,7 +1049,7 @@ movedb(const char *dbname, const char *tblspcname)
pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock); pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL)) NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE), (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname))); errmsg("database \"%s\" does not exist", dbname)));
...@@ -1599,6 +1602,7 @@ get_db_info(const char *name, LOCKMODE lockmode, ...@@ -1599,6 +1602,7 @@ get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP, Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
Oid *dbLastSysOidP, TransactionId *dbFrozenXidP, Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
MultiXactId *dbMinMultiP,
Oid *dbTablespace, char **dbCollate, char **dbCtype) Oid *dbTablespace, char **dbCollate, char **dbCtype)
{ {
bool result = false; bool result = false;
...@@ -1685,6 +1689,9 @@ get_db_info(const char *name, LOCKMODE lockmode, ...@@ -1685,6 +1689,9 @@ get_db_info(const char *name, LOCKMODE lockmode,
/* limit of frozen XIDs */ /* limit of frozen XIDs */
if (dbFrozenXidP) if (dbFrozenXidP)
*dbFrozenXidP = dbform->datfrozenxid; *dbFrozenXidP = dbform->datfrozenxid;
/* limit of frozen Multixacts */
if (dbMinMultiP)
*dbMinMultiP = dbform->datminmxid;
/* default tablespace for this database */ /* default tablespace for this database */
if (dbTablespace) if (dbTablespace)
*dbTablespace = dbform->dattablespace; *dbTablespace = dbform->dattablespace;
......
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "access/transam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/multixact.h"
#include "access/transam.h"
#include "access/xlogutils.h" #include "access/xlogutils.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
...@@ -282,8 +283,10 @@ ResetSequence(Oid seq_relid) ...@@ -282,8 +283,10 @@ ResetSequence(Oid seq_relid)
/* /*
* Create a new storage file for the sequence. We want to keep the * Create a new storage file for the sequence. We want to keep the
* sequence's relfrozenxid at 0, since it won't contain any unfrozen XIDs. * sequence's relfrozenxid at 0, since it won't contain any unfrozen XIDs.
* Same with relminmxid, since a sequence will never contain multixacts.
*/ */
RelationSetNewRelfilenode(seq_rel, InvalidTransactionId); RelationSetNewRelfilenode(seq_rel, InvalidTransactionId,
InvalidMultiXactId);
/* /*
* Insert the modified tuple into the new storage file. * Insert the modified tuple into the new storage file.
...@@ -1110,7 +1113,8 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple) ...@@ -1110,7 +1113,8 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
* bit update, ie, don't bother to WAL-log it, since we can certainly do * bit update, ie, don't bother to WAL-log it, since we can certainly do
* this again if the update gets lost. * this again if the update gets lost.
*/ */
if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId) Assert(!(seqtuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
if (HeapTupleHeaderGetRawXmax(seqtuple->t_data) != InvalidTransactionId)
{ {
HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId); HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
......
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
#include "postgres.h" #include "postgres.h"
#include "access/genam.h" #include "access/genam.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h" #include "access/heapam_xlog.h"
#include "access/multixact.h"
#include "access/reloptions.h" #include "access/reloptions.h"
#include "access/relscan.h" #include "access/relscan.h"
#include "access/sysattr.h" #include "access/sysattr.h"
...@@ -1130,6 +1132,7 @@ ExecuteTruncate(TruncateStmt *stmt) ...@@ -1130,6 +1132,7 @@ ExecuteTruncate(TruncateStmt *stmt)
{ {
Oid heap_relid; Oid heap_relid;
Oid toast_relid; Oid toast_relid;
MultiXactId minmulti;
/* /*
* This effectively deletes all rows in the table, and may be done * This effectively deletes all rows in the table, and may be done
...@@ -1139,6 +1142,8 @@ ExecuteTruncate(TruncateStmt *stmt) ...@@ -1139,6 +1142,8 @@ ExecuteTruncate(TruncateStmt *stmt)
*/ */
CheckTableForSerializableConflictIn(rel); CheckTableForSerializableConflictIn(rel);
minmulti = GetOldestMultiXactId();
/* /*
* Need the full transaction-safe pushups. * Need the full transaction-safe pushups.
* *
...@@ -1146,7 +1151,7 @@ ExecuteTruncate(TruncateStmt *stmt) ...@@ -1146,7 +1151,7 @@ ExecuteTruncate(TruncateStmt *stmt)
* as the relfilenode value. The old storage file is scheduled for * as the relfilenode value. The old storage file is scheduled for
* deletion at commit. * deletion at commit.
*/ */
RelationSetNewRelfilenode(rel, RecentXmin); RelationSetNewRelfilenode(rel, RecentXmin, minmulti);
if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
heap_create_init_fork(rel); heap_create_init_fork(rel);
...@@ -1159,7 +1164,7 @@ ExecuteTruncate(TruncateStmt *stmt) ...@@ -1159,7 +1164,7 @@ ExecuteTruncate(TruncateStmt *stmt)
if (OidIsValid(toast_relid)) if (OidIsValid(toast_relid))
{ {
rel = relation_open(toast_relid, AccessExclusiveLock); rel = relation_open(toast_relid, AccessExclusiveLock);
RelationSetNewRelfilenode(rel, RecentXmin); RelationSetNewRelfilenode(rel, RecentXmin, minmulti);
if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
heap_create_init_fork(rel); heap_create_init_fork(rel);
heap_close(rel, NoLock); heap_close(rel, NoLock);
...@@ -3516,7 +3521,8 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode) ...@@ -3516,7 +3521,8 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
* interest in letting this code work on system catalogs. * interest in letting this code work on system catalogs.
*/ */
finish_heap_swap(tab->relid, OIDNewHeap, finish_heap_swap(tab->relid, OIDNewHeap,
false, false, true, RecentXmin); false, false, true, RecentXmin,
ReadNextMultiXactId());
} }
else else
{ {
......
...@@ -73,6 +73,7 @@ static HeapTuple GetTupleForTrigger(EState *estate, ...@@ -73,6 +73,7 @@ static HeapTuple GetTupleForTrigger(EState *estate,
EPQState *epqstate, EPQState *epqstate,
ResultRelInfo *relinfo, ResultRelInfo *relinfo,
ItemPointer tid, ItemPointer tid,
LockTupleMode lockmode,
TupleTableSlot **newSlot); TupleTableSlot **newSlot);
static bool TriggerEnabled(EState *estate, ResultRelInfo *relinfo, static bool TriggerEnabled(EState *estate, ResultRelInfo *relinfo,
Trigger *trigger, TriggerEvent event, Trigger *trigger, TriggerEvent event,
...@@ -2147,7 +2148,7 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ...@@ -2147,7 +2148,7 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
int i; int i;
trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid, trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
&newSlot); LockTupleExclusive, &newSlot);
if (trigtuple == NULL) if (trigtuple == NULL)
return false; return false;
...@@ -2201,7 +2202,8 @@ ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ...@@ -2201,7 +2202,8 @@ ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo,
if (trigdesc && trigdesc->trig_delete_after_row) if (trigdesc && trigdesc->trig_delete_after_row)
{ {
HeapTuple trigtuple = GetTupleForTrigger(estate, NULL, relinfo, HeapTuple trigtuple = GetTupleForTrigger(estate, NULL, relinfo,
tupleid, NULL); tupleid, LockTupleExclusive,
NULL);
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE, AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
true, trigtuple, NULL, NIL, NULL); true, trigtuple, NULL, NIL, NULL);
...@@ -2332,10 +2334,24 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ...@@ -2332,10 +2334,24 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
TupleTableSlot *newSlot; TupleTableSlot *newSlot;
int i; int i;
Bitmapset *modifiedCols; Bitmapset *modifiedCols;
Bitmapset *keyCols;
LockTupleMode lockmode;
/*
* Compute lock mode to use. If columns that are part of the key have not
* been modified, then we can use a weaker lock, allowing for better
* concurrency.
*/
modifiedCols = GetModifiedColumns(relinfo, estate);
keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, true);
if (bms_overlap(keyCols, modifiedCols))
lockmode = LockTupleExclusive;
else
lockmode = LockTupleNoKeyExclusive;
/* get a copy of the on-disk tuple we are planning to update */ /* get a copy of the on-disk tuple we are planning to update */
trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid, trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
&newSlot); lockmode, &newSlot);
if (trigtuple == NULL) if (trigtuple == NULL)
return NULL; /* cancel the update action */ return NULL; /* cancel the update action */
...@@ -2357,7 +2373,6 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ...@@ -2357,7 +2373,6 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
newtuple = slottuple; newtuple = slottuple;
} }
modifiedCols = GetModifiedColumns(relinfo, estate);
LocTriggerData.type = T_TriggerData; LocTriggerData.type = T_TriggerData;
LocTriggerData.tg_event = TRIGGER_EVENT_UPDATE | LocTriggerData.tg_event = TRIGGER_EVENT_UPDATE |
...@@ -2426,7 +2441,8 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ...@@ -2426,7 +2441,8 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
if (trigdesc && trigdesc->trig_update_after_row) if (trigdesc && trigdesc->trig_update_after_row)
{ {
HeapTuple trigtuple = GetTupleForTrigger(estate, NULL, relinfo, HeapTuple trigtuple = GetTupleForTrigger(estate, NULL, relinfo,
tupleid, NULL); tupleid, LockTupleExclusive,
NULL);
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE, AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
true, trigtuple, newtuple, recheckIndexes, true, trigtuple, newtuple, recheckIndexes,
...@@ -2565,6 +2581,7 @@ GetTupleForTrigger(EState *estate, ...@@ -2565,6 +2581,7 @@ GetTupleForTrigger(EState *estate,
EPQState *epqstate, EPQState *epqstate,
ResultRelInfo *relinfo, ResultRelInfo *relinfo,
ItemPointer tid, ItemPointer tid,
LockTupleMode lockmode,
TupleTableSlot **newSlot) TupleTableSlot **newSlot)
{ {
Relation relation = relinfo->ri_RelationDesc; Relation relation = relinfo->ri_RelationDesc;
...@@ -2589,8 +2606,8 @@ ltrmark:; ...@@ -2589,8 +2606,8 @@ ltrmark:;
tuple.t_self = *tid; tuple.t_self = *tid;
test = heap_lock_tuple(relation, &tuple, test = heap_lock_tuple(relation, &tuple,
estate->es_output_cid, estate->es_output_cid,
LockTupleExclusive, false /* wait */, lockmode, false /* wait */,
&buffer, &hufd); false, &buffer, &hufd);
switch (test) switch (test)
{ {
case HeapTupleSelfUpdated: case HeapTupleSelfUpdated:
...@@ -2630,6 +2647,7 @@ ltrmark:; ...@@ -2630,6 +2647,7 @@ ltrmark:;
epqstate, epqstate,
relation, relation,
relinfo->ri_RangeTableIndex, relinfo->ri_RangeTableIndex,
lockmode,
&hufd.ctid, &hufd.ctid,
hufd.xmax); hufd.xmax);
if (!TupIsNull(epqslot)) if (!TupIsNull(epqslot))
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "access/genam.h" #include "access/genam.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/multixact.h"
#include "access/transam.h" #include "access/transam.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
...@@ -63,7 +64,7 @@ static BufferAccessStrategy vac_strategy; ...@@ -63,7 +64,7 @@ static BufferAccessStrategy vac_strategy;
/* non-export function prototypes */ /* non-export function prototypes */
static List *get_rel_oids(Oid relid, const RangeVar *vacrel); static List *get_rel_oids(Oid relid, const RangeVar *vacrel);
static void vac_truncate_clog(TransactionId frozenXID); static void vac_truncate_clog(TransactionId frozenXID, MultiXactId frozenMulti);
static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast,
bool for_wraparound); bool for_wraparound);
...@@ -379,7 +380,8 @@ vacuum_set_xid_limits(int freeze_min_age, ...@@ -379,7 +380,8 @@ vacuum_set_xid_limits(int freeze_min_age,
bool sharedRel, bool sharedRel,
TransactionId *oldestXmin, TransactionId *oldestXmin,
TransactionId *freezeLimit, TransactionId *freezeLimit,
TransactionId *freezeTableLimit) TransactionId *freezeTableLimit,
MultiXactId *multiXactFrzLimit)
{ {
int freezemin; int freezemin;
TransactionId limit; TransactionId limit;
...@@ -463,8 +465,22 @@ vacuum_set_xid_limits(int freeze_min_age, ...@@ -463,8 +465,22 @@ vacuum_set_xid_limits(int freeze_min_age,
*freezeTableLimit = limit; *freezeTableLimit = limit;
} }
}
if (multiXactFrzLimit != NULL)
{
MultiXactId mxLimit;
/*
* simplistic multixactid freezing: use the same freezing policy as
* for Xids
*/
mxLimit = GetOldestMultiXactId() - freezemin;
if (mxLimit < FirstMultiXactId)
mxLimit = FirstMultiXactId;
*multiXactFrzLimit = mxLimit;
}
}
/* /*
* vac_estimate_reltuples() -- estimate the new value for pg_class.reltuples * vac_estimate_reltuples() -- estimate the new value for pg_class.reltuples
...@@ -574,7 +590,8 @@ void ...@@ -574,7 +590,8 @@ void
vac_update_relstats(Relation relation, vac_update_relstats(Relation relation,
BlockNumber num_pages, double num_tuples, BlockNumber num_pages, double num_tuples,
BlockNumber num_all_visible_pages, BlockNumber num_all_visible_pages,
bool hasindex, TransactionId frozenxid) bool hasindex, TransactionId frozenxid,
MultiXactId minmulti)
{ {
Oid relid = RelationGetRelid(relation); Oid relid = RelationGetRelid(relation);
Relation rd; Relation rd;
...@@ -648,6 +665,14 @@ vac_update_relstats(Relation relation, ...@@ -648,6 +665,14 @@ vac_update_relstats(Relation relation,
dirty = true; dirty = true;
} }
/* relminmxid must never go backward, either */
if (MultiXactIdIsValid(minmulti) &&
MultiXactIdPrecedes(pgcform->relminmxid, minmulti))
{
pgcform->relminmxid = minmulti;
dirty = true;
}
/* If anything changed, write out the tuple. */ /* If anything changed, write out the tuple. */
if (dirty) if (dirty)
heap_inplace_update(rd, ctup); heap_inplace_update(rd, ctup);
...@@ -660,8 +685,13 @@ vac_update_relstats(Relation relation, ...@@ -660,8 +685,13 @@ vac_update_relstats(Relation relation,
* vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB * vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
* *
* Update pg_database's datfrozenxid entry for our database to be the * Update pg_database's datfrozenxid entry for our database to be the
* minimum of the pg_class.relfrozenxid values. If we are able to * minimum of the pg_class.relfrozenxid values.
* advance pg_database.datfrozenxid, also try to truncate pg_clog. *
* Similarly, update our datfrozenmulti to be the minimum of the
* pg_class.relfrozenmulti values.
*
* If we are able to advance either pg_database value, also try to
* truncate pg_clog and pg_multixact.
* *
* We violate transaction semantics here by overwriting the database's * We violate transaction semantics here by overwriting the database's
* existing pg_database tuple with the new value. This is reasonably * existing pg_database tuple with the new value. This is reasonably
...@@ -678,16 +708,23 @@ vac_update_datfrozenxid(void) ...@@ -678,16 +708,23 @@ vac_update_datfrozenxid(void)
SysScanDesc scan; SysScanDesc scan;
HeapTuple classTup; HeapTuple classTup;
TransactionId newFrozenXid; TransactionId newFrozenXid;
MultiXactId newFrozenMulti;
bool dirty = false; bool dirty = false;
/* /*
* Initialize the "min" calculation with GetOldestXmin, which is a * Initialize the "min" calculation with GetOldestXmin, which is a
* reasonable approximation to the minimum relfrozenxid for not-yet- * reasonable approximation to the minimum relfrozenxid for not-yet-
* committed pg_class entries for new tables; see AddNewRelationTuple(). * committed pg_class entries for new tables; see AddNewRelationTuple().
* Se we cannot produce a wrong minimum by starting with this. * So we cannot produce a wrong minimum by starting with this.
*/ */
newFrozenXid = GetOldestXmin(true, true); newFrozenXid = GetOldestXmin(true, true);
/*
* Similarly, initialize the MultiXact "min" with the value that would
* be used on pg_class for new tables. See AddNewRelationTuple().
*/
newFrozenMulti = GetOldestMultiXactId();
/* /*
* We must seqscan pg_class to find the minimum Xid, because there is no * We must seqscan pg_class to find the minimum Xid, because there is no
* index that can help us here. * index that can help us here.
...@@ -710,9 +747,13 @@ vac_update_datfrozenxid(void) ...@@ -710,9 +747,13 @@ vac_update_datfrozenxid(void)
continue; continue;
Assert(TransactionIdIsNormal(classForm->relfrozenxid)); Assert(TransactionIdIsNormal(classForm->relfrozenxid));
Assert(MultiXactIdIsValid(classForm->relminmxid));
if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid)) if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
newFrozenXid = classForm->relfrozenxid; newFrozenXid = classForm->relfrozenxid;
if (MultiXactIdPrecedes(classForm->relminmxid, newFrozenMulti))
newFrozenMulti = classForm->relminmxid;
} }
/* we're done with pg_class */ /* we're done with pg_class */
...@@ -720,6 +761,7 @@ vac_update_datfrozenxid(void) ...@@ -720,6 +761,7 @@ vac_update_datfrozenxid(void)
heap_close(relation, AccessShareLock); heap_close(relation, AccessShareLock);
Assert(TransactionIdIsNormal(newFrozenXid)); Assert(TransactionIdIsNormal(newFrozenXid));
Assert(MultiXactIdIsValid(newFrozenMulti));
/* Now fetch the pg_database tuple we need to update. */ /* Now fetch the pg_database tuple we need to update. */
relation = heap_open(DatabaseRelationId, RowExclusiveLock); relation = heap_open(DatabaseRelationId, RowExclusiveLock);
...@@ -740,6 +782,13 @@ vac_update_datfrozenxid(void) ...@@ -740,6 +782,13 @@ vac_update_datfrozenxid(void)
dirty = true; dirty = true;
} }
/* ditto */
if (MultiXactIdPrecedes(dbform->datminmxid, newFrozenMulti))
{
dbform->datminmxid = newFrozenMulti;
dirty = true;
}
if (dirty) if (dirty)
heap_inplace_update(relation, tuple); heap_inplace_update(relation, tuple);
...@@ -752,7 +801,7 @@ vac_update_datfrozenxid(void) ...@@ -752,7 +801,7 @@ vac_update_datfrozenxid(void)
* this action will update that too. * this action will update that too.
*/ */
if (dirty || ForceTransactionIdLimitUpdate()) if (dirty || ForceTransactionIdLimitUpdate())
vac_truncate_clog(newFrozenXid); vac_truncate_clog(newFrozenXid, newFrozenMulti);
} }
...@@ -771,17 +820,19 @@ vac_update_datfrozenxid(void) ...@@ -771,17 +820,19 @@ vac_update_datfrozenxid(void)
* info is stale. * info is stale.
*/ */
static void static void
vac_truncate_clog(TransactionId frozenXID) vac_truncate_clog(TransactionId frozenXID, MultiXactId frozenMulti)
{ {
TransactionId myXID = GetCurrentTransactionId(); TransactionId myXID = GetCurrentTransactionId();
Relation relation; Relation relation;
HeapScanDesc scan; HeapScanDesc scan;
HeapTuple tuple; HeapTuple tuple;
Oid oldest_datoid; Oid oldestxid_datoid;
Oid oldestmulti_datoid;
bool frozenAlreadyWrapped = false; bool frozenAlreadyWrapped = false;
/* init oldest_datoid to sync with my frozenXID */ /* init oldest datoids to sync with my frozen values */
oldest_datoid = MyDatabaseId; oldestxid_datoid = MyDatabaseId;
oldestmulti_datoid = MyDatabaseId;
/* /*
* Scan pg_database to compute the minimum datfrozenxid * Scan pg_database to compute the minimum datfrozenxid
...@@ -804,13 +855,20 @@ vac_truncate_clog(TransactionId frozenXID) ...@@ -804,13 +855,20 @@ vac_truncate_clog(TransactionId frozenXID)
Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
Assert(TransactionIdIsNormal(dbform->datfrozenxid)); Assert(TransactionIdIsNormal(dbform->datfrozenxid));
Assert(MultiXactIdIsValid(dbform->datminmxid));
if (TransactionIdPrecedes(myXID, dbform->datfrozenxid)) if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
frozenAlreadyWrapped = true; frozenAlreadyWrapped = true;
else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID)) else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
{ {
frozenXID = dbform->datfrozenxid; frozenXID = dbform->datfrozenxid;
oldest_datoid = HeapTupleGetOid(tuple); oldestxid_datoid = HeapTupleGetOid(tuple);
}
if (MultiXactIdPrecedes(dbform->datminmxid, frozenMulti))
{
frozenMulti = dbform->datminmxid;
oldestmulti_datoid = HeapTupleGetOid(tuple);
} }
} }
...@@ -832,14 +890,18 @@ vac_truncate_clog(TransactionId frozenXID) ...@@ -832,14 +890,18 @@ vac_truncate_clog(TransactionId frozenXID)
return; return;
} }
/* Truncate CLOG to the oldest frozenxid */ /* Truncate CLOG and Multi to the oldest computed value */
TruncateCLOG(frozenXID); TruncateCLOG(frozenXID);
TruncateMultiXact(frozenMulti);
/* /*
* Update the wrap limit for GetNewTransactionId. Note: this function * Update the wrap limit for GetNewTransactionId and creation of new
* will also signal the postmaster for an(other) autovac cycle if needed. * MultiXactIds. Note: these functions will also signal the postmaster for
* an(other) autovac cycle if needed. XXX should we avoid possibly
* signalling twice?
*/ */
SetTransactionIdLimit(frozenXID, oldest_datoid); SetTransactionIdLimit(frozenXID, oldestxid_datoid);
MultiXactAdvanceOldest(frozenMulti, oldestmulti_datoid);
} }
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "access/heapam.h" #include "access/heapam.h"
#include "access/heapam_xlog.h" #include "access/heapam_xlog.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/multixact.h"
#include "access/transam.h" #include "access/transam.h"
#include "access/visibilitymap.h" #include "access/visibilitymap.h"
#include "catalog/storage.h" #include "catalog/storage.h"
...@@ -124,6 +125,7 @@ static int elevel = -1; ...@@ -124,6 +125,7 @@ static int elevel = -1;
static TransactionId OldestXmin; static TransactionId OldestXmin;
static TransactionId FreezeLimit; static TransactionId FreezeLimit;
static MultiXactId MultiXactFrzLimit;
static BufferAccessStrategy vac_strategy; static BufferAccessStrategy vac_strategy;
...@@ -180,6 +182,7 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, ...@@ -180,6 +182,7 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
double new_rel_tuples; double new_rel_tuples;
BlockNumber new_rel_allvisible; BlockNumber new_rel_allvisible;
TransactionId new_frozen_xid; TransactionId new_frozen_xid;
MultiXactId new_min_multi;
/* measure elapsed time iff autovacuum logging requires it */ /* measure elapsed time iff autovacuum logging requires it */
if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0) if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
...@@ -197,7 +200,8 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, ...@@ -197,7 +200,8 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age, vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age,
onerel->rd_rel->relisshared, onerel->rd_rel->relisshared,
&OldestXmin, &FreezeLimit, &freezeTableLimit); &OldestXmin, &FreezeLimit, &freezeTableLimit,
&MultiXactFrzLimit);
scan_all = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid, scan_all = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid,
freezeTableLimit); freezeTableLimit);
...@@ -267,12 +271,17 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, ...@@ -267,12 +271,17 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
if (vacrelstats->scanned_pages < vacrelstats->rel_pages) if (vacrelstats->scanned_pages < vacrelstats->rel_pages)
new_frozen_xid = InvalidTransactionId; new_frozen_xid = InvalidTransactionId;
new_min_multi = MultiXactFrzLimit;
if (vacrelstats->scanned_pages < vacrelstats->rel_pages)
new_min_multi = InvalidMultiXactId;
vac_update_relstats(onerel, vac_update_relstats(onerel,
new_rel_pages, new_rel_pages,
new_rel_tuples, new_rel_tuples,
new_rel_allvisible, new_rel_allvisible,
vacrelstats->hasindex, vacrelstats->hasindex,
new_frozen_xid); new_frozen_xid,
new_min_multi);
/* /*
* Report results to the stats collector, too. An early terminated * Report results to the stats collector, too. An early terminated
...@@ -839,7 +848,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats, ...@@ -839,7 +848,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
* Each non-removable tuple must be checked to see if it needs * Each non-removable tuple must be checked to see if it needs
* freezing. Note we already have exclusive buffer lock. * freezing. Note we already have exclusive buffer lock.
*/ */
if (heap_freeze_tuple(tuple.t_data, FreezeLimit)) if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
MultiXactFrzLimit))
frozen[nfrozen++] = offnum; frozen[nfrozen++] = offnum;
} }
} /* scan along page */ } /* scan along page */
...@@ -857,7 +867,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats, ...@@ -857,7 +867,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
XLogRecPtr recptr; XLogRecPtr recptr;
recptr = log_heap_freeze(onerel, buf, FreezeLimit, recptr = log_heap_freeze(onerel, buf, FreezeLimit,
frozen, nfrozen); MultiXactFrzLimit, frozen, nfrozen);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
} }
...@@ -1176,7 +1186,8 @@ lazy_check_needs_freeze(Buffer buf) ...@@ -1176,7 +1186,8 @@ lazy_check_needs_freeze(Buffer buf)
tupleheader = (HeapTupleHeader) PageGetItem(page, itemid); tupleheader = (HeapTupleHeader) PageGetItem(page, itemid);
if (heap_tuple_needs_freeze(tupleheader, FreezeLimit, buf)) if (heap_tuple_needs_freeze(tupleheader, FreezeLimit,
MultiXactFrzLimit, buf))
return true; return true;
} /* scan along page */ } /* scan along page */
...@@ -1253,7 +1264,8 @@ lazy_cleanup_index(Relation indrel, ...@@ -1253,7 +1264,8 @@ lazy_cleanup_index(Relation indrel,
stats->num_index_tuples, stats->num_index_tuples,
0, 0,
false, false,
InvalidTransactionId); InvalidTransactionId,
InvalidMultiXactId);
ereport(elevel, ereport(elevel,
(errmsg("index \"%s\" now contains %.0f row versions in %u pages", (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
......
...@@ -162,7 +162,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) ...@@ -162,7 +162,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
case CMD_SELECT: case CMD_SELECT:
/* /*
* SELECT FOR UPDATE/SHARE and modifying CTEs need to mark tuples * SELECT FOR [KEY] UPDATE/SHARE and modifying CTEs need to mark
* tuples
*/ */
if (queryDesc->plannedstmt->rowMarks != NIL || if (queryDesc->plannedstmt->rowMarks != NIL ||
queryDesc->plannedstmt->hasModifyingCTE) queryDesc->plannedstmt->hasModifyingCTE)
...@@ -775,7 +776,7 @@ InitPlan(QueryDesc *queryDesc, int eflags) ...@@ -775,7 +776,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
} }
/* /*
* Similarly, we have to lock relations selected FOR UPDATE/FOR SHARE * Similarly, we have to lock relations selected FOR [KEY] UPDATE/SHARE
* before we initialize the plan tree, else we'd be risking lock upgrades. * before we initialize the plan tree, else we'd be risking lock upgrades.
* While we are at it, build the ExecRowMark list. * While we are at it, build the ExecRowMark list.
*/ */
...@@ -794,7 +795,9 @@ InitPlan(QueryDesc *queryDesc, int eflags) ...@@ -794,7 +795,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
switch (rc->markType) switch (rc->markType)
{ {
case ROW_MARK_EXCLUSIVE: case ROW_MARK_EXCLUSIVE:
case ROW_MARK_NOKEYEXCLUSIVE:
case ROW_MARK_SHARE: case ROW_MARK_SHARE:
case ROW_MARK_KEYSHARE:
relid = getrelid(rc->rti, rangeTable); relid = getrelid(rc->rti, rangeTable);
relation = heap_open(relid, RowShareLock); relation = heap_open(relid, RowShareLock);
break; break;
...@@ -1341,7 +1344,7 @@ ExecEndPlan(PlanState *planstate, EState *estate) ...@@ -1341,7 +1344,7 @@ ExecEndPlan(PlanState *planstate, EState *estate)
} }
/* /*
* close any relations selected FOR UPDATE/FOR SHARE, again keeping locks * close any relations selected FOR [KEY] UPDATE/SHARE, again keeping locks
*/ */
foreach(l, estate->es_rowMarks) foreach(l, estate->es_rowMarks)
{ {
...@@ -1694,6 +1697,7 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist) ...@@ -1694,6 +1697,7 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist)
* epqstate - state for EvalPlanQual rechecking * epqstate - state for EvalPlanQual rechecking
* relation - table containing tuple * relation - table containing tuple
* rti - rangetable index of table containing tuple * rti - rangetable index of table containing tuple
* lockmode - requested tuple lock mode
* *tid - t_ctid from the outdated tuple (ie, next updated version) * *tid - t_ctid from the outdated tuple (ie, next updated version)
* priorXmax - t_xmax from the outdated tuple * priorXmax - t_xmax from the outdated tuple
* *
...@@ -1702,10 +1706,13 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist) ...@@ -1702,10 +1706,13 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist)
* *
* Returns a slot containing the new candidate update/delete tuple, or * Returns a slot containing the new candidate update/delete tuple, or
* NULL if we determine we shouldn't process the row. * NULL if we determine we shouldn't process the row.
*
* Note: properly, lockmode should be declared as enum LockTupleMode,
* but we use "int" to avoid having to include heapam.h in executor.h.
*/ */
TupleTableSlot * TupleTableSlot *
EvalPlanQual(EState *estate, EPQState *epqstate, EvalPlanQual(EState *estate, EPQState *epqstate,
Relation relation, Index rti, Relation relation, Index rti, int lockmode,
ItemPointer tid, TransactionId priorXmax) ItemPointer tid, TransactionId priorXmax)
{ {
TupleTableSlot *slot; TupleTableSlot *slot;
...@@ -1716,7 +1723,7 @@ EvalPlanQual(EState *estate, EPQState *epqstate, ...@@ -1716,7 +1723,7 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
/* /*
* Get and lock the updated version of the row; if fail, return NULL. * Get and lock the updated version of the row; if fail, return NULL.
*/ */
copyTuple = EvalPlanQualFetch(estate, relation, LockTupleExclusive, copyTuple = EvalPlanQualFetch(estate, relation, lockmode,
tid, priorXmax); tid, priorXmax);
if (copyTuple == NULL) if (copyTuple == NULL)
...@@ -1864,7 +1871,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, ...@@ -1864,7 +1871,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
test = heap_lock_tuple(relation, &tuple, test = heap_lock_tuple(relation, &tuple,
estate->es_output_cid, estate->es_output_cid,
lockmode, false /* wait */, lockmode, false /* wait */,
&buffer, &hufd); false, &buffer, &hufd);
/* We now have two pins on the buffer, get rid of one */ /* We now have two pins on the buffer, get rid of one */
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
...@@ -1965,7 +1972,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, ...@@ -1965,7 +1972,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
/* updated, so look at the updated row */ /* updated, so look at the updated row */
tuple.t_self = tuple.t_data->t_ctid; tuple.t_self = tuple.t_data->t_ctid;
/* updated row should have xmin matching this xmax */ /* updated row should have xmin matching this xmax */
priorXmax = HeapTupleHeaderGetXmax(tuple.t_data); priorXmax = HeapTupleHeaderGetUpdateXid(tuple.t_data);
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
/* loop back to fetch next in chain */ /* loop back to fetch next in chain */
} }
......
This diff is collapsed.
This diff is collapsed.
...@@ -2037,7 +2037,7 @@ _copyRowMarkClause(const RowMarkClause *from) ...@@ -2037,7 +2037,7 @@ _copyRowMarkClause(const RowMarkClause *from)
RowMarkClause *newnode = makeNode(RowMarkClause); RowMarkClause *newnode = makeNode(RowMarkClause);
COPY_SCALAR_FIELD(rti); COPY_SCALAR_FIELD(rti);
COPY_SCALAR_FIELD(forUpdate); COPY_SCALAR_FIELD(strength);
COPY_SCALAR_FIELD(noWait); COPY_SCALAR_FIELD(noWait);
COPY_SCALAR_FIELD(pushedDown); COPY_SCALAR_FIELD(pushedDown);
...@@ -2400,7 +2400,7 @@ _copyLockingClause(const LockingClause *from) ...@@ -2400,7 +2400,7 @@ _copyLockingClause(const LockingClause *from)
LockingClause *newnode = makeNode(LockingClause); LockingClause *newnode = makeNode(LockingClause);
COPY_NODE_FIELD(lockedRels); COPY_NODE_FIELD(lockedRels);
COPY_SCALAR_FIELD(forUpdate); COPY_SCALAR_FIELD(strength);
COPY_SCALAR_FIELD(noWait); COPY_SCALAR_FIELD(noWait);
return newnode; return newnode;
......
This diff is collapsed.
This diff is collapsed.
...@@ -301,7 +301,7 @@ _readRowMarkClause(void) ...@@ -301,7 +301,7 @@ _readRowMarkClause(void)
READ_LOCALS(RowMarkClause); READ_LOCALS(RowMarkClause);
READ_UINT_FIELD(rti); READ_UINT_FIELD(rti);
READ_BOOL_FIELD(forUpdate); READ_ENUM_FIELD(strength, LockClauseStrength);
READ_BOOL_FIELD(noWait); READ_BOOL_FIELD(noWait);
READ_BOOL_FIELD(pushedDown); READ_BOOL_FIELD(pushedDown);
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment