Commit 644f0d7c authored by Amit Kapila's avatar Amit Kapila

Use Enum for top level logical replication message types.

Logical replication protocol uses a single byte character to identify a
message type in logical replication protocol. The code uses string
literals for the same. Use Enum so that

1. All the string literals used can be found at a single place. This
makes it easy to add more types without the risk of conflicts.

2. It's easy to locate the code handling a given message type.

3. When used with switch statements, it is easy to identify the missing
cases using -Wswitch.

Author: Ashutosh Bapat
Reviewed-by: Kyotaro Horiguchi, Andres Freund, Peter Smith and Amit Kapila
Discussion: https://postgr.es/m/CAExHW5uPzQ7L0oAd_ENyvaiYMOPgkrAoJpE+ZY5-obdcVT6NPg@mail.gmail.com
parent a929e17e
...@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in); ...@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
void void
logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
{ {
pq_sendbyte(out, 'B'); /* BEGIN */ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
/* fixed fields */ /* fixed fields */
pq_sendint64(out, txn->final_lsn); pq_sendint64(out, txn->final_lsn);
...@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, ...@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
{ {
uint8 flags = 0; uint8 flags = 0;
pq_sendbyte(out, 'C'); /* sending COMMIT */ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
/* send the flags field (unused for now) */ /* send the flags field (unused for now) */
pq_sendbyte(out, flags); pq_sendbyte(out, flags);
...@@ -112,7 +112,7 @@ void ...@@ -112,7 +112,7 @@ void
logicalrep_write_origin(StringInfo out, const char *origin, logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn) XLogRecPtr origin_lsn)
{ {
pq_sendbyte(out, 'O'); /* ORIGIN */ pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
/* fixed fields */ /* fixed fields */
pq_sendint64(out, origin_lsn); pq_sendint64(out, origin_lsn);
...@@ -141,7 +141,7 @@ void ...@@ -141,7 +141,7 @@ void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
HeapTuple newtuple, bool binary) HeapTuple newtuple, bool binary)
{ {
pq_sendbyte(out, 'I'); /* action INSERT */ pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
/* transaction ID (if not valid, we're not streaming) */ /* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid)) if (TransactionIdIsValid(xid))
...@@ -185,7 +185,7 @@ void ...@@ -185,7 +185,7 @@ void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, HeapTuple newtuple, bool binary) HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{ {
pq_sendbyte(out, 'U'); /* action UPDATE */ pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
...@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, ...@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
pq_sendbyte(out, 'D'); /* action DELETE */ pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
/* transaction ID (if not valid, we're not streaming) */ /* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid)) if (TransactionIdIsValid(xid))
...@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out, ...@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
int i; int i;
uint8 flags = 0; uint8 flags = 0;
pq_sendbyte(out, 'T'); /* action TRUNCATE */ pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
/* transaction ID (if not valid, we're not streaming) */ /* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid)) if (TransactionIdIsValid(xid))
...@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) ...@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
{ {
char *relname; char *relname;
pq_sendbyte(out, 'R'); /* sending RELATION */ pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
/* transaction ID (if not valid, we're not streaming) */ /* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid)) if (TransactionIdIsValid(xid))
...@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) ...@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
HeapTuple tup; HeapTuple tup;
Form_pg_type typtup; Form_pg_type typtup;
pq_sendbyte(out, 'Y'); /* sending TYPE */ pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
/* transaction ID (if not valid, we're not streaming) */ /* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid)) if (TransactionIdIsValid(xid))
...@@ -755,7 +755,7 @@ void ...@@ -755,7 +755,7 @@ void
logicalrep_write_stream_start(StringInfo out, logicalrep_write_stream_start(StringInfo out,
TransactionId xid, bool first_segment) TransactionId xid, bool first_segment)
{ {
pq_sendbyte(out, 'S'); /* action STREAM START */ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
Assert(TransactionIdIsValid(xid)); Assert(TransactionIdIsValid(xid));
...@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) ...@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
void void
logicalrep_write_stream_stop(StringInfo out) logicalrep_write_stream_stop(StringInfo out)
{ {
pq_sendbyte(out, 'E'); /* action STREAM END */ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
} }
/* /*
...@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, ...@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
{ {
uint8 flags = 0; uint8 flags = 0;
pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
Assert(TransactionIdIsValid(txn->xid)); Assert(TransactionIdIsValid(txn->xid));
...@@ -849,7 +849,7 @@ void ...@@ -849,7 +849,7 @@ void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid, logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid) TransactionId subxid)
{ {
pq_sendbyte(out, 'A'); /* action STREAM ABORT */ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
......
...@@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s) ...@@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s)
static void static void
apply_dispatch(StringInfo s) apply_dispatch(StringInfo s)
{ {
char action = pq_getmsgbyte(s); LogicalRepMsgType action = pq_getmsgbyte(s);
switch (action) switch (action)
{ {
/* BEGIN */ case LOGICAL_REP_MSG_BEGIN:
case 'B':
apply_handle_begin(s); apply_handle_begin(s);
break; return;
/* COMMIT */
case 'C': case LOGICAL_REP_MSG_COMMIT:
apply_handle_commit(s); apply_handle_commit(s);
break; return;
/* INSERT */
case 'I': case LOGICAL_REP_MSG_INSERT:
apply_handle_insert(s); apply_handle_insert(s);
break; return;
/* UPDATE */
case 'U': case LOGICAL_REP_MSG_UPDATE:
apply_handle_update(s); apply_handle_update(s);
break; return;
/* DELETE */
case 'D': case LOGICAL_REP_MSG_DELETE:
apply_handle_delete(s); apply_handle_delete(s);
break; return;
/* TRUNCATE */
case 'T': case LOGICAL_REP_MSG_TRUNCATE:
apply_handle_truncate(s); apply_handle_truncate(s);
break; return;
/* RELATION */
case 'R': case LOGICAL_REP_MSG_RELATION:
apply_handle_relation(s); apply_handle_relation(s);
break; return;
/* TYPE */
case 'Y': case LOGICAL_REP_MSG_TYPE:
apply_handle_type(s); apply_handle_type(s);
break; return;
/* ORIGIN */
case 'O': case LOGICAL_REP_MSG_ORIGIN:
apply_handle_origin(s); apply_handle_origin(s);
break; return;
/* STREAM START */
case 'S': case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s); apply_handle_stream_start(s);
break; return;
/* STREAM END */
case 'E': case LOGICAL_REP_MSG_STREAM_END:
apply_handle_stream_stop(s); apply_handle_stream_stop(s);
break; return;
/* STREAM ABORT */
case 'A': case LOGICAL_REP_MSG_STREAM_ABORT:
apply_handle_stream_abort(s); apply_handle_stream_abort(s);
break; return;
/* STREAM COMMIT */
case 'c': case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s); apply_handle_stream_commit(s);
break; return;
default: }
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION), (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid logical replication message type \"%c\"", action))); errmsg("invalid logical replication message type \"%c\"", action)));
}
} }
/* /*
......
...@@ -33,6 +33,33 @@ ...@@ -33,6 +33,33 @@
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
/*
* Logical message types
*
* Used by logical replication wire protocol.
*
* Note: though this is an enum, the values are used to identify message types
* in logical replication protocol, which uses a single byte to identify a
* message type. Hence the values should be single byte wide and preferrably
* human readable characters.
*/
typedef enum LogicalRepMsgType
{
LOGICAL_REP_MSG_BEGIN = 'B',
LOGICAL_REP_MSG_COMMIT = 'C',
LOGICAL_REP_MSG_ORIGIN = 'O',
LOGICAL_REP_MSG_INSERT = 'I',
LOGICAL_REP_MSG_UPDATE = 'U',
LOGICAL_REP_MSG_DELETE = 'D',
LOGICAL_REP_MSG_TRUNCATE = 'T',
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_STREAM_START = 'S',
LOGICAL_REP_MSG_STREAM_END = 'E',
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
LOGICAL_REP_MSG_STREAM_ABORT = 'A'
} LogicalRepMsgType;
/* /*
* This struct stores a tuple received via logical replication. * This struct stores a tuple received via logical replication.
* Keep in mind that the columns correspond to the *remote* table. * Keep in mind that the columns correspond to the *remote* table.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment