Commit b9c130a1 authored by Peter Eisentraut's avatar Peter Eisentraut

Have logical replication subscriber fire column triggers

The logical replication apply worker did not fire per-column update
triggers because the updatedCols bitmap in the RTE was not populated.
This fixes that.
Reviewed-by: default avatarEuler Taveira <euler@timbira.com.br>
Discussion: https://www.postgresql.org/message-id/flat/21673e2d-597c-6afe-637e-e8b10425b240%402ndquadrant.com
parent 7b283d0e
...@@ -691,6 +691,7 @@ apply_handle_update(StringInfo s) ...@@ -691,6 +691,7 @@ apply_handle_update(StringInfo s)
bool has_oldtup; bool has_oldtup;
TupleTableSlot *localslot; TupleTableSlot *localslot;
TupleTableSlot *remoteslot; TupleTableSlot *remoteslot;
RangeTblEntry *target_rte;
bool found; bool found;
MemoryContext oldctx; MemoryContext oldctx;
...@@ -721,6 +722,21 @@ apply_handle_update(StringInfo s) ...@@ -721,6 +722,21 @@ apply_handle_update(StringInfo s)
&estate->es_tupleTable); &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
/*
* Populate updatedCols so that per-column triggers can fire. This could
* include more columns than were actually changed on the publisher
* because the logical replication protocol doesn't contain that
* information. But it would for example exclude columns that only exist
* on the subscriber, since we are not touching those.
*/
target_rte = list_nth(estate->es_range_table, 0);
for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
{
if (newtup.changed[i])
target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
i + 1 - FirstLowInvalidHeapAttributeNumber);
}
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false); ExecOpenIndices(estate->es_result_relation_info, false);
......
...@@ -3,7 +3,7 @@ use strict; ...@@ -3,7 +3,7 @@ use strict;
use warnings; use warnings;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 4; use Test::More tests => 6;
# Initialize publisher node # Initialize publisher node
my $node_publisher = get_new_node('publisher'); my $node_publisher = get_new_node('publisher');
...@@ -81,6 +81,8 @@ BEGIN ...@@ -81,6 +81,8 @@ BEGIN
ELSE ELSE
RETURN NULL; RETURN NULL;
END IF; END IF;
ELSIF (TG_OP = 'UPDATE') THEN
RETURN NULL;
ELSE ELSE
RAISE WARNING 'Unknown action'; RAISE WARNING 'Unknown action';
RETURN NULL; RETURN NULL;
...@@ -88,7 +90,7 @@ BEGIN ...@@ -88,7 +90,7 @@ BEGIN
END; END;
\$\$ LANGUAGE plpgsql; \$\$ LANGUAGE plpgsql;
CREATE TRIGGER filter_basic_dml_trg CREATE TRIGGER filter_basic_dml_trg
BEFORE INSERT ON tab_fk_ref BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
}); });
...@@ -99,10 +101,32 @@ $node_publisher->safe_psql('postgres', ...@@ -99,10 +101,32 @@ $node_publisher->safe_psql('postgres',
$node_publisher->wait_for_catchup('tap_sub'); $node_publisher->wait_for_catchup('tap_sub');
# The row should be skipped on subscriber # The trigger should cause the insert to be skipped on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber');
# Update data
$node_publisher->safe_psql('postgres',
"UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;");
$node_publisher->wait_for_catchup('tap_sub');
# The trigger should cause the update to be skipped on subscriber
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
is($result, qq(2|1|2), 'check replica trigger applied on subscriber'); is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber');
# Update on a column not specified in the trigger, but it will trigger
# anyway because logical replication ships all columns in an update.
$node_publisher->safe_psql('postgres',
"UPDATE tab_fk_ref SET id = 6 WHERE id = 1;");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
is($result, qq(2|1|2), 'check column trigger applied on even for other column');
$node_subscriber->stop('fast'); $node_subscriber->stop('fast');
$node_publisher->stop('fast'); $node_publisher->stop('fast');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment