Commit 9a591c1b authored by Peter Eisentraut's avatar Peter Eisentraut

Fix statistics reporting in logical replication workers

This new arrangement ensures that statistics are reported right after
commit of transactions.  The previous arrangement didn't get this quite
right and could lead to assertion failures.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: default avatarErik Rijkers <er@xs4all.nl>
parent b6576e59
......@@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static List *table_states = NIL;
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
Assert(!IsTransactionState());
......@@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
table_states = NIL;
StartTransactionCommand();
started_tx = true;
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
......@@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
MemoryContextSwitchTo(oldctx);
CommitTransactionCommand();
table_states_valid = true;
}
......@@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
StartTransactionCommand();
if (!started_tx)
{
StartTransactionCommand();
started_tx = true;
}
SetSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
rstate->lsn);
CommitTransactionCommand();
}
}
else
......@@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
}
}
if (started_tx)
{
CommitTransactionCommand();
pgstat_report_stat(false);
}
}
/*
......@@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
/*
* We want to do the table data sync in single
......
......@@ -453,6 +453,7 @@ apply_handle_commit(StringInfo s)
replorigin_session_origin_timestamp = commit_data.committime;
CommitTransactionCommand();
pgstat_report_stat(false);
store_flush_position(commit_data.end_lsn);
}
......@@ -462,7 +463,6 @@ apply_handle_commit(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
pgstat_report_stat(false);
pgstat_report_activity(STATE_IDLE, NULL);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment