Commit 44954fae authored by Philip Warner's avatar Philip Warner

Added long-standing transaction when restoring BLOBS (uses commit every BLOB_BATCH_SIZE)

Prevent dumping of languages from template1.
parent 0babf316
...@@ -23,6 +23,10 @@ ...@@ -23,6 +23,10 @@
* Modifications - 31-Jul-2000 - pjw@rhyme.com.au (1.46, 1.47) * Modifications - 31-Jul-2000 - pjw@rhyme.com.au (1.46, 1.47)
* Fixed version number initialization in _allocAH (pg_backup_archiver.c) * Fixed version number initialization in _allocAH (pg_backup_archiver.c)
* *
*
* Modifications - 30-Oct-2000 - pjw@rhyme.com.au
* Added {Start,End}RestoreBlobs to allow extended TX during BLOB restore.
*
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -590,6 +594,34 @@ int EndBlob(Archive* AHX, int oid) ...@@ -590,6 +594,34 @@ int EndBlob(Archive* AHX, int oid)
* BLOB Restoration * BLOB Restoration
**********/ **********/
/*
* Called by a format handler before any blobs are restored
*/
void StartRestoreBlobs(ArchiveHandle* AH)
{
AH->blobCount = 0;
}
/*
* Called by a format handler after all blobs are restored
*/
void EndRestoreBlobs(ArchiveHandle* AH)
{
if (AH->txActive)
{
ahlog(AH, 2, "Committing BLOB transactions\n");
CommitTransaction(AH);
}
if (AH->blobTxActive)
{
CommitTransactionXref(AH);
}
ahlog(AH, 1, "Restored %d BLOBs\n", AH->blobCount);
}
/* /*
* Called by a format handler to initiate restoration of a blob * Called by a format handler to initiate restoration of a blob
*/ */
...@@ -597,6 +629,8 @@ void StartRestoreBlob(ArchiveHandle* AH, int oid) ...@@ -597,6 +629,8 @@ void StartRestoreBlob(ArchiveHandle* AH, int oid)
{ {
int loOid; int loOid;
AH->blobCount++;
if (!AH->createdBlobXref) if (!AH->createdBlobXref)
{ {
if (!AH->connection) if (!AH->connection)
...@@ -606,7 +640,18 @@ void StartRestoreBlob(ArchiveHandle* AH, int oid) ...@@ -606,7 +640,18 @@ void StartRestoreBlob(ArchiveHandle* AH, int oid)
AH->createdBlobXref = 1; AH->createdBlobXref = 1;
} }
StartTransaction(AH); /*
* Start long-running TXs if necessary
*/
if (!AH->txActive)
{
ahlog(AH, 2, "Starting BLOB transactions\n");
StartTransaction(AH);
}
if (!AH->blobTxActive)
{
StartTransactionXref(AH);
}
loOid = lo_creat(AH->connection, INV_READ | INV_WRITE); loOid = lo_creat(AH->connection, INV_READ | INV_WRITE);
if (loOid == 0) if (loOid == 0)
...@@ -628,7 +673,15 @@ void EndRestoreBlob(ArchiveHandle* AH, int oid) ...@@ -628,7 +673,15 @@ void EndRestoreBlob(ArchiveHandle* AH, int oid)
lo_close(AH->connection, AH->loFd); lo_close(AH->connection, AH->loFd);
AH->writingBlob = 0; AH->writingBlob = 0;
CommitTransaction(AH); /*
* Commit every BLOB_BATCH_SIZE blobs...
*/
if ( ((AH->blobCount / BLOB_BATCH_SIZE) * BLOB_BATCH_SIZE) == AH->blobCount)
{
ahlog(AH, 2, "Committing BLOB transactions\n");
CommitTransaction(AH);
CommitTransactionXref(AH);
}
} }
/*********** /***********
......
...@@ -62,7 +62,7 @@ typedef z_stream *z_streamp; ...@@ -62,7 +62,7 @@ typedef z_stream *z_streamp;
#define K_VERS_MAJOR 1 #define K_VERS_MAJOR 1
#define K_VERS_MINOR 4 #define K_VERS_MINOR 4
#define K_VERS_REV 21 #define K_VERS_REV 22
/* Data block types */ /* Data block types */
#define BLK_DATA 1 #define BLK_DATA 1
...@@ -76,6 +76,9 @@ typedef z_stream *z_streamp; ...@@ -76,6 +76,9 @@ typedef z_stream *z_streamp;
#define K_VERS_1_4 (( (1 * 256 + 4) * 256 + 0) * 256 + 0) /* Date & name in header */ #define K_VERS_1_4 (( (1 * 256 + 4) * 256 + 0) * 256 + 0) /* Date & name in header */
#define K_VERS_MAX (( (1 * 256 + 4) * 256 + 255) * 256 + 0) #define K_VERS_MAX (( (1 * 256 + 4) * 256 + 255) * 256 + 0)
/* No of BLOBs to restore in 1 TX */
#define BLOB_BATCH_SIZE 100
struct _archiveHandle; struct _archiveHandle;
struct _tocEntry; struct _tocEntry;
struct _restoreList; struct _restoreList;
...@@ -186,6 +189,8 @@ typedef struct _archiveHandle { ...@@ -186,6 +189,8 @@ typedef struct _archiveHandle {
char *pgport; char *pgport;
PGconn *connection; PGconn *connection;
PGconn *blobConnection; /* Connection for BLOB xref */ PGconn *blobConnection; /* Connection for BLOB xref */
int txActive; /* Flag set if TX active on connection */
int blobTxActive; /* Flag set if TX active on blobConnection */
int connectToDB; /* Flag to indicate if direct DB connection is required */ int connectToDB; /* Flag to indicate if direct DB connection is required */
int pgCopyIn; /* Currently in libpq 'COPY IN' mode. */ int pgCopyIn; /* Currently in libpq 'COPY IN' mode. */
PQExpBuffer pgCopyBuf; /* Left-over data from incomplete lines in COPY IN */ PQExpBuffer pgCopyBuf; /* Left-over data from incomplete lines in COPY IN */
...@@ -193,6 +198,7 @@ typedef struct _archiveHandle { ...@@ -193,6 +198,7 @@ typedef struct _archiveHandle {
int loFd; /* BLOB fd */ int loFd; /* BLOB fd */
int writingBlob; /* Flag */ int writingBlob; /* Flag */
int createdBlobXref; /* Flag */ int createdBlobXref; /* Flag */
int blobCount; /* # of blobs restored */
int lastID; /* Last internal ID for a TOC entry */ int lastID; /* Last internal ID for a TOC entry */
char* fSpec; /* Archive File Spec */ char* fSpec; /* Archive File Spec */
...@@ -256,8 +262,10 @@ extern int ReadInt(ArchiveHandle* AH); ...@@ -256,8 +262,10 @@ extern int ReadInt(ArchiveHandle* AH);
extern char* ReadStr(ArchiveHandle* AH); extern char* ReadStr(ArchiveHandle* AH);
extern int WriteStr(ArchiveHandle* AH, char* s); extern int WriteStr(ArchiveHandle* AH, char* s);
extern void StartRestoreBlobs(ArchiveHandle* AH);
extern void StartRestoreBlob(ArchiveHandle* AH, int oid); extern void StartRestoreBlob(ArchiveHandle* AH, int oid);
extern void EndRestoreBlob(ArchiveHandle* AH, int oid); extern void EndRestoreBlob(ArchiveHandle* AH, int oid);
extern void EndRestoreBlobs(ArchiveHandle* AH);
extern void InitArchiveFmt_Custom(ArchiveHandle* AH); extern void InitArchiveFmt_Custom(ArchiveHandle* AH);
extern void InitArchiveFmt_Files(ArchiveHandle* AH); extern void InitArchiveFmt_Files(ArchiveHandle* AH);
......
...@@ -585,6 +585,8 @@ static void _LoadBlobs(ArchiveHandle* AH) ...@@ -585,6 +585,8 @@ static void _LoadBlobs(ArchiveHandle* AH)
{ {
int oid; int oid;
StartRestoreBlobs(AH);
oid = ReadInt(AH); oid = ReadInt(AH);
while(oid != 0) while(oid != 0)
{ {
...@@ -593,6 +595,9 @@ static void _LoadBlobs(ArchiveHandle* AH) ...@@ -593,6 +595,9 @@ static void _LoadBlobs(ArchiveHandle* AH)
EndRestoreBlob(AH, oid); EndRestoreBlob(AH, oid);
oid = ReadInt(AH); oid = ReadInt(AH);
} }
EndRestoreBlobs(AH);
} }
/* /*
...@@ -608,8 +613,8 @@ static void _skipBlobs(ArchiveHandle* AH) ...@@ -608,8 +613,8 @@ static void _skipBlobs(ArchiveHandle* AH)
oid = ReadInt(AH); oid = ReadInt(AH);
while(oid != 0) while(oid != 0)
{ {
_skipData(AH); _skipData(AH);
oid = ReadInt(AH); oid = ReadInt(AH);
} }
} }
......
...@@ -675,6 +675,17 @@ void StartTransaction(ArchiveHandle* AH) ...@@ -675,6 +675,17 @@ void StartTransaction(ArchiveHandle* AH)
appendPQExpBuffer(qry, "Begin;"); appendPQExpBuffer(qry, "Begin;");
ExecuteSqlCommand(AH, qry, "can not start database transaction"); ExecuteSqlCommand(AH, qry, "can not start database transaction");
AH->txActive = true;
}
void StartTransactionXref(ArchiveHandle* AH)
{
PQExpBuffer qry = createPQExpBuffer();
appendPQExpBuffer(qry, "Begin;");
_executeSqlCommand(AH, AH->blobConnection, qry, "can not start BLOB xref transaction");
AH->blobTxActive = true;
} }
void CommitTransaction(ArchiveHandle* AH) void CommitTransaction(ArchiveHandle* AH)
...@@ -684,6 +695,15 @@ void CommitTransaction(ArchiveHandle* AH) ...@@ -684,6 +695,15 @@ void CommitTransaction(ArchiveHandle* AH)
appendPQExpBuffer(qry, "Commit;"); appendPQExpBuffer(qry, "Commit;");
ExecuteSqlCommand(AH, qry, "can not commit database transaction"); ExecuteSqlCommand(AH, qry, "can not commit database transaction");
AH->txActive = false;
} }
void CommitTransactionXref(ArchiveHandle* AH)
{
PQExpBuffer qry = createPQExpBuffer();
appendPQExpBuffer(qry, "Commit;");
_executeSqlCommand(AH, AH->blobConnection, qry, "can not commit BLOB xref transaction");
AH->blobTxActive = false;
}
...@@ -12,5 +12,7 @@ extern int ExecuteSqlCommandBuf(ArchiveHandle* AH, void *qry, int bufLen); ...@@ -12,5 +12,7 @@ extern int ExecuteSqlCommandBuf(ArchiveHandle* AH, void *qry, int bufLen);
extern void CreateBlobXrefTable(ArchiveHandle* AH); extern void CreateBlobXrefTable(ArchiveHandle* AH);
extern void InsertBlobXref(ArchiveHandle* AH, int old, int new); extern void InsertBlobXref(ArchiveHandle* AH, int old, int new);
extern void StartTransaction(ArchiveHandle* AH); extern void StartTransaction(ArchiveHandle* AH);
extern void StartTransactionXref(ArchiveHandle* AH);
extern void CommitTransaction(ArchiveHandle* AH); extern void CommitTransaction(ArchiveHandle* AH);
extern void CommitTransactionXref(ArchiveHandle* AH);
...@@ -318,6 +318,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt) ...@@ -318,6 +318,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
lclContext* ctx = (lclContext*)AH->formatData; lclContext* ctx = (lclContext*)AH->formatData;
char fname[K_STD_BUF_SIZE]; char fname[K_STD_BUF_SIZE];
StartRestoreBlobs(AH);
ctx->blobToc = fopen("blobs.toc", PG_BINARY_R); ctx->blobToc = fopen("blobs.toc", PG_BINARY_R);
_getBlobTocEntry(AH, &oid, fname); _getBlobTocEntry(AH, &oid, fname);
...@@ -331,6 +333,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt) ...@@ -331,6 +333,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
} }
fclose(ctx->blobToc); fclose(ctx->blobToc);
EndRestoreBlobs(AH);
} }
......
...@@ -627,6 +627,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt) ...@@ -627,6 +627,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
int cnt; int cnt;
char buf[4096]; char buf[4096];
StartRestoreBlobs(AH);
th = tarOpen(AH, NULL, 'r'); /* Open next file */ th = tarOpen(AH, NULL, 'r'); /* Open next file */
while (th != NULL) while (th != NULL)
{ {
...@@ -652,21 +654,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt) ...@@ -652,21 +654,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
th = tarOpen(AH, NULL, 'r'); th = tarOpen(AH, NULL, 'r');
} }
/* EndRestoreBlobs(AH);
* ctx->blobToc = tarOpen(AH, "blobs.toc", 'r');
*
* _getBlobTocEntry(AH, &oid, fname);
*
* while(oid != 0)
* {
* StartRestoreBlob(AH, oid);
* _PrintFileData(AH, fname, ropt);
* EndRestoreBlob(AH, oid);
* _getBlobTocEntry(AH, &oid, fname);
* }
*
* tarClose(AH, ctx->blobToc);
*/
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.176 2000/10/24 13:24:30 pjw Exp $ * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.177 2000/10/31 14:20:30 pjw Exp $
* *
* Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb * Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb
* *
...@@ -2872,6 +2872,7 @@ dumpProcLangs(Archive *fout, FuncInfo *finfo, int numFuncs, ...@@ -2872,6 +2872,7 @@ dumpProcLangs(Archive *fout, FuncInfo *finfo, int numFuncs,
int i_lanpltrusted; int i_lanpltrusted;
int i_lanplcallfoid; int i_lanplcallfoid;
int i_lancompiler; int i_lancompiler;
Oid lanoid;
char *lanname; char *lanname;
char *lancompiler; char *lancompiler;
const char *lanplcallfoid; const char *lanplcallfoid;
...@@ -2898,7 +2899,13 @@ dumpProcLangs(Archive *fout, FuncInfo *finfo, int numFuncs, ...@@ -2898,7 +2899,13 @@ dumpProcLangs(Archive *fout, FuncInfo *finfo, int numFuncs,
for (i = 0; i < ntups; i++) for (i = 0; i < ntups; i++)
{ {
lanoid = atoi(PQgetvalue(res, i, i_oid));
if (lanoid <= g_last_builtin_oid)
continue;
lanplcallfoid = PQgetvalue(res, i, i_lanplcallfoid); lanplcallfoid = PQgetvalue(res, i, i_lanplcallfoid);
for (fidx = 0; fidx < numFuncs; fidx++) for (fidx = 0; fidx < numFuncs; fidx++)
{ {
if (!strcmp(finfo[fidx].oid, lanplcallfoid)) if (!strcmp(finfo[fidx].oid, lanplcallfoid))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment