Commit 967e276e authored by Robert Haas's avatar Robert Haas

Remove AtSubStart_Notify.

Allocate notify-related state lazily instead. This makes trivial
subtransactions noticeably faster.

Patch by me, reviewed and tested by Dilip Kumar, Kyotaro Horiguchi,
and Jeevan Ladhe.

Discussion: https://postgr.es/m/CA+TgmobE1J22S1eC-6N-je9LgrcwZypkwp+zH6JXo9mc=4Nk3A@mail.gmail.com
parent 6837632b
...@@ -4743,7 +4743,6 @@ StartSubTransaction(void) ...@@ -4743,7 +4743,6 @@ StartSubTransaction(void)
*/ */
AtSubStart_Memory(); AtSubStart_Memory();
AtSubStart_ResourceOwner(); AtSubStart_ResourceOwner();
AtSubStart_Notify();
AfterTriggerBeginSubXact(); AfterTriggerBeginSubXact();
s->state = TRANS_INPROGRESS; s->state = TRANS_INPROGRESS;
......
...@@ -344,9 +344,14 @@ typedef struct ...@@ -344,9 +344,14 @@ typedef struct
char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction; } ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */ typedef struct ActionList
{
int nestingLevel; /* current transaction nesting depth */
List *actions; /* list of ListenAction structs */
struct ActionList *upper; /* details for upper transaction levels */
} ActionList;
static List *upperPendingActions = NIL; /* list of upper-xact lists */ static ActionList *pendingActions = NULL;
/* /*
* State for outbound notifies consists of a list of all channels+payloads * State for outbound notifies consists of a list of all channels+payloads
...@@ -385,8 +390,10 @@ typedef struct Notification ...@@ -385,8 +390,10 @@ typedef struct Notification
typedef struct NotificationList typedef struct NotificationList
{ {
int nestingLevel; /* current transaction nesting depth */
List *events; /* list of Notification structs */ List *events; /* list of Notification structs */
HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList; } NotificationList;
#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */ #define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
...@@ -396,9 +403,7 @@ typedef struct NotificationHash ...@@ -396,9 +403,7 @@ typedef struct NotificationHash
Notification *event; /* => the actual Notification struct */ Notification *event; /* => the actual Notification struct */
} NotificationHash; } NotificationHash;
static NotificationList *pendingNotifies = NULL; /* current list, if any */ static NotificationList *pendingNotifies = NULL;
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
/* /*
* Inbound notifications are initially processed by HandleNotifyInterrupt(), * Inbound notifications are initially processed by HandleNotifyInterrupt(),
...@@ -609,6 +614,7 @@ pg_notify(PG_FUNCTION_ARGS) ...@@ -609,6 +614,7 @@ pg_notify(PG_FUNCTION_ARGS)
void void
Async_Notify(const char *channel, const char *payload) Async_Notify(const char *channel, const char *payload)
{ {
int my_level = GetCurrentTransactionNestLevel();
size_t channel_len; size_t channel_len;
size_t payload_len; size_t payload_len;
Notification *n; Notification *n;
...@@ -659,6 +665,27 @@ Async_Notify(const char *channel, const char *payload) ...@@ -659,6 +665,27 @@ Async_Notify(const char *channel, const char *payload)
else else
n->data[channel_len + 1] = '\0'; n->data[channel_len + 1] = '\0';
if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
{
NotificationList *notifies;
/*
* First notify event in current (sub)xact. Note that we allocate the
* NotificationList in TopTransactionContext; the nestingLevel might
* get changed later by AtSubCommit_Notify.
*/
notifies = (NotificationList *)
MemoryContextAlloc(TopTransactionContext,
sizeof(NotificationList));
notifies->nestingLevel = my_level;
notifies->events = list_make1(n);
/* We certainly don't need a hashtable yet */
notifies->hashtab = NULL;
notifies->upper = pendingNotifies;
pendingNotifies = notifies;
}
else
{
/* Now check for duplicates */ /* Now check for duplicates */
if (AsyncExistsPendingNotify(n)) if (AsyncExistsPendingNotify(n))
{ {
...@@ -668,16 +695,6 @@ Async_Notify(const char *channel, const char *payload) ...@@ -668,16 +695,6 @@ Async_Notify(const char *channel, const char *payload)
return; return;
} }
if (pendingNotifies == NULL)
{
/* First notify event in current (sub)xact */
pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
pendingNotifies->events = list_make1(n);
/* We certainly don't need a hashtable yet */
pendingNotifies->hashtab = NULL;
}
else
{
/* Append more events to existing list */ /* Append more events to existing list */
AddEventToPendingNotifies(n); AddEventToPendingNotifies(n);
} }
...@@ -698,6 +715,7 @@ queue_listen(ListenActionKind action, const char *channel) ...@@ -698,6 +715,7 @@ queue_listen(ListenActionKind action, const char *channel)
{ {
MemoryContext oldcontext; MemoryContext oldcontext;
ListenAction *actrec; ListenAction *actrec;
int my_level = GetCurrentTransactionNestLevel();
/* /*
* Unlike Async_Notify, we don't try to collapse out duplicates. It would * Unlike Async_Notify, we don't try to collapse out duplicates. It would
...@@ -713,7 +731,24 @@ queue_listen(ListenActionKind action, const char *channel) ...@@ -713,7 +731,24 @@ queue_listen(ListenActionKind action, const char *channel)
actrec->action = action; actrec->action = action;
strcpy(actrec->channel, channel); strcpy(actrec->channel, channel);
pendingActions = lappend(pendingActions, actrec); if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
{
ActionList *actions;
/*
* First action in current sub(xact). Note that we allocate the
* ActionList in TopTransactionContext; the nestingLevel might get
* changed later by AtSubCommit_Notify.
*/
actions = (ActionList *)
MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
actions->nestingLevel = my_level;
actions->actions = list_make1(actrec);
actions->upper = pendingActions;
pendingActions = actions;
}
else
pendingActions->actions = lappend(pendingActions->actions, actrec);
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
} }
...@@ -744,7 +779,7 @@ Async_Unlisten(const char *channel) ...@@ -744,7 +779,7 @@ Async_Unlisten(const char *channel)
elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */ /* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered) if (pendingActions == NULL && !unlistenExitRegistered)
return; return;
queue_listen(LISTEN_UNLISTEN, channel); queue_listen(LISTEN_UNLISTEN, channel);
...@@ -762,7 +797,7 @@ Async_UnlistenAll(void) ...@@ -762,7 +797,7 @@ Async_UnlistenAll(void)
elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */ /* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered) if (pendingActions == NULL && !unlistenExitRegistered)
return; return;
queue_listen(LISTEN_UNLISTEN_ALL, ""); queue_listen(LISTEN_UNLISTEN_ALL, "");
...@@ -858,7 +893,9 @@ PreCommit_Notify(void) ...@@ -858,7 +893,9 @@ PreCommit_Notify(void)
elog(DEBUG1, "PreCommit_Notify"); elog(DEBUG1, "PreCommit_Notify");
/* Preflight for any pending listen/unlisten actions */ /* Preflight for any pending listen/unlisten actions */
foreach(p, pendingActions) if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{ {
ListenAction *actrec = (ListenAction *) lfirst(p); ListenAction *actrec = (ListenAction *) lfirst(p);
...@@ -875,6 +912,7 @@ PreCommit_Notify(void) ...@@ -875,6 +912,7 @@ PreCommit_Notify(void)
break; break;
} }
} }
}
/* Queue any pending notifies (must happen after the above) */ /* Queue any pending notifies (must happen after the above) */
if (pendingNotifies) if (pendingNotifies)
...@@ -961,7 +999,9 @@ AtCommit_Notify(void) ...@@ -961,7 +999,9 @@ AtCommit_Notify(void)
elog(DEBUG1, "AtCommit_Notify"); elog(DEBUG1, "AtCommit_Notify");
/* Perform any pending listen/unlisten actions */ /* Perform any pending listen/unlisten actions */
foreach(p, pendingActions) if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{ {
ListenAction *actrec = (ListenAction *) lfirst(p); ListenAction *actrec = (ListenAction *) lfirst(p);
...@@ -978,6 +1018,7 @@ AtCommit_Notify(void) ...@@ -978,6 +1018,7 @@ AtCommit_Notify(void)
break; break;
} }
} }
}
/* If no longer listening to anything, get out of listener array */ /* If no longer listening to anything, get out of listener array */
if (amRegisteredListener && listenChannels == NIL) if (amRegisteredListener && listenChannels == NIL)
...@@ -1705,36 +1746,6 @@ AtAbort_Notify(void) ...@@ -1705,36 +1746,6 @@ AtAbort_Notify(void)
ClearPendingActionsAndNotifies(); ClearPendingActionsAndNotifies();
} }
/*
* AtSubStart_Notify() --- Take care of subtransaction start.
*
* Push empty state for the new subtransaction.
*/
void
AtSubStart_Notify(void)
{
MemoryContext old_cxt;
/* Keep the list-of-lists in TopTransactionContext for simplicity */
old_cxt = MemoryContextSwitchTo(TopTransactionContext);
upperPendingActions = lcons(pendingActions, upperPendingActions);
Assert(list_length(upperPendingActions) ==
GetCurrentTransactionNestLevel() - 1);
pendingActions = NIL;
upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 1);
pendingNotifies = NULL;
MemoryContextSwitchTo(old_cxt);
}
/* /*
* AtSubCommit_Notify() --- Take care of subtransaction commit. * AtSubCommit_Notify() --- Take care of subtransaction commit.
* *
...@@ -1743,46 +1754,57 @@ AtSubStart_Notify(void) ...@@ -1743,46 +1754,57 @@ AtSubStart_Notify(void)
void void
AtSubCommit_Notify(void) AtSubCommit_Notify(void)
{ {
List *parentPendingActions; int my_level = GetCurrentTransactionNestLevel();
NotificationList *parentPendingNotifies;
parentPendingActions = linitial_node(List, upperPendingActions); /* If there are actions at our nesting level, we must reparent them. */
upperPendingActions = list_delete_first(upperPendingActions); if (pendingActions != NULL &&
pendingActions->nestingLevel >= my_level)
{
if (pendingActions->upper == NULL ||
pendingActions->upper->nestingLevel < my_level - 1)
{
/* nothing to merge; give the whole thing to the parent */
--pendingActions->nestingLevel;
}
else
{
ActionList *childPendingActions = pendingActions;
Assert(list_length(upperPendingActions) == pendingActions = pendingActions->upper;
GetCurrentTransactionNestLevel() - 2);
/* /*
* Mustn't try to eliminate duplicates here --- see queue_listen() * Mustn't try to eliminate duplicates here --- see queue_listen()
*/ */
pendingActions = list_concat(parentPendingActions, pendingActions); pendingActions->actions =
list_concat(pendingActions->actions,
parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies); childPendingActions->actions);
upperPendingNotifies = list_delete_first(upperPendingNotifies); pfree(childPendingActions);
}
Assert(list_length(upperPendingNotifies) == }
GetCurrentTransactionNestLevel() - 2);
if (pendingNotifies == NULL) /* If there are notifies at our nesting level, we must reparent them. */
if (pendingNotifies != NULL &&
pendingNotifies->nestingLevel >= my_level)
{ {
/* easy, no notify events happened in current subxact */ Assert(pendingNotifies->nestingLevel == my_level);
pendingNotifies = parentPendingNotifies;
} if (pendingNotifies->upper == NULL ||
else if (parentPendingNotifies == NULL) pendingNotifies->upper->nestingLevel < my_level - 1)
{ {
/* easy, subxact's list becomes parent's */ /* nothing to merge; give the whole thing to the parent */
--pendingNotifies->nestingLevel;
} }
else else
{ {
/* /*
* Formerly, we didn't bother to eliminate duplicates here, but now we * Formerly, we didn't bother to eliminate duplicates here, but
* must, else we fall foul of "Assert(!found)", either here or during * now we must, else we fall foul of "Assert(!found)", either here
* a later attempt to build the parent-level hashtable. * or during a later attempt to build the parent-level hashtable.
*/ */
NotificationList *childPendingNotifies = pendingNotifies; NotificationList *childPendingNotifies = pendingNotifies;
ListCell *l; ListCell *l;
pendingNotifies = parentPendingNotifies; pendingNotifies = pendingNotifies->upper;
/* Insert all the subxact's events into parent, except for dups */ /* Insert all the subxact's events into parent, except for dups */
foreach(l, childPendingNotifies->events) foreach(l, childPendingNotifies->events)
{ {
...@@ -1791,6 +1813,8 @@ AtSubCommit_Notify(void) ...@@ -1791,6 +1813,8 @@ AtSubCommit_Notify(void)
if (!AsyncExistsPendingNotify(childn)) if (!AsyncExistsPendingNotify(childn))
AddEventToPendingNotifies(childn); AddEventToPendingNotifies(childn);
} }
pfree(childPendingNotifies);
}
} }
} }
...@@ -1805,23 +1829,31 @@ AtSubAbort_Notify(void) ...@@ -1805,23 +1829,31 @@ AtSubAbort_Notify(void)
/* /*
* All we have to do is pop the stack --- the actions/notifies made in * All we have to do is pop the stack --- the actions/notifies made in
* this subxact are no longer interesting, and the space will be freed * this subxact are no longer interesting, and the space will be freed
* when CurTransactionContext is recycled. * when CurTransactionContext is recycled. We still have to free the
* ActionList and NotificationList objects themselves, though, because
* those are allocated in TopTransactionContext.
* *
* This routine could be called more than once at a given nesting level if * Note that there might be no entries at all, or no entries for the
* there is trouble during subxact abort. Avoid dumping core by using * current subtransaction level, either because none were ever created,
* GetCurrentTransactionNestLevel as the indicator of how far we need to * or because we reentered this routine due to trouble during subxact
* prune the list. * abort.
*/ */
while (list_length(upperPendingActions) > my_level - 2) while (pendingActions != NULL &&
pendingActions->nestingLevel >= my_level)
{ {
pendingActions = linitial_node(List, upperPendingActions); ActionList *childPendingActions = pendingActions;
upperPendingActions = list_delete_first(upperPendingActions);
pendingActions = pendingActions->upper;
pfree(childPendingActions);
} }
while (list_length(upperPendingNotifies) > my_level - 2) while (pendingNotifies != NULL &&
pendingNotifies->nestingLevel >= my_level)
{ {
pendingNotifies = (NotificationList *) linitial(upperPendingNotifies); NotificationList *childPendingNotifies = pendingNotifies;
upperPendingNotifies = list_delete_first(upperPendingNotifies);
pendingNotifies = pendingNotifies->upper;
pfree(childPendingNotifies);
} }
} }
...@@ -2374,12 +2406,11 @@ static void ...@@ -2374,12 +2406,11 @@ static void
ClearPendingActionsAndNotifies(void) ClearPendingActionsAndNotifies(void)
{ {
/* /*
* We used to have to explicitly deallocate the list members and nodes, * Everything's allocated in either TopTransactionContext or the context
* because they were malloc'd. Now, since we know they are palloc'd in * for the subtransaction to which it corresponds. So, there's nothing
* CurTransactionContext, we need not do that --- they'll go away * to do here except rest the pointers; the space will be reclaimed when
* automatically at transaction exit. We need only reset the list head * the contexts are deleted.
* pointers.
*/ */
pendingActions = NIL; pendingActions = NULL;
pendingNotifies = NULL; pendingNotifies = NULL;
} }
...@@ -40,7 +40,6 @@ extern void Async_UnlistenAll(void); ...@@ -40,7 +40,6 @@ extern void Async_UnlistenAll(void);
extern void PreCommit_Notify(void); extern void PreCommit_Notify(void);
extern void AtCommit_Notify(void); extern void AtCommit_Notify(void);
extern void AtAbort_Notify(void); extern void AtAbort_Notify(void);
extern void AtSubStart_Notify(void);
extern void AtSubCommit_Notify(void); extern void AtSubCommit_Notify(void);
extern void AtSubAbort_Notify(void); extern void AtSubAbort_Notify(void);
extern void AtPrepare_Notify(void); extern void AtPrepare_Notify(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment