Commit d5daae47 authored by Tom Lane's avatar Tom Lane

Fix construction of updated-columns bitmap in logical replication.

Commit b9c130a1 failed to apply the publisher-to-subscriber column
mapping while checking which columns were updated.  Perhaps less
significantly, it didn't exclude dropped columns either.  This could
result in an incorrect updated-columns bitmap and thus wrong decisions
about whether to fire column-specific triggers on the subscriber while
applying updates.  In HEAD (since commit 9de77b54), it could also
result in accesses off the end of the colstatus array, as detected by
buildfarm member skink.  Fix the logic, and adjust 003_constraints.pl
so that the problem is exposed in unpatched code.

In HEAD, also add some assertions to check that we don't access off
the ends of these newly variable-sized arrays.

Back-patch to v10, as b9c130a1 was.

Discussion: https://postgr.es/m/CAH2-Wz=79hKQ4++c5A060RYbjTHgiYTHz=fw6mptCtgghH2gJA@mail.gmail.com
parent d98c08cd
...@@ -548,6 +548,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) ...@@ -548,6 +548,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
/* Allocate space for per-column values; zero out unused StringInfoDatas */ /* Allocate space for per-column values; zero out unused StringInfoDatas */
tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
tuple->colstatus = (char *) palloc(natts * sizeof(char)); tuple->colstatus = (char *) palloc(natts * sizeof(char));
tuple->ncols = natts;
/* Read the data */ /* Read the data */
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
......
...@@ -354,6 +354,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ...@@ -354,6 +354,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
{ {
StringInfo colvalue = &tupleData->colvalues[remoteattnum]; StringInfo colvalue = &tupleData->colvalues[remoteattnum];
Assert(remoteattnum < tupleData->ncols);
errarg.local_attnum = i; errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum; errarg.remote_attnum = remoteattnum;
...@@ -477,6 +479,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ...@@ -477,6 +479,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
if (remoteattnum < 0) if (remoteattnum < 0)
continue; continue;
Assert(remoteattnum < tupleData->ncols);
if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
{ {
StringInfo colvalue = &tupleData->colvalues[remoteattnum]; StringInfo colvalue = &tupleData->colvalues[remoteattnum];
...@@ -831,10 +835,18 @@ apply_handle_update(StringInfo s) ...@@ -831,10 +835,18 @@ apply_handle_update(StringInfo s)
target_rte = list_nth(estate->es_range_table, 0); target_rte = list_nth(estate->es_range_table, 0);
for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
{ {
if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
target_rte->updatedCols = bms_add_member(target_rte->updatedCols, int remoteattnum = rel->attrmap->attnums[i];
if (!att->attisdropped && remoteattnum >= 0)
{
Assert(remoteattnum < newtup.ncols);
if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
target_rte->updatedCols =
bms_add_member(target_rte->updatedCols,
i + 1 - FirstLowInvalidHeapAttributeNumber); i + 1 - FirstLowInvalidHeapAttributeNumber);
} }
}
fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel)); fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
......
...@@ -27,13 +27,18 @@ ...@@ -27,13 +27,18 @@
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1
/* Tuple coming via logical replication. */ /*
* This struct stores a tuple received via logical replication.
* Keep in mind that the columns correspond to the *remote* table.
*/
typedef struct LogicalRepTupleData typedef struct LogicalRepTupleData
{ {
/* Array of StringInfos, one per column; some may be unused */ /* Array of StringInfos, one per column; some may be unused */
StringInfoData *colvalues; StringInfoData *colvalues;
/* Array of markers for null/unchanged/text/binary, one per column */ /* Array of markers for null/unchanged/text/binary, one per column */
char *colstatus; char *colstatus;
/* Length of above arrays */
int ncols;
} LogicalRepTupleData; } LogicalRepTupleData;
/* Possible values for LogicalRepTupleData.colstatus[colnum] */ /* Possible values for LogicalRepTupleData.colstatus[colnum] */
......
...@@ -19,14 +19,14 @@ $node_subscriber->start; ...@@ -19,14 +19,14 @@ $node_subscriber->start;
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"CREATE TABLE tab_fk (bid int PRIMARY KEY);"); "CREATE TABLE tab_fk (bid int PRIMARY KEY);");
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));" "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));"
); );
# Setup structure on subscriber # Setup structure on subscriber; column order intentionally different
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_fk (bid int PRIMARY KEY);"); "CREATE TABLE tab_fk (bid int PRIMARY KEY);");
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));" "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);"
); );
# Setup logical replication # Setup logical replication
...@@ -42,8 +42,10 @@ $node_publisher->wait_for_catchup('tap_sub'); ...@@ -42,8 +42,10 @@ $node_publisher->wait_for_catchup('tap_sub');
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk (bid) VALUES (1);"); "INSERT INTO tab_fk (bid) VALUES (1);");
# "junk" value is meant to be large enough to force out-of-line storage
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);"); "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));"
);
$node_publisher->wait_for_catchup('tap_sub'); $node_publisher->wait_for_catchup('tap_sub');
...@@ -128,7 +130,7 @@ $node_publisher->wait_for_catchup('tap_sub'); ...@@ -128,7 +130,7 @@ $node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); "SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
is($result, qq(2|1|2), is($result, qq(2|1|2),
'check column trigger applied on even for other column'); 'check column trigger applied even on update for other column');
$node_subscriber->stop('fast'); $node_subscriber->stop('fast');
$node_publisher->stop('fast'); $node_publisher->stop('fast');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment