Commit 09e196e4 authored by Alvaro Herrera's avatar Alvaro Herrera

Use callbacks in SlruScanDirectory for the actual action

Previously, the code assumed that the only possible action to take was
to delete files behind a certain cutoff point.  The async notify code
was already a crock: it used a different "pagePrecedes" function for
truncation than for regular operation.  By allowing it to pass a
callback to SlruScanDirectory it can do cleanly exactly what it needs to
do.

The clog.c code also had its own use for SlruScanDirectory, which is
made a bit simpler with this.
parent 1a00c0ef
...@@ -606,7 +606,7 @@ TruncateCLOG(TransactionId oldestXact) ...@@ -606,7 +606,7 @@ TruncateCLOG(TransactionId oldestXact)
cutoffPage = TransactionIdToPage(oldestXact); cutoffPage = TransactionIdToPage(oldestXact);
/* Check to see if there's any files that could be removed */ /* Check to see if there's any files that could be removed */
if (!SlruScanDirectory(ClogCtl, cutoffPage, false)) if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
return; /* nothing to remove */ return; /* nothing to remove */
/* Write XLOG record and flush XLOG to disk */ /* Write XLOG record and flush XLOG to disk */
......
...@@ -132,6 +132,8 @@ static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, ...@@ -132,6 +132,8 @@ static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid); static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
static int SlruSelectLRUPage(SlruCtl ctl, int pageno); static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
int segpage, void *data);
/* /*
* Initialization of shared memory * Initialization of shared memory
...@@ -1137,32 +1139,83 @@ restart:; ...@@ -1137,32 +1139,83 @@ restart:;
LWLockRelease(shared->ControlLock); LWLockRelease(shared->ControlLock);
/* Now we can remove the old segment(s) */ /* Now we can remove the old segment(s) */
(void) SlruScanDirectory(ctl, cutoffPage, true); (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
} }
/* /*
* SimpleLruTruncate subroutine: scan directory for removable segments. * SlruScanDirectory callback
* Actually remove them iff doDeletions is true. Return TRUE iff any * This callback reports true if there's any segment prior to the one
* removable segments were found. Note: no locking is needed. * containing the page passed as "data".
*/
bool
SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
{
int cutoffPage = *(int *) data;
cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
if (ctl->PagePrecedes(segpage, cutoffPage))
return true; /* found one; don't iterate any more */
return false; /* keep going */
}
/*
* SlruScanDirectory callback.
* This callback deletes segments prior to the one passed in as "data".
*/
static bool
SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
{
char path[MAXPGPATH];
int cutoffPage = *(int *) data;
if (ctl->PagePrecedes(segpage, cutoffPage))
{
snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
ereport(DEBUG2,
(errmsg("removing file \"%s\"", path)));
unlink(path);
}
return false; /* keep going */
}
/*
* SlruScanDirectory callback.
* This callback deletes all segments.
*/
bool
SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
{
char path[MAXPGPATH];
snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
ereport(DEBUG2,
(errmsg("removing file \"%s\"", path)));
unlink(path);
return false; /* keep going */
}
/*
* Scan the SimpleLRU directory and apply a callback to each file found in it.
*
* If the callback returns true, the scan is stopped. The last return value
* from the callback is returned.
* *
* This can be called directly from clog.c, for reasons explained there. * Note that the ordering in which the directory is scanned is not guaranteed.
*
* Note that no locking is applied.
*/ */
bool bool
SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions) SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
{ {
bool found = false;
DIR *cldir; DIR *cldir;
struct dirent *clde; struct dirent *clde;
int segno; int segno;
int segpage; int segpage;
char path[MAXPGPATH]; bool retval;
/*
* The cutoff point is the start of the segment containing cutoffPage.
* (This is redundant when called from SimpleLruTruncate, but not when
* called directly from clog.c.)
*/
cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
cldir = AllocateDir(ctl->Dir); cldir = AllocateDir(ctl->Dir);
while ((clde = ReadDir(cldir, ctl->Dir)) != NULL) while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
...@@ -1172,20 +1225,15 @@ SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions) ...@@ -1172,20 +1225,15 @@ SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions)
{ {
segno = (int) strtol(clde->d_name, NULL, 16); segno = (int) strtol(clde->d_name, NULL, 16);
segpage = segno * SLRU_PAGES_PER_SEGMENT; segpage = segno * SLRU_PAGES_PER_SEGMENT;
if (ctl->PagePrecedes(segpage, cutoffPage))
{ elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
found = true; ctl->Dir, clde->d_name);
if (doDeletions) retval = callback(ctl, clde->d_name, segpage, data);
{ if (retval)
snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, clde->d_name); break;
ereport(DEBUG2,
(errmsg("removing file \"%s\"", path)));
unlink(path);
}
}
} }
} }
FreeDir(cldir); FreeDir(cldir);
return found; return retval;
} }
...@@ -194,7 +194,7 @@ typedef struct QueuePosition ...@@ -194,7 +194,7 @@ typedef struct QueuePosition
/* choose logically smaller QueuePosition */ /* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \ #define QUEUE_POS_MIN(x,y) \
(asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \ (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \ (x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y)) (x).offset < (y).offset ? (x) : (y))
...@@ -360,8 +360,7 @@ static bool backendHasExecutedInitialListen = false; ...@@ -360,8 +360,7 @@ static bool backendHasExecutedInitialListen = false;
bool Trace_notify = false; bool Trace_notify = false;
/* local function prototypes */ /* local function prototypes */
static bool asyncQueuePagePrecedesPhysically(int p, int q); static bool asyncQueuePagePrecedes(int p, int q);
static bool asyncQueuePagePrecedesLogically(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel); static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg); static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void); static void Exec_ListenPreCommit(void);
...@@ -388,25 +387,11 @@ static void NotifyMyFrontEnd(const char *channel, ...@@ -388,25 +387,11 @@ static void NotifyMyFrontEnd(const char *channel,
static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void); static void ClearPendingActionsAndNotifies(void);
/* /*
* We will work on the page range of 0..QUEUE_MAX_PAGE. * We will work on the page range of 0..QUEUE_MAX_PAGE.
*
* asyncQueuePagePrecedesPhysically just checks numerically without any magic
* if one page precedes another one. This is wrong for normal operation but
* is helpful when clearing pg_notify/ during startup.
*
* asyncQueuePagePrecedesLogically compares using wraparound logic, as is
* required by slru.c.
*/ */
static bool static bool
asyncQueuePagePrecedesPhysically(int p, int q) asyncQueuePagePrecedes(int p, int q)
{
return p < q;
}
static bool
asyncQueuePagePrecedesLogically(int p, int q)
{ {
int diff; int diff;
...@@ -484,7 +469,7 @@ AsyncShmemInit(void) ...@@ -484,7 +469,7 @@ AsyncShmemInit(void)
/* /*
* Set up SLRU management of the pg_notify data. * Set up SLRU management of the pg_notify data.
*/ */
AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically; AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0, SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
AsyncCtlLock, "pg_notify"); AsyncCtlLock, "pg_notify");
/* Override default assumption that writes should be fsync'd */ /* Override default assumption that writes should be fsync'd */
...@@ -494,15 +479,8 @@ AsyncShmemInit(void) ...@@ -494,15 +479,8 @@ AsyncShmemInit(void)
{ {
/* /*
* During start or reboot, clean out the pg_notify directory. * During start or reboot, clean out the pg_notify directory.
*
* Since we want to remove every file, we temporarily use
* asyncQueuePagePrecedesPhysically() and pass INT_MAX as the
* comparison value; every file in the directory should therefore
* appear to be less than that.
*/ */
AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically; (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
(void) SlruScanDirectory(AsyncCtl, INT_MAX, true);
AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
/* Now initialize page zero to empty */ /* Now initialize page zero to empty */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
...@@ -1223,7 +1201,7 @@ asyncQueueIsFull(void) ...@@ -1223,7 +1201,7 @@ asyncQueueIsFull(void)
nexthead = 0; /* wrap around */ nexthead = 0; /* wrap around */
boundary = QUEUE_POS_PAGE(QUEUE_TAIL); boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
boundary -= boundary % SLRU_PAGES_PER_SEGMENT; boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
return asyncQueuePagePrecedesLogically(nexthead, boundary); return asyncQueuePagePrecedes(nexthead, boundary);
} }
/* /*
...@@ -2074,7 +2052,7 @@ asyncQueueAdvanceTail(void) ...@@ -2074,7 +2052,7 @@ asyncQueueAdvanceTail(void)
*/ */
newtailpage = QUEUE_POS_PAGE(min); newtailpage = QUEUE_POS_PAGE(min);
boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
if (asyncQueuePagePrecedesLogically(oldtailpage, boundary)) if (asyncQueuePagePrecedes(oldtailpage, boundary))
{ {
/* /*
* SimpleLruTruncate() will ask for AsyncCtlLock but will also release * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
......
...@@ -145,6 +145,15 @@ extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, ...@@ -145,6 +145,15 @@ extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno,
extern void SimpleLruWritePage(SlruCtl ctl, int slotno); extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
extern void SimpleLruFlush(SlruCtl ctl, bool checkpoint); extern void SimpleLruFlush(SlruCtl ctl, bool checkpoint);
extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage); extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage);
extern bool SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions);
typedef bool (*SlruScanCallback) (SlruCtl ctl, char *filename, int segpage,
void *data);
extern bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data);
/* SlruScanDirectory public callbacks */
extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename,
int segpage, void *data);
extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage,
void *data);
#endif /* SLRU_H */ #endif /* SLRU_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment