Commit 489b96e8 authored by Peter Eisentraut's avatar Peter Eisentraut

Improve memory use in logical replication apply

Previously, the memory used by the logical replication apply worker for
processing messages would never be freed, so that could end up using a
lot of memory.  To improve that, change the existing ApplyContext memory
context to ApplyMessageContext and reset that after every
message (similar to MessageContext used elsewhere).  For consistency of
naming, rename the ApplyCacheContext to ApplyContext.

Author: Stas Kelvich <s.kelvich@postgrespro.ru>
parent e0bf1606
...@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg ...@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
int attnum; int attnum;
} SlotErrCallbackArg; } SlotErrCallbackArg;
static MemoryContext ApplyContext = NULL; static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyCacheContext = NULL; MemoryContext ApplyContext = NULL;
WalReceiverConn *wrconn = NULL; WalReceiverConn *wrconn = NULL;
...@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) ...@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
/* /*
* Make sure that we started local transaction. * Make sure that we started local transaction.
* *
* Also switches to ApplyContext as necessary. * Also switches to ApplyMessageContext as necessary.
*/ */
static bool static bool
ensure_transaction(void) ensure_transaction(void)
{ {
if (IsTransactionState()) if (IsTransactionState())
{ {
if (CurrentMemoryContext != ApplyContext) if (CurrentMemoryContext != ApplyMessageContext)
MemoryContextSwitchTo(ApplyContext); MemoryContextSwitchTo(ApplyMessageContext);
return false; return false;
} }
...@@ -162,7 +163,7 @@ ensure_transaction(void) ...@@ -162,7 +163,7 @@ ensure_transaction(void)
if (!MySubscriptionValid) if (!MySubscriptionValid)
reread_subscription(); reread_subscription();
MemoryContextSwitchTo(ApplyContext); MemoryContextSwitchTo(ApplyMessageContext);
return true; return true;
} }
...@@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn) ...@@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn)
FlushPosition *flushpos; FlushPosition *flushpos;
/* Need to do this in permanent context */ /* Need to do this in permanent context */
MemoryContextSwitchTo(ApplyCacheContext); MemoryContextSwitchTo(ApplyContext);
/* Track commit lsn */ /* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
...@@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn) ...@@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn)
flushpos->remote_end = remote_lsn; flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node); dlist_push_tail(&lsn_mapping, &flushpos->node);
MemoryContextSwitchTo(ApplyContext); MemoryContextSwitchTo(ApplyMessageContext);
} }
...@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) ...@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void static void
LogicalRepApplyLoop(XLogRecPtr last_received) LogicalRepApplyLoop(XLogRecPtr last_received)
{ {
/* Init the ApplyContext which we use for easier cleanup. */ /*
ApplyContext = AllocSetContextCreate(TopMemoryContext, * Init the ApplyMessageContext which we clean up after each
"ApplyContext", * replication protocol message.
ALLOCSET_DEFAULT_MINSIZE, */
ALLOCSET_DEFAULT_INITSIZE, ApplyMessageContext = AllocSetContextCreate(ApplyContext,
ALLOCSET_DEFAULT_MAXSIZE); "ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */ /* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
...@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp(); TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false; bool ping_sent = false;
MemoryContextSwitchTo(ApplyContext); MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd); len = walrcv_receive(wrconn, &buf, &fd);
...@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
ping_sent = false; ping_sent = false;
/* Ensure we are reading the data into our memory context. */ /* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyContext); MemoryContextSwitchTo(ApplyMessageContext);
s.data = buf; s.data = buf;
s.len = len; s.len = len;
...@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true); UpdateWorkerStats(last_received, timestamp, true);
} }
/* other message types are purposefully ignored */ /* other message types are purposefully ignored */
MemoryContextReset(ApplyMessageContext);
} }
len = walrcv_receive(wrconn, &buf, &fd); len = walrcv_receive(wrconn, &buf, &fd);
...@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ...@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
} }
/* Cleanup the memory. */ /* Cleanup the memory. */
MemoryContextResetAndDeleteChildren(ApplyContext); MemoryContextResetAndDeleteChildren(ApplyMessageContext);
MemoryContextSwitchTo(TopMemoryContext); MemoryContextSwitchTo(TopMemoryContext);
/* Check if we need to exit the streaming loop. */ /* Check if we need to exit the streaming loop. */
...@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) ...@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!reply_message) if (!reply_message)
{ {
MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext); MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
reply_message = makeStringInfo(); reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
} }
...@@ -1308,7 +1312,7 @@ reread_subscription(void) ...@@ -1308,7 +1312,7 @@ reread_subscription(void)
} }
/* Ensure allocations in permanent context. */ /* Ensure allocations in permanent context. */
oldctx = MemoryContextSwitchTo(ApplyCacheContext); oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true); newsub = GetSubscription(MyLogicalRepWorker->subid, true);
...@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg) ...@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
MyLogicalRepWorker->userid); MyLogicalRepWorker->userid);
/* Load the subscription into persistent memory context. */ /* Load the subscription into persistent memory context. */
CreateCacheMemoryContext(); ApplyContext = AllocSetContextCreate(TopMemoryContext,
ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext, "ApplyContext",
"ApplyCacheContext",
ALLOCSET_DEFAULT_SIZES); ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand(); StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyCacheContext); oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
MySubscriptionValid = true; MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
...@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg) ...@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
syncslotname = LogicalRepSyncTableStart(&origin_startpos); syncslotname = LogicalRepSyncTableStart(&origin_startpos);
/* The slot name needs to be allocated in permanent memory context. */ /* The slot name needs to be allocated in permanent memory context. */
oldctx = MemoryContextSwitchTo(ApplyCacheContext); oldctx = MemoryContextSwitchTo(ApplyContext);
myslotname = pstrdup(syncslotname); myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
......
...@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees, ...@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
and don't actually need any storage allocated in their private contexts. and don't actually need any storage allocated in their private contexts.
Logical Replication Worker Contexts
-----------------------------------
ApplyContext --- permanent during whole lifetime of apply worker. It
is possible to use TopMemoryContext here as well, but for simplicity
of memory usage analysis we spin up different context.
ApplyMessageContext --- short-lived context that is reset after each
logical replication protocol message is processed.
Transient Contexts During Execution Transient Contexts During Execution
----------------------------------- -----------------------------------
......
...@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker ...@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker
TimestampTz reply_time; TimestampTz reply_time;
} LogicalRepWorker; } LogicalRepWorker;
/* Memory context for cached variables in apply worker. */ /* Main memory context for apply worker. Permanent during worker lifetime. */
extern MemoryContext ApplyCacheContext; extern MemoryContext ApplyContext;
/* libpqreceiver connection */ /* libpqreceiver connection */
extern struct WalReceiverConn *wrconn; extern struct WalReceiverConn *wrconn;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment