Commit bbd8550b authored by Robert Haas's avatar Robert Haas

Refactor other replication commands to use DestRemoteSimple.

Commit a84069d9 added a new type of
DestReceiver to avoid duplicating the existing code for the SHOW
command, but it turns out we can leverage that new DestReceiver
type in a few more places, saving some code.

Michael Paquier, reviewed by Andres Freund and by me.

Discussion: http://postgr.es/m/CAB7nPqSdFOQC0evc0r1nJeQyGBqjBrR41MC4rcMqUUpoJaZbtQ%40mail.gmail.com
Discussion: http://postgr.es/m/CAB7nPqT2K4XFT1JgqufFBjsOc-NUKXg5qBDucHPMbk6Xi1kYaA@mail.gmail.com
parent c3e3844a
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "fmgr.h" #include "fmgr.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "utils/builtins.h"
/* /*
* At startup time, send a RowDescription message. * At startup time, send a RowDescription message.
...@@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self) ...@@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
} }
break; break;
case INT4OID:
{
int32 num = DatumGetInt32(value);
char str[12]; /* sign, 10 digits and '\0' */
pg_ltoa(num, str);
pq_sendcountedtext(&buf, str, strlen(str), false);
}
break;
case INT8OID:
{
int64 num = DatumGetInt64(value);
char str[23]; /* sign, 21 digits and '\0' */
pg_lltoa(num, str);
pq_sendcountedtext(&buf, str, strlen(str), false);
}
break;
default: default:
elog(ERROR, "unsupported type OID: %u", attr->atttypid); elog(ERROR, "unsupported type OID: %u", attr->atttypid);
} }
......
...@@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc, ...@@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
att->attstorage = 'p'; att->attstorage = 'p';
att->attcollation = InvalidOid; att->attcollation = InvalidOid;
break; break;
case INT8OID:
att->attlen = 8;
att->attbyval = FLOAT8PASSBYVAL;
att->attalign = 'd';
att->attstorage = 'p';
att->attcollation = InvalidOid;
break;
} }
} }
......
...@@ -302,13 +302,15 @@ WalSndShutdown(void) ...@@ -302,13 +302,15 @@ WalSndShutdown(void)
static void static void
IdentifySystem(void) IdentifySystem(void)
{ {
StringInfoData buf;
char sysid[32]; char sysid[32];
char tli[11];
char xpos[MAXFNAMELEN]; char xpos[MAXFNAMELEN];
XLogRecPtr logptr; XLogRecPtr logptr;
char *dbname = NULL; char *dbname = NULL;
Size len; DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[4];
bool nulls[4];
/* /*
* Reply with a result set with one row, four columns. First col is system * Reply with a result set with one row, four columns. First col is system
...@@ -328,8 +330,6 @@ IdentifySystem(void) ...@@ -328,8 +330,6 @@ IdentifySystem(void)
else else
logptr = GetFlushRecPtr(); logptr = GetFlushRecPtr();
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
if (MyDatabaseId != InvalidOid) if (MyDatabaseId != InvalidOid)
...@@ -346,79 +346,42 @@ IdentifySystem(void) ...@@ -346,79 +346,42 @@ IdentifySystem(void)
MemoryContextSwitchTo(cur); MemoryContextSwitchTo(cur);
} }
/* Send a RowDescription message */ dest = CreateDestReceiver(DestRemoteSimple);
pq_beginmessage(&buf, 'T'); MemSet(nulls, false, sizeof(nulls));
pq_sendint(&buf, 4, 2); /* 4 fields */
/* first field */
pq_sendstring(&buf, "systemid"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* second field */
pq_sendstring(&buf, "timeline"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, INT4OID, 4); /* type oid */
pq_sendint(&buf, 4, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* third field */
pq_sendstring(&buf, "xlogpos"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* fourth field */ /* need a tuple descriptor representing four columns */
pq_sendstring(&buf, "dbname"); /* col name */ tupdesc = CreateTemplateTupleDesc(4, false);
pq_sendint(&buf, 0, 4); /* table oid */ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
pq_sendint(&buf, 0, 2); /* attnum */ TEXTOID, -1, 0);
pq_sendint(&buf, TEXTOID, 4); /* type oid */ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
pq_sendint(&buf, -1, 2); /* typlen */ INT4OID, -1, 0);
pq_sendint(&buf, 0, 4); /* typmod */ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
pq_sendint(&buf, 0, 2); /* format code */ TEXTOID, -1, 0);
pq_endmessage(&buf); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
TEXTOID, -1, 0);
/* Send a DataRow message */ /* prepare for projection of tuples */
pq_beginmessage(&buf, 'D'); tstate = begin_tup_output_tupdesc(dest, tupdesc);
pq_sendint(&buf, 4, 2); /* # of columns */
/* column 1: system identifier */ /* column 1: system identifier */
len = strlen(sysid); values[0] = CStringGetTextDatum(sysid);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, (char *) &sysid, len);
/* column 2: timeline */ /* column 2: timeline */
len = strlen(tli); values[1] = Int32GetDatum(ThisTimeLineID);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, (char *) tli, len);
/* column 3: xlog position */ /* column 3: xlog position */
len = strlen(xpos); values[2] = CStringGetTextDatum(xpos);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, (char *) xpos, len);
/* column 4: database name, or NULL if none */ /* column 4: database name, or NULL if none */
if (dbname) if (dbname)
{ values[3] = CStringGetTextDatum(dbname);
len = strlen(dbname);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, (char *) dbname, len);
}
else else
{ nulls[3] = true;
pq_sendint(&buf, -1, 4);
}
pq_endmessage(&buf); /* send it to dest */
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
} }
...@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd)
*/ */
if (sendTimeLineIsHistoric) if (sendTimeLineIsHistoric)
{ {
char tli_str[11];
char startpos_str[8 + 1 + 8 + 1]; char startpos_str[8 + 1 + 8 + 1];
Size len; DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
snprintf(startpos_str, sizeof(startpos_str), "%X/%X", snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32), (uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto); (uint32) sendTimeLineValidUpto);
pq_beginmessage(&buf, 'T'); /* RowDescription */ dest = CreateDestReceiver(DestRemoteSimple);
pq_sendint(&buf, 2, 2); /* 2 fields */ MemSet(nulls, false, sizeof(nulls));
/* Field header */
pq_sendstring(&buf, "next_tli");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
/* /*
* Need a tuple descriptor representing two columns.
* int8 may seem like a surprising data type for this, but in theory * int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned. * int4 would not be wide enough for this, as TimeLineID is unsigned.
*/ */
pq_sendint(&buf, INT8OID, 4); /* type oid */ tupdesc = CreateTemplateTupleDesc(2, false);
pq_sendint(&buf, -1, 2); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
pq_sendint(&buf, 0, 4); INT8OID, -1, 0);
pq_sendint(&buf, 0, 2); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
TEXTOID, -1, 0);
pq_sendstring(&buf, "next_tli_startpos"); /* prepare for projection of tuple */
pq_sendint(&buf, 0, 4); /* table oid */ tstate = begin_tup_output_tupdesc(dest, tupdesc);
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */ values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
pq_beginmessage(&buf, 'D'); values[1] = CStringGetTextDatum(startpos_str);
pq_sendint(&buf, 2, 2); /* number of columns */
len = strlen(tli_str); /* send it to dest */
pq_sendint(&buf, len, 4); /* length */ do_tup_output(tstate, values, nulls);
pq_sendbytes(&buf, tli_str, len);
len = strlen(startpos_str); end_tup_output(tstate);
pq_sendint(&buf, len, 4); /* length */
pq_sendbytes(&buf, startpos_str, len);
pq_endmessage(&buf);
} }
/* Send CommandComplete message */ /* Send CommandComplete message */
...@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{ {
const char *snapshot_name = NULL; const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN]; char xpos[MAXFNAMELEN];
StringInfoData buf; char *slot_name;
Size len; DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[4];
bool nulls[4];
Assert(!MyReplicationSlot); Assert(!MyReplicationSlot);
...@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ...@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
(uint32) (MyReplicationSlot->data.confirmed_flush >> 32), (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
(uint32) MyReplicationSlot->data.confirmed_flush); (uint32) MyReplicationSlot->data.confirmed_flush);
pq_beginmessage(&buf, 'T'); dest = CreateDestReceiver(DestRemoteSimple);
pq_sendint(&buf, 4, 2); /* 4 fields */ MemSet(nulls, false, sizeof(nulls));
/* first field: slot name */
pq_sendstring(&buf, "slot_name"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* second field: LSN at which we became consistent */
pq_sendstring(&buf, "consistent_point"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* third field: exported snapshot's name */ /*
pq_sendstring(&buf, "snapshot_name"); /* col name */ * Need a tuple descriptor representing four columns:
pq_sendint(&buf, 0, 4); /* table oid */ * - first field: the slot name
pq_sendint(&buf, 0, 2); /* attnum */ * - second field: LSN at which we became consistent
pq_sendint(&buf, TEXTOID, 4); /* type oid */ * - third field: exported snapshot's name
pq_sendint(&buf, -1, 2); /* typlen */ * - fourth field: output plugin
pq_sendint(&buf, 0, 4); /* typmod */ */
pq_sendint(&buf, 0, 2); /* format code */ tupdesc = CreateTemplateTupleDesc(4, false);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
/* fourth field: output plugin */ TEXTOID, -1, 0);
pq_sendstring(&buf, "output_plugin"); /* col name */ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
pq_sendint(&buf, 0, 4); /* table oid */ TEXTOID, -1, 0);
pq_sendint(&buf, 0, 2); /* attnum */ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
pq_sendint(&buf, TEXTOID, 4); /* type oid */ TEXTOID, -1, 0);
pq_sendint(&buf, -1, 2); /* typlen */ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
pq_sendint(&buf, 0, 4); /* typmod */ TEXTOID, -1, 0);
pq_sendint(&buf, 0, 2); /* format code */
pq_endmessage(&buf);
/* Send a DataRow message */ /* prepare for projection of tuples */
pq_beginmessage(&buf, 'D'); tstate = begin_tup_output_tupdesc(dest, tupdesc);
pq_sendint(&buf, 4, 2); /* # of columns */
/* slot_name */ /* slot_name */
len = strlen(NameStr(MyReplicationSlot->data.name)); slot_name = NameStr(MyReplicationSlot->data.name);
pq_sendint(&buf, len, 4); /* col1 len */ values[0] = CStringGetTextDatum(slot_name);
pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
/* consistent wal location */ /* consistent wal location */
len = strlen(xpos); values[1] = CStringGetTextDatum(xpos);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, xpos, len);
/* snapshot name, or NULL if none */ /* snapshot name, or NULL if none */
if (snapshot_name != NULL) if (snapshot_name != NULL)
{ values[2] = CStringGetTextDatum(snapshot_name);
len = strlen(snapshot_name);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, snapshot_name, len);
}
else else
pq_sendint(&buf, -1, 4); nulls[2] = true;
/* plugin, or NULL if none */ /* plugin, or NULL if none */
if (cmd->plugin != NULL) if (cmd->plugin != NULL)
{ values[3] = CStringGetTextDatum(cmd->plugin);
len = strlen(cmd->plugin);
pq_sendint(&buf, len, 4);
pq_sendbytes(&buf, cmd->plugin, len);
}
else else
pq_sendint(&buf, -1, 4); nulls[3] = true;
pq_endmessage(&buf); /* send it to dest */
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
ReplicationSlotRelease(); ReplicationSlotRelease();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment