Commit 6c2003f8 authored by Andres Freund's avatar Andres Freund

Don't force-assign transaction id when exporting a snapshot.

Previously we required every exported transaction to have an xid
assigned. That was used to check that the exporting transaction is
still running, which in turn is needed to guarantee that that
necessary rows haven't been removed in between exporting and importing
the snapshot.

The exported xid caused unnecessary problems with logical decoding,
because slot creation has to wait for all concurrent xid to finish,
which in turn serializes concurrent slot creation.   It also
prohibited snapshots to be exported on hot-standby replicas.

Instead export the virtual transactionid, which avoids the unnecessary
serialization and the inability to export snapshots on standbys. This
changes the file name of the exported snapshot, but since we never
documented what that one means, that seems ok.

Author: Petr Jelinek, slightly editorialized by me
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/f598b4b8-8cd7-0d54-0939-adda763d8c34@2ndquadrant.com
parent b6966d46
...@@ -222,8 +222,8 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa ...@@ -222,8 +222,8 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT pg_export_snapshot(); SELECT pg_export_snapshot();
pg_export_snapshot pg_export_snapshot
-------------------- ---------------------
000003A1-1 00000003-0000001B-1
(1 row) (1 row)
</programlisting> </programlisting>
...@@ -233,7 +233,7 @@ SELECT pg_export_snapshot(); ...@@ -233,7 +233,7 @@ SELECT pg_export_snapshot();
<programlisting> <programlisting>
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '000003A1-1'; SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
</programlisting></para> </programlisting></para>
</refsect1> </refsect1>
......
...@@ -262,7 +262,7 @@ static bool ExportInProgress = false; ...@@ -262,7 +262,7 @@ static bool ExportInProgress = false;
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */ /* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid); static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
static void SnapBuildFreeSnapshot(Snapshot snap); static void SnapBuildFreeSnapshot(Snapshot snap);
...@@ -463,7 +463,7 @@ SnapBuildSnapDecRefcount(Snapshot snap) ...@@ -463,7 +463,7 @@ SnapBuildSnapDecRefcount(Snapshot snap)
* and ->subxip/subxcnt values. * and ->subxip/subxcnt values.
*/ */
static Snapshot static Snapshot
SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid) SnapBuildBuildSnapshot(SnapBuild *builder)
{ {
Snapshot snapshot; Snapshot snapshot;
Size ssize; Size ssize;
...@@ -562,7 +562,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) ...@@ -562,7 +562,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
if (TransactionIdIsValid(MyPgXact->xmin)) if (TransactionIdIsValid(MyPgXact->xmin))
elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid"); elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId()); snap = SnapBuildBuildSnapshot(builder);
/* /*
* We know that snap->xmin is alive, enforced by the logical xmin * We know that snap->xmin is alive, enforced by the logical xmin
...@@ -679,7 +679,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) ...@@ -679,7 +679,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
/* only build a new snapshot if we don't have a prebuilt one */ /* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL) if (builder->snapshot == NULL)
{ {
builder->snapshot = SnapBuildBuildSnapshot(builder, xid); builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */ /* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot); SnapBuildSnapIncRefcount(builder->snapshot);
} }
...@@ -743,7 +743,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) ...@@ -743,7 +743,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
/* only build a new snapshot if we don't have a prebuilt one */ /* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL) if (builder->snapshot == NULL)
{ {
builder->snapshot = SnapBuildBuildSnapshot(builder, xid); builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */ /* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot); SnapBuildSnapIncRefcount(builder->snapshot);
} }
...@@ -1061,7 +1061,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, ...@@ -1061,7 +1061,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
if (builder->snapshot) if (builder->snapshot)
SnapBuildSnapDecRefcount(builder->snapshot); SnapBuildSnapDecRefcount(builder->snapshot);
builder->snapshot = SnapBuildBuildSnapshot(builder, xid); builder->snapshot = SnapBuildBuildSnapshot(builder);
/* we might need to execute invalidations, add snapshot */ /* we might need to execute invalidations, add snapshot */
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
...@@ -1831,7 +1831,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) ...@@ -1831,7 +1831,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
{ {
SnapBuildSnapDecRefcount(builder->snapshot); SnapBuildSnapDecRefcount(builder->snapshot);
} }
builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId); builder->snapshot = SnapBuildBuildSnapshot(builder);
SnapBuildSnapIncRefcount(builder->snapshot); SnapBuildSnapIncRefcount(builder->snapshot);
ReorderBufferSetRestartPoint(builder->reorder, lsn); ReorderBufferSetRestartPoint(builder->reorder, lsn);
......
...@@ -1793,14 +1793,15 @@ GetSnapshotData(Snapshot snapshot) ...@@ -1793,14 +1793,15 @@ GetSnapshotData(Snapshot snapshot)
* Returns TRUE if successful, FALSE if source xact is no longer running. * Returns TRUE if successful, FALSE if source xact is no longer running.
*/ */
bool bool
ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) ProcArrayInstallImportedXmin(TransactionId xmin,
VirtualTransactionId *sourcevxid)
{ {
bool result = false; bool result = false;
ProcArrayStruct *arrayP = procArray; ProcArrayStruct *arrayP = procArray;
int index; int index;
Assert(TransactionIdIsNormal(xmin)); Assert(TransactionIdIsNormal(xmin));
if (!TransactionIdIsNormal(sourcexid)) if (!sourcevxid)
return false; return false;
/* Get lock so source xact can't end while we're doing this */ /* Get lock so source xact can't end while we're doing this */
...@@ -1817,8 +1818,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) ...@@ -1817,8 +1818,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
if (pgxact->vacuumFlags & PROC_IN_VACUUM) if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue; continue;
xid = pgxact->xid; /* fetch just once */ /* We are only interested in the specific virtual transaction. */
if (xid != sourcexid) if (proc->backendId != sourcevxid->backendId)
continue;
if (proc->lxid != sourcevxid->localTransactionId)
continue; continue;
/* /*
......
...@@ -148,7 +148,7 @@ ...@@ -148,7 +148,7 @@
* predicate lock maintenance * predicate lock maintenance
* GetSerializableTransactionSnapshot(Snapshot snapshot) * GetSerializableTransactionSnapshot(Snapshot snapshot)
* SetSerializableTransactionSnapshot(Snapshot snapshot, * SetSerializableTransactionSnapshot(Snapshot snapshot,
* TransactionId sourcexid) * VirtualTransactionId *sourcevxid)
* RegisterPredicateLockingXid(void) * RegisterPredicateLockingXid(void)
* PredicateLockRelation(Relation relation, Snapshot snapshot) * PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno, * PredicateLockPage(Relation relation, BlockNumber blkno,
...@@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize); ...@@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize);
static void SummarizeOldestCommittedSxact(void); static void SummarizeOldestCommittedSxact(void);
static Snapshot GetSafeSnapshot(Snapshot snapshot); static Snapshot GetSafeSnapshot(Snapshot snapshot);
static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
TransactionId sourcexid); VirtualTransactionId *sourcevxid,
int sourcepid);
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
PREDICATELOCKTARGETTAG *parent); PREDICATELOCKTARGETTAG *parent);
...@@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot) ...@@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
* one passed to it, but we avoid assuming that here. * one passed to it, but we avoid assuming that here.
*/ */
snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
InvalidTransactionId); NULL, InvalidPid);
if (MySerializableXact == InvalidSerializableXact) if (MySerializableXact == InvalidSerializableXact)
return snapshot; /* no concurrent r/w xacts; it's safe */ return snapshot; /* no concurrent r/w xacts; it's safe */
...@@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) ...@@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
return GetSafeSnapshot(snapshot); return GetSafeSnapshot(snapshot);
return GetSerializableTransactionSnapshotInt(snapshot, return GetSerializableTransactionSnapshotInt(snapshot,
InvalidTransactionId); NULL, InvalidPid);
} }
/* /*
...@@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) ...@@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
*/ */
void void
SetSerializableTransactionSnapshot(Snapshot snapshot, SetSerializableTransactionSnapshot(Snapshot snapshot,
TransactionId sourcexid) VirtualTransactionId *sourcevxid,
int sourcepid)
{ {
Assert(IsolationIsSerializable()); Assert(IsolationIsSerializable());
...@@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, ...@@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid); (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
sourcepid);
} }
/* /*
...@@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, ...@@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
*/ */
static Snapshot static Snapshot
GetSerializableTransactionSnapshotInt(Snapshot snapshot, GetSerializableTransactionSnapshotInt(Snapshot snapshot,
TransactionId sourcexid) VirtualTransactionId *sourcevxid,
int sourcepid)
{ {
PGPROC *proc; PGPROC *proc;
VirtualTransactionId vxid; VirtualTransactionId vxid;
...@@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, ...@@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
} while (!sxact); } while (!sxact);
/* Get the snapshot, or check that it's safe to use */ /* Get the snapshot, or check that it's safe to use */
if (!TransactionIdIsValid(sourcexid)) if (!sourcevxid)
snapshot = GetSnapshotData(snapshot); snapshot = GetSnapshotData(snapshot);
else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid)) else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid))
{ {
ReleasePredXact(sxact); ReleasePredXact(sxact);
LWLockRelease(SerializableXactHashLock); LWLockRelease(SerializableXactHashLock);
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"), errmsg("could not import the requested snapshot"),
errdetail("The source transaction %u is not running anymore.", errdetail("The source process with pid %d is not running anymore.",
sourcexid))); sourcepid)));
} }
/* /*
......
...@@ -58,6 +58,7 @@ ...@@ -58,6 +58,7 @@
#include "storage/proc.h" #include "storage/proc.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h" #include "storage/spin.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/memutils.h" #include "utils/memutils.h"
...@@ -211,11 +212,15 @@ static Snapshot FirstXactSnapshot = NULL; ...@@ -211,11 +212,15 @@ static Snapshot FirstXactSnapshot = NULL;
/* Define pathname of exported-snapshot files */ /* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots" #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
#define XactExportFilePath(path, xid, num, suffix) \
snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
xid, num, suffix)
/* Current xact's exported snapshots (a list of Snapshot structs) */ /* Structure holding info about exported snapshot. */
typedef struct ExportedSnapshot
{
char *snapfile;
Snapshot snapshot;
} ExportedSnapshot;
/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL; static List *exportedSnapshots = NIL;
/* Prototypes for local functions */ /* Prototypes for local functions */
...@@ -558,8 +563,8 @@ SnapshotSetCommandId(CommandId curcid) ...@@ -558,8 +563,8 @@ SnapshotSetCommandId(CommandId curcid)
* in GetTransactionSnapshot. * in GetTransactionSnapshot.
*/ */
static void static void
SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
PGPROC *sourceproc) int sourcepid, PGPROC *sourceproc)
{ {
/* Caller should have checked this already */ /* Caller should have checked this already */
Assert(!FirstSnapshotSet); Assert(!FirstSnapshotSet);
...@@ -617,12 +622,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, ...@@ -617,12 +622,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
errmsg("could not import the requested snapshot"), errmsg("could not import the requested snapshot"),
errdetail("The source transaction is not running anymore."))); errdetail("The source transaction is not running anymore.")));
} }
else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"), errmsg("could not import the requested snapshot"),
errdetail("The source transaction %u is not running anymore.", errdetail("The source process with pid %d is not running anymore.",
sourcexid))); sourcepid)));
/* /*
* In transaction-snapshot mode, the first snapshot must live until end of * In transaction-snapshot mode, the first snapshot must live until end of
...@@ -632,7 +637,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, ...@@ -632,7 +637,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
if (IsolationUsesXactSnapshot()) if (IsolationUsesXactSnapshot())
{ {
if (IsolationIsSerializable()) if (IsolationIsSerializable())
SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid); SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
sourcepid);
/* Make a saved copy */ /* Make a saved copy */
CurrentSnapshot = CopySnapshot(CurrentSnapshot); CurrentSnapshot = CopySnapshot(CurrentSnapshot);
FirstXactSnapshot = CurrentSnapshot; FirstXactSnapshot = CurrentSnapshot;
...@@ -1075,33 +1081,29 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin) ...@@ -1075,33 +1081,29 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
*/ */
if (exportedSnapshots != NIL) if (exportedSnapshots != NIL)
{ {
TransactionId myxid = GetTopTransactionId();
int i;
char buf[MAXPGPATH];
ListCell *lc; ListCell *lc;
/* /*
* Get rid of the files. Unlink failure is only a WARNING because (1) * Get rid of the files. Unlink failure is only a WARNING because (1)
* it's too late to abort the transaction, and (2) leaving a leaked * it's too late to abort the transaction, and (2) leaving a leaked
* file around has little real consequence anyway. * file around has little real consequence anyway.
*/ *
for (i = 1; i <= list_length(exportedSnapshots); i++) * We also also need to remove the snapshots from RegisteredSnapshots
{ * to prevent a warning below.
XactExportFilePath(buf, myxid, i, ""); *
if (unlink(buf)) * As with the FirstXactSnapshot, we don't need to free resources of
elog(WARNING, "could not unlink file \"%s\": %m", buf); * the snapshot iself as it will go away with the memory context.
}
/*
* As with the FirstXactSnapshot, we needn't spend any effort on
* cleaning up the per-snapshot data structures, but we do need to
* remove them from RegisteredSnapshots to prevent a warning below.
*/ */
foreach(lc, exportedSnapshots) foreach(lc, exportedSnapshots)
{ {
Snapshot snap = (Snapshot) lfirst(lc); ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
if (unlink(esnap->snapfile))
elog(WARNING, "could not unlink file \"%s\": %m",
esnap->snapfile);
pairingheap_remove(&RegisteredSnapshots, &snap->ph_node); pairingheap_remove(&RegisteredSnapshots,
&esnap->snapshot->ph_node);
} }
exportedSnapshots = NIL; exportedSnapshots = NIL;
...@@ -1159,6 +1161,7 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1159,6 +1161,7 @@ ExportSnapshot(Snapshot snapshot)
{ {
TransactionId topXid; TransactionId topXid;
TransactionId *children; TransactionId *children;
ExportedSnapshot *esnap;
int nchildren; int nchildren;
int addTopXid; int addTopXid;
StringInfoData buf; StringInfoData buf;
...@@ -1183,9 +1186,9 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1183,9 +1186,9 @@ ExportSnapshot(Snapshot snapshot)
*/ */
/* /*
* This will assign a transaction ID if we do not yet have one. * Get our transaction ID if there is one, to include in the snapshot.
*/ */
topXid = GetTopTransactionId(); topXid = GetTopTransactionIdIfAny();
/* /*
* We cannot export a snapshot from a subtransaction because there's no * We cannot export a snapshot from a subtransaction because there's no
...@@ -1204,6 +1207,13 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1204,6 +1207,13 @@ ExportSnapshot(Snapshot snapshot)
*/ */
nchildren = xactGetCommittedChildren(&children); nchildren = xactGetCommittedChildren(&children);
/*
* Generate file path for the snapshot. We start numbering of snapshots
* inside the transaction from 1.
*/
snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
/* /*
* Copy the snapshot into TopTransactionContext, add it to the * Copy the snapshot into TopTransactionContext, add it to the
* exportedSnapshots list, and mark it pseudo-registered. We do this to * exportedSnapshots list, and mark it pseudo-registered. We do this to
...@@ -1213,7 +1223,10 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1213,7 +1223,10 @@ ExportSnapshot(Snapshot snapshot)
snapshot = CopySnapshot(snapshot); snapshot = CopySnapshot(snapshot);
oldcxt = MemoryContextSwitchTo(TopTransactionContext); oldcxt = MemoryContextSwitchTo(TopTransactionContext);
exportedSnapshots = lappend(exportedSnapshots, snapshot); esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
esnap->snapfile = pstrdup(path);
esnap->snapshot = snapshot;
exportedSnapshots = lappend(exportedSnapshots, esnap);
MemoryContextSwitchTo(oldcxt); MemoryContextSwitchTo(oldcxt);
snapshot->regd_count++; snapshot->regd_count++;
...@@ -1226,7 +1239,8 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1226,7 +1239,8 @@ ExportSnapshot(Snapshot snapshot)
*/ */
initStringInfo(&buf); initStringInfo(&buf);
appendStringInfo(&buf, "xid:%u\n", topXid); appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
appendStringInfo(&buf, "pid:%d\n", MyProcPid);
appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId); appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
appendStringInfo(&buf, "iso:%d\n", XactIsoLevel); appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
appendStringInfo(&buf, "ro:%d\n", XactReadOnly); appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
...@@ -1245,7 +1259,8 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1245,7 +1259,8 @@ ExportSnapshot(Snapshot snapshot)
* xmax. (We need not make the same check for subxip[] members, see * xmax. (We need not make the same check for subxip[] members, see
* snapshot.h.) * snapshot.h.)
*/ */
addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0; addTopXid = (TransactionIdIsValid(topXid) &&
TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid); appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
for (i = 0; i < snapshot->xcnt; i++) for (i = 0; i < snapshot->xcnt; i++)
appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]); appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
...@@ -1276,7 +1291,7 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1276,7 +1291,7 @@ ExportSnapshot(Snapshot snapshot)
* ensures that no other backend can read an incomplete file * ensures that no other backend can read an incomplete file
* (ImportSnapshot won't allow it because of its valid-characters check). * (ImportSnapshot won't allow it because of its valid-characters check).
*/ */
XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp"); snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
if (!(f = AllocateFile(pathtmp, PG_BINARY_W))) if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
ereport(ERROR, ereport(ERROR,
(errcode_for_file_access(), (errcode_for_file_access(),
...@@ -1298,8 +1313,6 @@ ExportSnapshot(Snapshot snapshot) ...@@ -1298,8 +1313,6 @@ ExportSnapshot(Snapshot snapshot)
* Now that we have written everything into a .tmp file, rename the file * Now that we have written everything into a .tmp file, rename the file
* to remove the .tmp suffix. * to remove the .tmp suffix.
*/ */
XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
if (rename(pathtmp, path) < 0) if (rename(pathtmp, path) < 0)
ereport(ERROR, ereport(ERROR,
(errcode_for_file_access(), (errcode_for_file_access(),
...@@ -1384,6 +1397,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename) ...@@ -1384,6 +1397,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename)
return val; return val;
} }
static void
parseVxidFromText(const char *prefix, char **s, const char *filename,
VirtualTransactionId *vxid)
{
char *ptr = *s;
int prefixlen = strlen(prefix);
if (strncmp(ptr, prefix, prefixlen) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", filename)));
ptr += prefixlen;
if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", filename)));
ptr = strchr(ptr, '\n');
if (!ptr)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", filename)));
*s = ptr + 1;
}
/* /*
* ImportSnapshot * ImportSnapshot
* Import a previously exported snapshot. The argument should be a * Import a previously exported snapshot. The argument should be a
...@@ -1399,7 +1436,8 @@ ImportSnapshot(const char *idstr) ...@@ -1399,7 +1436,8 @@ ImportSnapshot(const char *idstr)
char *filebuf; char *filebuf;
int xcnt; int xcnt;
int i; int i;
TransactionId src_xid; VirtualTransactionId src_vxid;
int src_pid;
Oid src_dbid; Oid src_dbid;
int src_isolevel; int src_isolevel;
bool src_readonly; bool src_readonly;
...@@ -1463,7 +1501,8 @@ ImportSnapshot(const char *idstr) ...@@ -1463,7 +1501,8 @@ ImportSnapshot(const char *idstr)
*/ */
memset(&snapshot, 0, sizeof(snapshot)); memset(&snapshot, 0, sizeof(snapshot));
src_xid = parseXidFromText("xid:", &filebuf, path); parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
src_pid = parseIntFromText("pid:", &filebuf, path);
/* we abuse parseXidFromText a bit here ... */ /* we abuse parseXidFromText a bit here ... */
src_dbid = parseXidFromText("dbid:", &filebuf, path); src_dbid = parseXidFromText("dbid:", &filebuf, path);
src_isolevel = parseIntFromText("iso:", &filebuf, path); src_isolevel = parseIntFromText("iso:", &filebuf, path);
...@@ -1513,7 +1552,7 @@ ImportSnapshot(const char *idstr) ...@@ -1513,7 +1552,7 @@ ImportSnapshot(const char *idstr)
* don't trouble to check the array elements, just the most critical * don't trouble to check the array elements, just the most critical
* fields. * fields.
*/ */
if (!TransactionIdIsNormal(src_xid) || if (!VirtualTransactionIdIsValid(src_vxid) ||
!OidIsValid(src_dbid) || !OidIsValid(src_dbid) ||
!TransactionIdIsNormal(snapshot.xmin) || !TransactionIdIsNormal(snapshot.xmin) ||
!TransactionIdIsNormal(snapshot.xmax)) !TransactionIdIsNormal(snapshot.xmax))
...@@ -1554,7 +1593,7 @@ ImportSnapshot(const char *idstr) ...@@ -1554,7 +1593,7 @@ ImportSnapshot(const char *idstr)
errmsg("cannot import a snapshot from a different database"))); errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */ /* OK, install the snapshot */
SetTransactionSnapshot(&snapshot, src_xid, NULL); SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
} }
/* /*
...@@ -2141,5 +2180,5 @@ RestoreSnapshot(char *start_address) ...@@ -2141,5 +2180,5 @@ RestoreSnapshot(char *start_address)
void void
RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
{ {
SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#ifndef PREDICATE_H #ifndef PREDICATE_H
#define PREDICATE_H #define PREDICATE_H
#include "storage/lock.h"
#include "utils/relcache.h" #include "utils/relcache.h"
#include "utils/snapshot.h" #include "utils/snapshot.h"
...@@ -46,7 +47,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno); ...@@ -46,7 +47,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
/* predicate lock maintenance */ /* predicate lock maintenance */
extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot); extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot);
extern void SetSerializableTransactionSnapshot(Snapshot snapshot, extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
TransactionId sourcexid); VirtualTransactionId *sourcevxid,
int sourcepid);
extern void RegisterPredicateLockingXid(TransactionId xid); extern void RegisterPredicateLockingXid(TransactionId xid);
extern void PredicateLockRelation(Relation relation, Snapshot snapshot); extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot); extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
......
...@@ -82,7 +82,7 @@ extern int GetMaxSnapshotSubxidCount(void); ...@@ -82,7 +82,7 @@ extern int GetMaxSnapshotSubxidCount(void);
extern Snapshot GetSnapshotData(Snapshot snapshot); extern Snapshot GetSnapshotData(Snapshot snapshot);
extern bool ProcArrayInstallImportedXmin(TransactionId xmin, extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
TransactionId sourcexid); VirtualTransactionId *sourcevxid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc); extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
extern RunningTransactions GetRunningTransactionData(void); extern RunningTransactions GetRunningTransactionData(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment