Commit 0851e122 authored by Tom Lane's avatar Tom Lane

Reorganize clog's error reporting so that PANIC on clog I/O error can

be reduced to a plain ERROR.  Should make it at least a little less
painful to deal with data-corruption problems.
parent 6bfa2df6
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.11 2002/09/26 22:58:33 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.12 2003/04/14 17:31:33 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -123,7 +123,7 @@ typedef enum ...@@ -123,7 +123,7 @@ typedef enum
CLOG_PAGE_READ_IN_PROGRESS, /* CLOG page is being read in */ CLOG_PAGE_READ_IN_PROGRESS, /* CLOG page is being read in */
CLOG_PAGE_CLEAN, /* CLOG page is valid and not dirty */ CLOG_PAGE_CLEAN, /* CLOG page is valid and not dirty */
CLOG_PAGE_DIRTY, /* CLOG page is valid but needs write */ CLOG_PAGE_DIRTY, /* CLOG page is valid but needs write */
CLOG_PAGE_WRITE_IN_PROGRESS /* CLOG page is being written out in */ CLOG_PAGE_WRITE_IN_PROGRESS /* CLOG page is being written out */
} ClogPageStatus; } ClogPageStatus;
/* /*
...@@ -180,12 +180,25 @@ static char ClogDir[MAXPGPATH]; ...@@ -180,12 +180,25 @@ static char ClogDir[MAXPGPATH];
ClogCtl->page_lru_count[slotno] = 0; \ ClogCtl->page_lru_count[slotno] = 0; \
} while (0) } while (0)
/* Saved info for CLOGReportIOError */
typedef enum
{
CLOG_OPEN_FAILED,
CLOG_CREATE_FAILED,
CLOG_SEEK_FAILED,
CLOG_READ_FAILED,
CLOG_WRITE_FAILED
} ClogErrorCause;
static ClogErrorCause clog_errcause;
static int clog_errno;
static int ZeroCLOGPage(int pageno, bool writeXlog); static int ZeroCLOGPage(int pageno, bool writeXlog);
static int ReadCLOGPage(int pageno); static int ReadCLOGPage(int pageno, TransactionId xid);
static void WriteCLOGPage(int slotno); static void WriteCLOGPage(int slotno);
static void CLOGPhysicalReadPage(int pageno, int slotno); static bool CLOGPhysicalReadPage(int pageno, int slotno);
static void CLOGPhysicalWritePage(int pageno, int slotno); static bool CLOGPhysicalWritePage(int pageno, int slotno);
static void CLOGReportIOError(int pageno, TransactionId xid);
static int SelectLRUCLOGPage(int pageno); static int SelectLRUCLOGPage(int pageno);
static bool ScanCLOGDirectory(int cutoffPage, bool doDeletions); static bool ScanCLOGDirectory(int cutoffPage, bool doDeletions);
static bool CLOGPagePrecedes(int page1, int page2); static bool CLOGPagePrecedes(int page1, int page2);
...@@ -212,7 +225,7 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status) ...@@ -212,7 +225,7 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status)
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
slotno = ReadCLOGPage(pageno); slotno = ReadCLOGPage(pageno, xid);
byteptr = ClogCtl->page_buffer[slotno] + byteno; byteptr = ClogCtl->page_buffer[slotno] + byteno;
/* Current state should be 0 or target state */ /* Current state should be 0 or target state */
...@@ -244,7 +257,7 @@ TransactionIdGetStatus(TransactionId xid) ...@@ -244,7 +257,7 @@ TransactionIdGetStatus(TransactionId xid)
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
slotno = ReadCLOGPage(pageno); slotno = ReadCLOGPage(pageno, xid);
byteptr = ClogCtl->page_buffer[slotno] + byteno; byteptr = ClogCtl->page_buffer[slotno] + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK; status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
...@@ -362,18 +375,22 @@ ZeroCLOGPage(int pageno, bool writeXlog) ...@@ -362,18 +375,22 @@ ZeroCLOGPage(int pageno, bool writeXlog)
* Find a CLOG page in a shared buffer, reading it in if necessary. * Find a CLOG page in a shared buffer, reading it in if necessary.
* The page number must correspond to an already-initialized page. * The page number must correspond to an already-initialized page.
* *
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
*
* Return value is the shared-buffer slot number now holding the page. * Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated. * The buffer's LRU access info is updated.
* *
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
static int static int
ReadCLOGPage(int pageno) ReadCLOGPage(int pageno, TransactionId xid)
{ {
/* Outer loop handles restart if we lose the buffer to someone else */ /* Outer loop handles restart if we lose the buffer to someone else */
for (;;) for (;;)
{ {
int slotno; int slotno;
bool ok;
/* See if page already is in memory; if not, pick victim slot */ /* See if page already is in memory; if not, pick victim slot */
slotno = SelectLRUCLOGPage(pageno); slotno = SelectLRUCLOGPage(pageno);
...@@ -424,7 +441,7 @@ ReadCLOGPage(int pageno) ...@@ -424,7 +441,7 @@ ReadCLOGPage(int pageno)
} }
/* Okay, do the read */ /* Okay, do the read */
CLOGPhysicalReadPage(pageno, slotno); ok = CLOGPhysicalReadPage(pageno, slotno);
/* Re-acquire shared control lock and update page state */ /* Re-acquire shared control lock and update page state */
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
...@@ -432,10 +449,14 @@ ReadCLOGPage(int pageno) ...@@ -432,10 +449,14 @@ ReadCLOGPage(int pageno)
Assert(ClogCtl->page_number[slotno] == pageno && Assert(ClogCtl->page_number[slotno] == pageno &&
ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS); ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS);
ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN; ClogCtl->page_status[slotno] = ok ? CLOG_PAGE_CLEAN : CLOG_PAGE_EMPTY;
LWLockRelease(ClogBufferLocks[slotno]); LWLockRelease(ClogBufferLocks[slotno]);
/* Now it's okay to elog if we failed */
if (!ok)
CLOGReportIOError(pageno, xid);
ClogRecentlyUsed(slotno); ClogRecentlyUsed(slotno);
return slotno; return slotno;
} }
...@@ -456,6 +477,7 @@ static void ...@@ -456,6 +477,7 @@ static void
WriteCLOGPage(int slotno) WriteCLOGPage(int slotno)
{ {
int pageno; int pageno;
bool ok;
/* Do nothing if page does not need writing */ /* Do nothing if page does not need writing */
if (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY && if (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
...@@ -499,7 +521,7 @@ WriteCLOGPage(int slotno) ...@@ -499,7 +521,7 @@ WriteCLOGPage(int slotno)
ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS; ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS;
/* Okay, do the write */ /* Okay, do the write */
CLOGPhysicalWritePage(pageno, slotno); ok = CLOGPhysicalWritePage(pageno, slotno);
/* Re-acquire shared control lock and update page state */ /* Re-acquire shared control lock and update page state */
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
...@@ -510,18 +532,26 @@ WriteCLOGPage(int slotno) ...@@ -510,18 +532,26 @@ WriteCLOGPage(int slotno)
/* Cannot set CLEAN if someone re-dirtied page since write started */ /* Cannot set CLEAN if someone re-dirtied page since write started */
if (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS) if (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS)
ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN; ClogCtl->page_status[slotno] = ok ? CLOG_PAGE_CLEAN : CLOG_PAGE_DIRTY;
LWLockRelease(ClogBufferLocks[slotno]); LWLockRelease(ClogBufferLocks[slotno]);
/* Now it's okay to elog if we failed */
if (!ok)
CLOGReportIOError(pageno, InvalidTransactionId);
} }
/* /*
* Physical read of a (previously existing) page into a buffer slot * Physical read of a (previously existing) page into a buffer slot
* *
* On failure, we cannot just elog(ERROR) since caller has put state in
* shared memory that must be undone. So, we return FALSE and save enough
* info in static variables to let CLOGReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across * For now, assume it's not worth keeping a file pointer open across
* read/write operations. We could cache one virtual file pointer ... * read/write operations. We could cache one virtual file pointer ...
*/ */
static void static bool
CLOGPhysicalReadPage(int pageno, int slotno) CLOGPhysicalReadPage(int pageno, int slotno)
{ {
int segno = pageno / CLOG_PAGES_PER_SEGMENT; int segno = pageno / CLOG_PAGES_PER_SEGMENT;
...@@ -543,31 +573,47 @@ CLOGPhysicalReadPage(int pageno, int slotno) ...@@ -543,31 +573,47 @@ CLOGPhysicalReadPage(int pageno, int slotno)
if (fd < 0) if (fd < 0)
{ {
if (errno != ENOENT || !InRecovery) if (errno != ENOENT || !InRecovery)
elog(PANIC, "open of %s failed: %m", path); {
clog_errcause = CLOG_OPEN_FAILED;
clog_errno = errno;
return false;
}
elog(LOG, "clog file %s doesn't exist, reading as zeroes", path); elog(LOG, "clog file %s doesn't exist, reading as zeroes", path);
MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ); MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);
return; return true;
} }
if (lseek(fd, (off_t) offset, SEEK_SET) < 0) if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
elog(PANIC, "lseek of clog file %u, offset %u failed: %m", {
segno, offset); clog_errcause = CLOG_SEEK_FAILED;
clog_errno = errno;
return false;
}
errno = 0; errno = 0;
if (read(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ) if (read(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
elog(PANIC, "read of clog file %u, offset %u failed: %m", {
segno, offset); clog_errcause = CLOG_READ_FAILED;
clog_errno = errno;
return false;
}
close(fd); close(fd);
return true;
} }
/* /*
* Physical write of a page from a buffer slot * Physical write of a page from a buffer slot
* *
* On failure, we cannot just elog(ERROR) since caller has put state in
* shared memory that must be undone. So, we return FALSE and save enough
* info in static variables to let CLOGReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across * For now, assume it's not worth keeping a file pointer open across
* read/write operations. We could cache one virtual file pointer ... * read/write operations. We could cache one virtual file pointer ...
*/ */
static void static bool
CLOGPhysicalWritePage(int pageno, int slotno) CLOGPhysicalWritePage(int pageno, int slotno)
{ {
int segno = pageno / CLOG_PAGES_PER_SEGMENT; int segno = pageno / CLOG_PAGES_PER_SEGMENT;
...@@ -595,16 +641,28 @@ CLOGPhysicalWritePage(int pageno, int slotno) ...@@ -595,16 +641,28 @@ CLOGPhysicalWritePage(int pageno, int slotno)
if (fd < 0) if (fd < 0)
{ {
if (errno != ENOENT) if (errno != ENOENT)
elog(PANIC, "open of %s failed: %m", path); {
clog_errcause = CLOG_OPEN_FAILED;
clog_errno = errno;
return false;
}
fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
S_IRUSR | S_IWUSR); S_IRUSR | S_IWUSR);
if (fd < 0) if (fd < 0)
elog(PANIC, "creation of file %s failed: %m", path); {
clog_errcause = CLOG_CREATE_FAILED;
clog_errno = errno;
return false;
}
} }
if (lseek(fd, (off_t) offset, SEEK_SET) < 0) if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
elog(PANIC, "lseek of clog file %u, offset %u failed: %m", {
segno, offset); clog_errcause = CLOG_SEEK_FAILED;
clog_errno = errno;
return false;
}
errno = 0; errno = 0;
if (write(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ) if (write(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
...@@ -612,11 +670,56 @@ CLOGPhysicalWritePage(int pageno, int slotno) ...@@ -612,11 +670,56 @@ CLOGPhysicalWritePage(int pageno, int slotno)
/* if write didn't set errno, assume problem is no disk space */ /* if write didn't set errno, assume problem is no disk space */
if (errno == 0) if (errno == 0)
errno = ENOSPC; errno = ENOSPC;
elog(PANIC, "write of clog file %u, offset %u failed: %m", clog_errcause = CLOG_WRITE_FAILED;
segno, offset); clog_errno = errno;
return false;
} }
close(fd); close(fd);
return true;
}
/*
* Issue the error message after failure of CLOGPhysicalReadPage or
* CLOGPhysicalWritePage. Call this after cleaning up shared-memory state.
*/
static void
CLOGReportIOError(int pageno, TransactionId xid)
{
int segno = pageno / CLOG_PAGES_PER_SEGMENT;
int rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
int offset = rpageno * CLOG_BLCKSZ;
char path[MAXPGPATH];
/* XXX TODO: provide xid as context in error messages */
ClogFileName(path, segno);
errno = clog_errno;
switch (clog_errcause)
{
case CLOG_OPEN_FAILED:
elog(ERROR, "open of %s failed: %m", path);
break;
case CLOG_CREATE_FAILED:
elog(ERROR, "creation of file %s failed: %m", path);
break;
case CLOG_SEEK_FAILED:
elog(ERROR, "lseek of clog file %u, offset %u failed: %m",
segno, offset);
break;
case CLOG_READ_FAILED:
elog(ERROR, "read of clog file %u, offset %u failed: %m",
segno, offset);
break;
case CLOG_WRITE_FAILED:
elog(ERROR, "write of clog file %u, offset %u failed: %m",
segno, offset);
break;
default:
/* can't get here, we trust */
elog(ERROR, "unknown CLOG I/O error");
break;
}
} }
/* /*
...@@ -679,7 +782,8 @@ SelectLRUCLOGPage(int pageno) ...@@ -679,7 +782,8 @@ SelectLRUCLOGPage(int pageno)
* the read to complete. * the read to complete.
*/ */
if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS) if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS)
(void) ReadCLOGPage(ClogCtl->page_number[bestslot]); (void) ReadCLOGPage(ClogCtl->page_number[bestslot],
InvalidTransactionId);
else else
WriteCLOGPage(bestslot); WriteCLOGPage(bestslot);
...@@ -857,7 +961,8 @@ restart:; ...@@ -857,7 +961,8 @@ restart:;
* This is the same logic as in SelectLRUCLOGPage. * This is the same logic as in SelectLRUCLOGPage.
*/ */
if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS) if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS)
(void) ReadCLOGPage(ClogCtl->page_number[slotno]); (void) ReadCLOGPage(ClogCtl->page_number[slotno],
InvalidTransactionId);
else else
WriteCLOGPage(slotno); WriteCLOGPage(slotno);
goto restart; goto restart;
...@@ -886,7 +991,7 @@ ScanCLOGDirectory(int cutoffPage, bool doDeletions) ...@@ -886,7 +991,7 @@ ScanCLOGDirectory(int cutoffPage, bool doDeletions)
cldir = opendir(ClogDir); cldir = opendir(ClogDir);
if (cldir == NULL) if (cldir == NULL)
elog(PANIC, "could not open transaction-commit log directory (%s): %m", elog(ERROR, "could not open transaction-commit log directory (%s): %m",
ClogDir); ClogDir);
errno = 0; errno = 0;
...@@ -911,7 +1016,7 @@ ScanCLOGDirectory(int cutoffPage, bool doDeletions) ...@@ -911,7 +1016,7 @@ ScanCLOGDirectory(int cutoffPage, bool doDeletions)
errno = 0; errno = 0;
} }
if (errno) if (errno)
elog(PANIC, "could not read transaction-commit log directory (%s): %m", elog(ERROR, "could not read transaction-commit log directory (%s): %m",
ClogDir); ClogDir);
closedir(cldir); closedir(cldir);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment