Commit 88c6cff8 authored by Peter Eisentraut's avatar Peter Eisentraut

Improve code comments

Author: Erik Rijkers <er@xs4all.nl>
parent ae1aa28e
...@@ -12,18 +12,18 @@ ...@@ -12,18 +12,18 @@
* logical replication. * logical replication.
* *
* The initial data synchronization is done separately for each table, * The initial data synchronization is done separately for each table,
* in separate apply worker that only fetches the initial snapshot data * in a separate apply worker that only fetches the initial snapshot data
* from the publisher and then synchronizes the position in stream with * from the publisher and then synchronizes the position in the stream with
* the main apply worker. * the main apply worker.
* *
* The are several reasons for doing the synchronization this way: * There are several reasons for doing the synchronization this way:
* - It allows us to parallelize the initial data synchronization * - It allows us to parallelize the initial data synchronization
* which lowers the time needed for it to happen. * which lowers the time needed for it to happen.
* - The initial synchronization does not have to hold the xid and LSN * - The initial synchronization does not have to hold the xid and LSN
* for the time it takes to copy data of all tables, causing less * for the time it takes to copy data of all tables, causing less
* bloat and lower disk consumption compared to doing the * bloat and lower disk consumption compared to doing the
* synchronization in single process for whole database. * synchronization in a single process for the whole database.
* - It allows us to synchronize the tables added after the initial * - It allows us to synchronize any tables added after the initial
* synchronization has finished. * synchronization has finished.
* *
* The stream position synchronization works in multiple steps. * The stream position synchronization works in multiple steps.
...@@ -147,7 +147,7 @@ finish_sync_worker(void) ...@@ -147,7 +147,7 @@ finish_sync_worker(void)
} }
/* /*
* Wait until the relation synchronization state is set in catalog to the * Wait until the relation synchronization state is set in the catalog to the
* expected one. * expected one.
* *
* Used when transitioning from CATCHUP state to SYNCDONE. * Used when transitioning from CATCHUP state to SYNCDONE.
...@@ -206,12 +206,12 @@ wait_for_relation_state_change(Oid relid, char expected_state) ...@@ -206,12 +206,12 @@ wait_for_relation_state_change(Oid relid, char expected_state)
} }
/* /*
* Wait until the the apply worker changes the state of our synchronization * Wait until the apply worker changes the state of our synchronization
* worker to the expected one. * worker to the expected one.
* *
* Used when transitioning from SYNCWAIT state to CATCHUP. * Used when transitioning from SYNCWAIT state to CATCHUP.
* *
* Returns false if the apply worker has disappeared or table state has been * Returns false if the apply worker has disappeared or the table state has been
* reset. * reset.
*/ */
static bool static bool
...@@ -225,7 +225,7 @@ wait_for_worker_state_change(char expected_state) ...@@ -225,7 +225,7 @@ wait_for_worker_state_change(char expected_state)
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Bail if he apply has died. */ /* Bail if the apply has died. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid, worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
InvalidOid, false); InvalidOid, false);
...@@ -333,7 +333,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ...@@ -333,7 +333,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState()); Assert(!IsTransactionState());
/* We need up to date sync state info for subscription tables here. */ /* We need up-to-date sync state info for subscription tables here. */
if (!table_states_valid) if (!table_states_valid)
{ {
MemoryContext oldctx; MemoryContext oldctx;
...@@ -365,7 +365,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ...@@ -365,7 +365,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
} }
/* /*
* Prepare hash table for tracking last start times of workers, to avoid * Prepare a hash table for tracking last start times of workers, to avoid
* immediate restarts. We don't need it if there are no tables that need * immediate restarts. We don't need it if there are no tables that need
* syncing. * syncing.
*/ */
...@@ -401,7 +401,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ...@@ -401,7 +401,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{ {
/* /*
* Apply has caught up to the position where the table sync has * Apply has caught up to the position where the table sync has
* finished. Time to mark the table as ready so that apply will * finished. Mark the table as ready so that the apply will
* just continue to replicate it normally. * just continue to replicate it normally.
*/ */
if (current_lsn >= rstate->lsn) if (current_lsn >= rstate->lsn)
...@@ -436,7 +436,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ...@@ -436,7 +436,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
else else
/* /*
* If no sync worker for this table yet, count running sync * If there is no sync worker for this table yet, count running sync
* workers for this subscription, while we have the lock, for * workers for this subscription, while we have the lock, for
* later. * later.
*/ */
...@@ -477,7 +477,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ...@@ -477,7 +477,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* /*
* If there is no sync worker registered for the table and there * If there is no sync worker registered for the table and there
* is some free sync worker slot, start new sync worker for the * is some free sync worker slot, start a new sync worker for the
* table. * table.
*/ */
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
...@@ -551,7 +551,7 @@ copy_read_data(void *outbuf, int minread, int maxread) ...@@ -551,7 +551,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
int bytesread = 0; int bytesread = 0;
int avail; int avail;
/* If there are some leftover data from previous read, use them. */ /* If there are some leftover data from previous read, use it. */
avail = copybuf->len - copybuf->cursor; avail = copybuf->len - copybuf->cursor;
if (avail) if (avail)
{ {
...@@ -694,7 +694,7 @@ fetch_remote_table_info(char *nspname, char *relname, ...@@ -694,7 +694,7 @@ fetch_remote_table_info(char *nspname, char *relname,
(errmsg("could not fetch table info for table \"%s.%s\": %s", (errmsg("could not fetch table info for table \"%s.%s\": %s",
nspname, relname, res->err))); nspname, relname, res->err)));
/* We don't know number of rows coming, so allocate enough space. */ /* We don't know the number of rows coming, so allocate enough space. */
lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
lrel->attkeys = NULL; lrel->attkeys = NULL;
...@@ -852,22 +852,22 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -852,22 +852,22 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
pgstat_report_stat(false); pgstat_report_stat(false);
/* /*
* We want to do the table data sync in single transaction. * We want to do the table data sync in a single transaction.
*/ */
StartTransactionCommand(); StartTransactionCommand();
/* /*
* Use standard write lock here. It might be better to * Use a standard write lock here. It might be better to
* disallow access to table while it's being synchronized. But * disallow access to the table while it's being synchronized. But
* we don't want to block the main apply process from working * we don't want to block the main apply process from working
* and it has to open relation in RowExclusiveLock when * and it has to open the relation in RowExclusiveLock when
* remapping remote relation id to local one. * remapping remote relation id to local one.
*/ */
rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
/* /*
* Create temporary slot for the sync process. We do this * Create a temporary slot for the sync process. We do this
* inside transaction so that we can use the snapshot made by * inside the transaction so that we can use the snapshot made by
* the slot to get existing data. * the slot to get existing data.
*/ */
res = walrcv_exec(wrconn, res = walrcv_exec(wrconn,
...@@ -883,7 +883,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ...@@ -883,7 +883,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Create new temporary logical decoding slot. * Create new temporary logical decoding slot.
* *
* We'll use slot for data copy so make sure the snapshot is * We'll use slot for data copy so make sure the snapshot is
* used for the transaction, that way the COPY will get data * used for the transaction; that way the COPY will get data
* that is consistent with the lsn used by the slot to start * that is consistent with the lsn used by the slot to start
* decoding. * decoding.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment