Commit acb7e4eb authored by Alvaro Herrera's avatar Alvaro Herrera

Implement pipeline mode in libpq

Pipeline mode in libpq lets an application avoid the Sync messages in
the FE/BE protocol that are implicit in the old libpq API after each
query.  The application can then insert Sync at its leisure with a new
libpq function PQpipelineSync.  This can lead to substantial reductions
in query latency.
Co-authored-by: default avatarCraig Ringer <craig.ringer@enterprisedb.com>
Co-authored-by: default avatarMatthieu Garrigues <matthieu.garrigues@gmail.com>
Co-authored-by: default avatarÁlvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: default avatarAndres Freund <andres@anarazel.de>
Reviewed-by: default avatarAya Iwata <iwata.aya@jp.fujitsu.com>
Reviewed-by: default avatarDaniel Vérité <daniel@manitou-mail.org>
Reviewed-by: default avatarDavid G. Johnston <david.g.johnston@gmail.com>
Reviewed-by: default avatarJustin Pryzby <pryzby@telsasoft.com>
Reviewed-by: default avatarKirk Jamison <k.jamison@fujitsu.com>
Reviewed-by: default avatarMichael Paquier <michael.paquier@gmail.com>
Reviewed-by: default avatarNikhil Sontakke <nikhils@2ndquadrant.com>
Reviewed-by: default avatarVaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com>
Reviewed-by: default avatarZhihong Yu <zyu@yugabyte.com>

Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com
Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
parent 146cb388
......@@ -3180,6 +3180,33 @@ ExecStatusType PQresultStatus(const PGresult *res);
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pgres-pipeline-sync">
<term><literal>PGRES_PIPELINE_SYNC</literal></term>
<listitem>
<para>
The <structname>PGresult</structname> represents a
synchronization point in pipeline mode, requested by
<xref linkend="libpq-PQpipelineSync"/>.
This status occurs only when pipeline mode has been selected.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pgres-pipeline-aborted">
<term><literal>PGRES_PIPELINE_ABORTED</literal></term>
<listitem>
<para>
The <structname>PGresult</structname> represents a pipeline that has
received an error from the server. <function>PQgetResult</function>
must be called repeatedly, and each time it will return this status code
until the end of the current pipeline, at which point it will return
<literal>PGRES_PIPELINE_SYNC</literal> and normal processing can
resume.
</para>
</listitem>
</varlistentry>
</variablelist>
If the result status is <literal>PGRES_TUPLES_OK</literal> or
......@@ -4677,8 +4704,9 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
<xref linkend="libpq-PQsendQueryParams"/>,
<xref linkend="libpq-PQsendPrepare"/>,
<xref linkend="libpq-PQsendQueryPrepared"/>,
<xref linkend="libpq-PQsendDescribePrepared"/>, or
<xref linkend="libpq-PQsendDescribePortal"/>
<xref linkend="libpq-PQsendDescribePrepared"/>,
<xref linkend="libpq-PQsendDescribePortal"/>, or
<xref linkend="libpq-PQpipelineSync"/>
call, and returns it.
A null pointer is returned when the command is complete and there
will be no more results.
......@@ -4702,6 +4730,19 @@ PGresult *PQgetResult(PGconn *conn);
<xref linkend="libpq-PQconsumeInput"/>.
</para>
<para>
In pipeline mode, <function>PQgetResult</function> will return normally
unless an error occurs; for any subsequent query sent after the one
that caused the error until (and excluding) the next synchronization point,
a special result of type <literal>PGRES_PIPELINE_ABORTED</literal> will
be returned, and a null pointer will be returned after it.
When the pipeline synchronization point is reached, a result of type
<literal>PGRES_PIPELINE_SYNC</literal> will be returned.
The result of the next query after the synchronization point follows
immediately (that is, no null pointer is returned after
the synchronization point.)
</para>
<note>
<para>
Even when <xref linkend="libpq-PQresultStatus"/> indicates a fatal
......@@ -4926,6 +4967,476 @@ int PQflush(PGconn *conn);
</sect1>
<sect1 id="libpq-pipeline-mode">
<title>Pipeline Mode</title>
<indexterm zone="libpq-pipeline-mode">
<primary>libpq</primary>
<secondary>pipeline mode</secondary>
</indexterm>
<indexterm zone="libpq-pipeline-mode">
<primary>pipelining</primary>
<secondary>in libpq</secondary>
</indexterm>
<indexterm zone="libpq-pipeline-mode">
<primary>batch mode</primary>
<secondary>in libpq</secondary>
</indexterm>
<para>
<application>libpq</application> pipeline mode allows applications to
send a query without having to read the result of the previously
sent query. Taking advantage of the pipeline mode, a client will wait
less for the server, since multiple queries/results can be
sent/received in a single network transaction.
</para>
<para>
While pipeline mode provides a significant performance boost, writing
clients using the pipeline mode is more complex because it involves
managing a queue of pending queries and finding which result
corresponds to which query in the queue.
</para>
<para>
Pipeline mode also generally consumes more memory on both the client and server,
though careful and aggressive management of the send/receive queue can mitigate
this. This applies whether or not the connection is in blocking or non-blocking
mode.
</para>
<para>
While the pipeline API was introduced in
<productname>PostgreSQL</productname> 14, it is a client-side feature
which doesn't require special server support, and works on any server
that supports the v3 extended query protocol.
</para>
<sect2 id="libpq-pipeline-using">
<title>Using Pipeline Mode</title>
<para>
To issue pipelines, the application must switch the connection
into pipeline mode,
which is done with <xref linkend="libpq-PQenterPipelineMode"/>.
<xref linkend="libpq-PQpipelineStatus"/> can be used
to test whether pipeline mode is active.
In pipeline mode, only <link linkend="libpq-async">asynchronous operations</link>
are permitted, and <literal>COPY</literal> is disallowed.
Using synchronous command execution functions
such as <function>PQfn</function>,
<function>PQexec</function>,
<function>PQexecParams</function>,
<function>PQprepare</function>,
<function>PQexecPrepared</function>,
<function>PQdescribePrepared</function>,
<function>PQdescribePortal</function>,
is an error condition.
Once all dispatched commands have had their results processed, and
the end pipeline result has been consumed, the application may return
to non-pipelined mode with <xref linkend="libpq-PQexitPipelineMode"/>.
</para>
<note>
<para>
It is best to use pipeline mode with <application>libpq</application> in
<link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
in blocking mode it is possible for a client/server deadlock to occur.
<footnote>
<para>
The client will block trying to send queries to the server, but the
server will block trying to send results to the client from queries
it has already processed. This only occurs when the client sends
enough queries to fill both its output buffer and the server's receive
buffer before it switches to processing input from the server,
but it's hard to predict exactly when that will happen.
</para>
</footnote>
</para>
</note>
<sect3 id="libpq-pipeline-sending">
<title>Issuing Queries</title>
<para>
After entering pipeline mode, the application dispatches requests using
<xref linkend="libpq-PQsendQuery"/>,
<xref linkend="libpq-PQsendQueryParams"/>,
or its prepared-query sibling
<xref linkend="libpq-PQsendQueryPrepared"/>.
These requests are queued on the client-side until flushed to the server;
this occurs when <xref linkend="libpq-PQpipelineSync"/> is used to
establish a synchronization point in the pipeline,
or when <xref linkend="libpq-PQflush"/> is called.
The functions <xref linkend="libpq-PQsendPrepare"/>,
<xref linkend="libpq-PQsendDescribePrepared"/>, and
<xref linkend="libpq-PQsendDescribePortal"/> also work in pipeline mode.
Result processing is described below.
</para>
<para>
The server executes statements, and returns results, in the order the
client sends them. The server will begin executing the commands in the
pipeline immediately, not waiting for the end of the pipeline.
If any statement encounters an error, the server aborts the current
transaction and does not execute any subsequent command in the queue
until the next synchronization point established by
<function>PQpipelineSync</function>;
a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
each such command.
(This remains true even if the commands in the pipeline would rollback
the transaction.)
Query processing resumes after the synchronization point.
</para>
<para>
It's fine for one operation to depend on the results of a
prior one; for example, one query may define a table that the next
query in the same pipeline uses. Similarly, an application may
create a named prepared statement and execute it with later
statements in the same pipeline.
</para>
</sect3>
<sect3 id="libpq-pipeline-results">
<title>Processing Results</title>
<para>
To process the result of one query in a pipeline, the application calls
<function>PQgetResult</function> repeatedly and handles each result
until <function>PQgetResult</function> returns null.
The result from the next query in the pipeline may then be retrieved using
<function>PQgetResult</function> again and the cycle repeated.
The application handles individual statement results as normal.
When the results of all the queries in the pipeline have been
returned, <function>PQgetResult</function> returns a result
containing the status value <literal>PGRES_PIPELINE_SYNC</literal>
</para>
<para>
The client may choose to defer result processing until the complete
pipeline has been sent, or interleave that with sending further
queries in the pipeline; see <xref linkend="libpq-pipeline-interleave"/>.
</para>
<para>
To enter single-row mode, call <function>PQsetSingleRowMode</function>
before retrieving results with <function>PQgetResult</function>.
This mode selection is effective only for the query currently
being processed. For more information on the use of
<function>PQsetSingleRowMode</function>,
refer to <xref linkend="libpq-single-row-mode"/>.
</para>
<para>
<function>PQgetResult</function> behaves the same as for normal
asynchronous processing except that it may contain the new
<type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
and <literal>PGRES_PIPELINE_ABORTED</literal>.
<literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
<function>PQpipelineSync</function> at the corresponding point
in the pipeline.
<literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
query result for the first error and all subsequent results
until the next <literal>PGRES_PIPELINE_SYNC</literal>;
see <xref linkend="libpq-pipeline-errors"/>.
</para>
<para>
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
operate as normal when processing pipeline results.
</para>
<para>
<application>libpq</application> does not provide any information to the
application about the query currently being processed (except that
<function>PQgetResult</function> returns null to indicate that we start
returning the results of next query). The application must keep track
of the order in which it sent queries, to associate them with their
corresponding results.
Applications will typically use a state machine or a FIFO queue for this.
</para>
</sect3>
<sect3 id="libpq-pipeline-errors">
<title>Error Handling</title>
<para>
From the client's perspective, after <function>PQresultStatus</function>
returns <literal>PGRES_FATAL_ERROR</literal>,
the pipeline is flagged as aborted.
<function>PQresultStatus</function> will report a
<literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
operation in an aborted pipeline. The result for
<function>PQpipelineSync</function> is reported as
<literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
and resumption of normal result processing.
</para>
<para>
The client <emphasis>must</emphasis> process results with
<function>PQgetResult</function> during error recovery.
</para>
<para>
If the pipeline used an implicit transaction, then operations that have
already executed are rolled back and operations that were queued to follow
the failed operation are skipped entirely. The same behavior holds if the
pipeline starts and commits a single explicit transaction (i.e. the first
statement is <literal>BEGIN</literal> and the last is
<literal>COMMIT</literal>) except that the session remains in an aborted
transaction state at the end of the pipeline. If a pipeline contains
<emphasis>multiple explicit transactions</emphasis>, all transactions that
committed prior to the error remain committed, the currently in-progress
transaction is aborted, and all subsequent operations are skipped completely,
including subsequent transactions. If a pipeline synchronization point
occurs with an explicit transaction block in aborted state, the next pipeline
will become aborted immediately unless the next command puts the transaction
in normal mode with <command>ROLLBACK</command>.
</para>
<note>
<para>
The client must not assume that work is committed when it
<emphasis>sends</emphasis> a <literal>COMMIT</literal> &mdash; only when the
corresponding result is received to confirm the commit is complete.
Because errors arrive asynchronously, the application needs to be able to
restart from the last <emphasis>received</emphasis> committed change and
resend work done after that point if something goes wrong.
</para>
</note>
</sect3>
<sect3 id="libpq-pipeline-interleave">
<title>Interleaving Result Processing and Query Dispatch</title>
<para>
To avoid deadlocks on large pipelines the client should be structured
around a non-blocking event loop using operating system facilities
such as <function>select</function>, <function>poll</function>,
<function>WaitForMultipleObjectEx</function>, etc.
</para>
<para>
The client application should generally maintain a queue of work
remaining to be dispatched and a queue of work that has been dispatched
but not yet had its results processed. When the socket is writable
it should dispatch more work. When the socket is readable it should
read results and process them, matching them up to the next entry in
its corresponding results queue. Based on available memory, results from the
socket should be read frequently: there's no need to wait until the
pipeline end to read the results. Pipelines should be scoped to logical
units of work, usually (but not necessarily) one transaction per pipeline.
There's no need to exit pipeline mode and re-enter it between pipelines,
or to wait for one pipeline to finish before sending the next.
</para>
<para>
An example using <function>select()</function> and a simple state
machine to track sent and received work is in
<filename>src/test/modules/libpq_pipeline/libpq_pipeline.c</filename>
in the PostgreSQL source distribution.
</para>
</sect3>
</sect2>
<sect2 id="libpq-pipeline-functions">
<title>Functions Associated with Pipeline Mode</title>
<variablelist>
<varlistentry id="libpq-PQpipelineStatus">
<term><function>PQpipelineStatus</function><indexterm><primary>PQpipelineStatus</primary></indexterm></term>
<listitem>
<para>
Returns the current pipeline mode status of the
<application>libpq</application> connection.
<synopsis>
PGpipelineStatus PQpipelineStatus(const PGconn *conn);
</synopsis>
</para>
<para>
<function>PQpipelineStatus</function> can return one of the following values:
<variablelist>
<varlistentry>
<term>
<literal>PQ_PIPELINE_ON</literal>
</term>
<listitem>
<para>
The <application>libpq</application> connection is in
pipeline mode.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<literal>PQ_PIPELINE_OFF</literal>
</term>
<listitem>
<para>
The <application>libpq</application> connection is
<emphasis>not</emphasis> in pipeline mode.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<literal>PQ_PIPELINE_ABORTED</literal>
</term>
<listitem>
<para>
The <application>libpq</application> connection is in pipeline
mode and an error occurred while processing the current pipeline.
The aborted flag is cleared when <function>PQgetResult</function>
returns a result of type <literal>PGRES_PIPELINE_SYNC</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-PQenterPipelineMode">
<term><function>PQenterPipelineMode</function><indexterm><primary>PQenterPipelineMode</primary></indexterm></term>
<listitem>
<para>
Causes a connection to enter pipeline mode if it is currently idle or
already in pipeline mode.
<synopsis>
int PQenterPipelineMode(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success.
Returns 0 and has no effect if the connection is not currently
idle, i.e., it has a result ready, or it is waiting for more
input from the server, etc.
This function does not actually send anything to the server,
it just changes the <application>libpq</application> connection
state.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-PQexitPipelineMode">
<term><function>PQexitPipelineMode</function><indexterm><primary>PQexitPipelineMode</primary></indexterm></term>
<listitem>
<para>
Causes a connection to exit pipeline mode if it is currently in pipeline mode
with an empty queue and no pending results.
<synopsis>
int PQexitPipelineMode(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success. Returns 1 and takes no action if not in
pipeline mode. If the current statement isn't finished processing,
or <function>PQgetResult</function> has not been called to collect
results from all previously sent query, returns 0 (in which case,
use <xref linkend="libpq-PQerrorMessage"/> to get more information
about the failure).
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-PQpipelineSync">
<term><function>PQpipelineSync</function><indexterm><primary>PQpipelineSync</primary></indexterm></term>
<listitem>
<para>
Marks a synchronization point in a pipeline by sending a
<link linkend="protocol-flow-ext-query">sync message</link>
and flushing the send buffer. This serves as
the delimiter of an implicit transaction and an error recovery
point; see <xref linkend="libpq-pipeline-errors"/>.
<synopsis>
int PQpipelineSync(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success. Returns 0 if the connection is not in
pipeline mode or sending a
<link linkend="protocol-flow-ext-query">sync message</link>
failed.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
<sect2 id="libpq-pipeline-tips">
<title>When to Use Pipeline Mode</title>
<para>
Much like asynchronous query mode, there is no meaningful performance
overhead when using pipeline mode. It increases client application complexity,
and extra caution is required to prevent client/server deadlocks, but
pipeline mode can offer considerable performance improvements, in exchange for
increased memory usage from leaving state around longer.
</para>
<para>
Pipeline mode is most useful when the server is distant, i.e., network latency
(<quote>ping time</quote>) is high, and also when many small operations
are being performed in rapid succession. There is usually less benefit
in using pipelined commands when each query takes many multiples of the client/server
round-trip time to execute. A 100-statement operation run on a server
300ms round-trip-time away would take 30 seconds in network latency alone
without pipelining; with pipelining it may spend as little as 0.3s waiting for
results from the server.
</para>
<para>
Use pipelined commands when your application does lots of small
<literal>INSERT</literal>, <literal>UPDATE</literal> and
<literal>DELETE</literal> operations that can't easily be transformed
into operations on sets, or into a <literal>COPY</literal> operation.
</para>
<para>
Pipeline mode is not useful when information from one operation is required by
the client to produce the next operation. In such cases, the client
would have to introduce a synchronization point and wait for a full client/server
round-trip to get the results it needs. However, it's often possible to
adjust the client design to exchange the required information server-side.
Read-modify-write cycles are especially good candidates; for example:
<programlisting>
BEGIN;
SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
-- result: x=2
-- client adds 1 to x:
UPDATE mytable SET x = 3 WHERE id = 42;
COMMIT;
</programlisting>
could be much more efficiently done with:
<programlisting>
UPDATE mytable SET x = x + 1 WHERE id = 42;
</programlisting>
</para>
<para>
Pipelining is less useful, and more complex, when a single pipeline contains
multiple transactions (see <xref linkend="libpq-pipeline-errors"/>).
</para>
</sect2>
</sect1>
<sect1 id="libpq-single-row-mode">
<title>Retrieving Query Results Row-by-Row</title>
......@@ -4966,6 +5477,13 @@ int PQflush(PGconn *conn);
Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
</para>
<para>
When using pipeline mode, single-row mode needs to be activated for each
query in the pipeline before retrieving results for that query
with <function>PQgetResult</function>.
See <xref linkend="libpq-pipeline-mode"/> for more information.
</para>
<para>
<variablelist>
<varlistentry id="libpq-PQsetSingleRowMode">
......
......@@ -130,6 +130,10 @@
<application>libpq</application> library.
</para>
<para>
Client applications cannot use these functions while a libpq connection is in pipeline mode.
</para>
<sect2 id="lo-create">
<title>Creating a Large Object</title>
......
......@@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
walres->err = _("empty query");
break;
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
walres->status = WALRCV_ERROR;
walres->err = _("unexpected pipeline mode");
break;
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_BAD_RESPONSE:
......
......@@ -929,6 +929,8 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return false;
}
return true;
......
......@@ -179,3 +179,7 @@ PQgetgssctx 176
PQsetSSLKeyPassHook_OpenSSL 177
PQgetSSLKeyPassHook_OpenSSL 178
PQdefaultSSLKeyPassHook_OpenSSL 179
PQenterPipelineMode 180
PQexitPipelineMode 181
PQpipelineSync 182
PQpipelineStatus 183
......@@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
}
}
/*
* pqFreeCommandQueue
* Free all the entries of PGcmdQueueEntry queue passed.
*/
static void
pqFreeCommandQueue(PGcmdQueueEntry *queue)
{
while (queue != NULL)
{
PGcmdQueueEntry *cur = queue;
queue = cur->next;
if (cur->query)
free(cur->query);
free(cur);
}
}
/*
* pqDropServerData
......@@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
}
conn->notifyHead = conn->notifyTail = NULL;
pqFreeCommandQueue(conn->cmd_queue_head);
conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
pqFreeCommandQueue(conn->cmd_queue_recycle);
conn->cmd_queue_recycle = NULL;
/* Reset ParameterStatus data, as well as variables deduced from it */
pstatus = conn->pstatus;
while (pstatus != NULL)
......@@ -2459,6 +2482,7 @@ keep_going: /* We will come back to here until there is
/* Drop any PGresult we might have, too */
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
conn->pipelineStatus = PQ_PIPELINE_OFF;
pqClearAsyncResult(conn);
/* Reset conn->status to put the state machine in the right state */
......@@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
conn->status = CONNECTION_BAD;
conn->asyncStatus = PGASYNC_IDLE;
conn->pipelineStatus = PQ_PIPELINE_OFF;
conn->xactStatus = PQTRANS_IDLE;
conn->options_valid = false;
conn->nonblocking = false;
......@@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
if (conn->connip)
free(conn->connip);
/* Note that conn->Pfdebug is not ours to close or free */
if (conn->last_query)
free(conn->last_query);
if (conn->write_err_msg)
free(conn->write_err_msg);
if (conn->inBuffer)
......@@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just absent */
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
conn->pipelineStatus = PQ_PIPELINE_OFF;
pqClearAsyncResult(conn); /* deallocate result */
resetPQExpBuffer(&conn->errorMessage);
release_conn_addrinfo(conn);
......@@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
return conn->be_pid;
}
PGpipelineStatus
PQpipelineStatus(const PGconn *conn)
{
if (!conn)
return PQ_PIPELINE_OFF;
return conn->pipelineStatus;
}
int
PQconnectionNeedsPassword(const PGconn *conn)
{
......
......@@ -39,7 +39,9 @@ char *const pgresStatus[] = {
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE"
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
"PGRES_PIPELINE_ABORTED"
};
/*
......@@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
static int pqPipelineFlush(PGconn *conn);
/* ----------------
......@@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
conn->asyncStatus = PGASYNC_READY;
conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
......@@ -1184,6 +1188,87 @@ fail:
}
/*
* pqAllocCmdQueueEntry
* Get a command queue entry for caller to fill.
*
* If the recycle queue has a free element, that is returned; if not, a
* fresh one is allocated. Caller is responsible for adding it to the
* command queue (pqAppendCmdQueueEntry) once the struct is filled in, or
* releasing the memory (pqRecycleCmdQueueEntry) if an error occurs.
*
* If allocation fails, sets the error message and returns NULL.
*/
static PGcmdQueueEntry *
pqAllocCmdQueueEntry(PGconn *conn)
{
PGcmdQueueEntry *entry;
if (conn->cmd_queue_recycle == NULL)
{
entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry));
if (entry == NULL)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("out of memory\n"));
return NULL;
}
}
else
{
entry = conn->cmd_queue_recycle;
conn->cmd_queue_recycle = entry->next;
}
entry->next = NULL;
entry->query = NULL;
return entry;
}
/*
* pqAppendCmdQueueEntry
* Append a caller-allocated command queue entry to the queue.
*
* The query itself must already have been put in the output buffer by the
* caller.
*/
static void
pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
{
Assert(entry->next == NULL);
if (conn->cmd_queue_head == NULL)
conn->cmd_queue_head = entry;
else
conn->cmd_queue_tail->next = entry;
conn->cmd_queue_tail = entry;
}
/*
* pqRecycleCmdQueueEntry
* Push a command queue entry onto the freelist.
*/
static void
pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
{
if (entry == NULL)
return;
/* recyclable entries should not have a follow-on command */
Assert(entry->next == NULL);
if (entry->query)
{
free(entry->query);
entry->query = NULL;
}
entry->next = conn->cmd_queue_recycle;
conn->cmd_queue_recycle = entry;
}
/*
* PQsendQuery
* Submit a query, but don't wait for it to finish
......@@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
if (!PQsendQueryStart(conn, newQuery))
return 0;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/* check the argument */
if (!query)
{
......@@ -1220,6 +1311,9 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
return 0;
}
/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
/* construct the outgoing Query message */
if (pqPutMsgStart('Q', conn) < 0 ||
pqPuts(query, conn) < 0 ||
......@@ -1230,27 +1324,62 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
}
/* remember we are using simple query protocol */
conn->queryclass = PGQUERY_SIMPLE;
entry->queryclass = PGQUERY_SIMPLE;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if (conn->last_query)
free(conn->last_query);
conn->last_query = strdup(query);
entry->query = strdup(query);
}
else
{
/*
* In pipeline mode we cannot use the simple protocol, so we send
* Parse, Bind, Describe Portal, Execute.
*/
if (pqPutMsgStart('P', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPuts(query, conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
if (pqPutMsgStart('B', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
if (pqPutMsgStart('D', conn) < 0 ||
pqPutc('P', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
if (pqPutMsgStart('E', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPutInt(0, 4, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
entry->queryclass = PGQUERY_EXTENDED;
entry->query = strdup(query);
}
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
{
/* error message should be set up already */
return 0;
}
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
/*
......@@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
const char *stmtName, const char *query,
int nParams, const Oid *paramTypes)
{
PGcmdQueueEntry *entry = NULL;
if (!PQsendQueryStart(conn, true))
return 0;
......@@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
return 0;
}
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/* construct the Parse message */
if (pqPutMsgStart('P', conn) < 0 ||
pqPuts(stmtName, conn) < 0 ||
......@@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
if (pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* construct the Sync message */
/* Add a Sync, unless in pipeline mode. */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
}
/* remember we are doing just a Parse */
conn->queryclass = PGQUERY_PREPARE;
entry->queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if (conn->last_query)
free(conn->last_query);
conn->last_query = strdup(query);
/* if insufficient memory, query just winds up NULL */
entry->query = strdup(query);
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
* it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
......@@ -1429,6 +1570,7 @@ PQsendQueryPrepared(PGconn *conn,
}
/*
* PQsendQueryStart
* Common startup code for PQsendQuery and sibling routines
*/
static bool
......@@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
libpq_gettext("no connection to the server\n"));
return false;
}
/* Can't send while already busy, either. */
if (conn->asyncStatus != PGASYNC_IDLE)
/* Can't send while already busy, either, unless enqueuing for later */
if (conn->asyncStatus != PGASYNC_IDLE &&
conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
/* initialize async result-accumulation state */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* When enqueuing commands we don't change much of the connection
* state since it's already in use for the current command. The
* connection state will get updated when pqPipelineProcessQueue()
* advances to start processing the queued message.
*
* Just make sure we can safely enqueue given the current connection
* state. We can enqueue behind another queue item, or behind a
* non-queue command (one that sends its own sync), but we can't
* enqueue if the connection is in a copy state.
*/
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot queue commands during COPY\n"));
return false;
}
}
else
{
/*
* This command's results will come in immediately. Initialize async
* result-accumulation state
*/
pqClearAsyncResult(conn);
/* reset single-row processing mode */
conn->singleRowMode = false;
}
/* ready to send command message */
return true;
}
......@@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat)
{
int i;
PGcmdQueueEntry *entry;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/*
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
* using specified statement name and the unnamed portal.
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
* (if not in pipeline mode), using specified statement name and the
* unnamed portal.
*/
if (command)
......@@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* construct the Sync message */
/* construct the Sync message if not in pipeline mode */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
}
/* remember we are using extended query protocol */
conn->queryclass = PGQUERY_EXTENDED;
entry->queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if (conn->last_query)
free(conn->last_query);
/* if insufficient memory, query just winds up NULL */
if (command)
conn->last_query = strdup(command);
else
conn->last_query = NULL;
entry->query = strdup(command);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
* it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
......@@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
return 0;
if (conn->asyncStatus != PGASYNC_BUSY)
return 0;
if (conn->queryclass != PGQUERY_SIMPLE &&
conn->queryclass != PGQUERY_EXTENDED)
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
return 0;
if (conn->result)
return 0;
......@@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
}
/*
* PQgetResult
* Get the next PGresult produced by a query. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
* memory).
*
* In pipeline mode, once all the result of a query have been returned,
* PQgetResult returns NULL to let the user know that the next
* query is being processed. At the end of the pipeline, returns a
* result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
*/
PGresult *
PQgetResult(PGconn *conn)
{
......@@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to return the NULL that terminates the round of
* results from the current query; prepare to send the results
* of the next query when we're called next. Also, since this
* is the start of the results of the next query, clear any
* prior error message.
*/
resetPQExpBuffer(&conn->errorMessage);
pqPipelineProcessQueue(conn);
}
break;
case PGASYNC_READY:
/*
* For any query type other than simple query protocol, we advance
* the command queue here. This is because for simple query
* protocol we can get the READY state multiple times before the
* command is actually complete, since the command string can
* contain many queries. In simple query protocol, the queue
* advance is done by fe-protocol3 when it receives ReadyForQuery.
*/
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
pqCommandQueueAdvance(conn);
res = pqPrepareAsyncResult(conn);
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
conn->asyncStatus = PGASYNC_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
* move queue processing forwards immediately, so that next
* time we're called, we're prepared to return the next result
* received from the server. In all other cases, leave the
* queue state change for next time, so that a terminating
* NULL result is sent.
*
* (In other words: we don't return a NULL after a pipeline
* sync.)
*/
if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
pqPipelineProcessQueue(conn);
}
else
{
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
}
break;
case PGASYNC_READY_MORE:
res = pqPrepareAsyncResult(conn);
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
......@@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
if (!conn)
return false;
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
return false;
}
/*
* Since this is the beginning of a query cycle, reset the error buffer.
*/
......@@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
{
PGcmdQueueEntry *entry = NULL;
/* Treat null desc_target as empty string */
if (!desc_target)
desc_target = "";
......@@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
if (!PQsendQueryStart(conn, true))
return 0;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/* construct the Describe message */
if (pqPutMsgStart('D', conn) < 0 ||
pqPutc(desc_type, conn) < 0 ||
......@@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* construct the Sync message */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
}
/* remember we are doing a Describe */
conn->queryclass = PGQUERY_DESCRIBE;
/* reset last_query string (not relevant now) */
if (conn->last_query)
{
free(conn->last_query);
conn->last_query = NULL;
}
entry->queryclass = PGQUERY_DESCRIBE;
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
* it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
......@@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
if (conn->queryclass != PGQUERY_SIMPLE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
......@@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
*/
resetPQExpBuffer(&conn->errorMessage);
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("PQfn not allowed in pipeline mode\n"));
return NULL;
}
if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
conn->result != NULL)
{
......@@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
args, nargs);
}
/* ====== Pipeline mode support ======== */
/*
* PQenterPipelineMode
* Put an idle connection in pipeline mode.
*
* Returns 1 on success. On failure, errorMessage is set and 0 is returned.
*
* Commands submitted after this can be pipelined on the connection;
* there's no requirement to wait for one to finish before the next is
* dispatched.
*
* Queuing of a new query or syncing during COPY is not allowed.
*
* A set of commands is terminated by a PQpipelineSync. Multiple sync
* points can be established while in pipeline mode. Pipeline mode can
* be exited by calling PQexitPipelineMode() once all results are processed.
*
* This doesn't actually send anything on the wire, it just puts libpq
* into a state where it can pipeline work.
*/
int
PQenterPipelineMode(PGconn *conn)
{
if (!conn)
return 0;
/* succeed with no action if already in pipeline mode */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
return 1;
if (conn->asyncStatus != PGASYNC_IDLE)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
return 0;
}
conn->pipelineStatus = PQ_PIPELINE_ON;
return 1;
}
/*
* PQexitPipelineMode
* End pipeline mode and return to normal command mode.
*
* Returns 1 in success (pipeline mode successfully ended, or not in pipeline
* mode).
*
* Returns 0 if in pipeline mode and cannot be ended yet. Error message will
* be set.
*/
int
PQexitPipelineMode(PGconn *conn)
{
if (!conn)
return 0;
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
return 1;
switch (conn->asyncStatus)
{
case PGASYNC_READY:
case PGASYNC_READY_MORE:
/* there are some uncollected results */
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
return 0;
case PGASYNC_BUSY:
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;
default:
/* OK */
break;
}
/* still work to process */
if (conn->cmd_queue_head != NULL)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
return 0;
}
conn->pipelineStatus = PQ_PIPELINE_OFF;
conn->asyncStatus = PGASYNC_IDLE;
/* Flush any pending data in out buffer */
if (pqFlush(conn) < 0)
return 0; /* error message is setup already */
return 1;
}
/*
* pqCommandQueueAdvance
* Remove one query from the command queue, when we receive
* all results from the server that pertain to it.
*/
void
pqCommandQueueAdvance(PGconn *conn)
{
PGcmdQueueEntry *prevquery;
if (conn->cmd_queue_head == NULL)
return;
/* delink from queue */
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
}
/*
* pqPipelineProcessQueue: subroutine for PQgetResult
* In pipeline mode, start processing the results of the next query in the queue.
*/
void
pqPipelineProcessQueue(PGconn *conn)
{
switch (conn->asyncStatus)
{
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;
case PGASYNC_IDLE:
/* next query please */
break;
}
/* Nothing to do if not in pipeline mode, or queue is empty */
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
conn->cmd_queue_head == NULL)
return;
/* Initialize async result-accumulation state */
pqClearAsyncResult(conn);
/*
* Reset single-row processing mode. (Client has to set it up for each
* query, if desired.)
*/
conn->singleRowMode = false;
if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
{
/*
* In an aborted pipeline we don't get anything from the server for
* each result; we're just discarding commands from the queue until we
* get to the next sync from the server.
*
* The PGRES_PIPELINE_ABORTED results tell the client that its queries
* got aborted.
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
if (!conn->result)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("out of memory\n"));
pqSaveErrorResult(conn);
return;
}
conn->asyncStatus = PGASYNC_READY;
}
else
{
/* allow parsing to continue */
conn->asyncStatus = PGASYNC_BUSY;
}
}
/*
* PQpipelineSync
* Send a Sync message as part of a pipeline, and flush to server
*
* It's legal to start submitting more commands in the pipeline immediately,
* without waiting for the results of the current pipeline. There's no need to
* end pipeline mode and start it again.
*
* If a command in a pipeline fails, every subsequent command up to and including
* the result to the Sync message sent by PQpipelineSync gets set to
* PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
* error, a PGresult with PGRES_PIPELINE_SYNC is produced.
*
* Queries can already have been sent before PQpipelineSync is called, but
* PQpipelineSync need to be called before retrieving command results.
*
* The connection will remain in pipeline mode and unavailable for new
* synchronous command execution functions until all results from the pipeline
* are processed by the client.
*/
int
PQpipelineSync(PGconn *conn)
{
PGcmdQueueEntry *entry;
if (!conn)
return 0;
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
return 0;
}
switch (conn->asyncStatus)
{
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
/* should be unreachable */
appendPQExpBufferStr(&conn->errorMessage,
"internal error: cannot send pipeline while in COPY\n");
return 0;
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
/* OK to send sync */
break;
}
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
entry->queryclass = PGQUERY_SYNC;
entry->query = NULL;
/* construct the Sync message */
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
pqAppendCmdQueueEntry(conn, entry);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
if (PQflush(conn) < 0)
goto sendFailed;
/*
* Call pqPipelineProcessQueue so the user can call start calling
* PQgetResult.
*/
pqPipelineProcessQueue(conn);
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
/* ====== accessor funcs for PGresult ======== */
......@@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
char *
PQresStatus(ExecStatusType status)
{
if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
if ((unsigned int) status >= lengthof(pgresStatus))
return libpq_gettext("invalid ExecStatusType code");
return pgresStatus[status];
}
......@@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
return pqFlush(conn);
}
/*
* pqPipelineFlush
*
* In pipeline mode, data will be flushed only when the out buffer reaches the
* threshold value. In non-pipeline mode, it behaves as stock pqFlush.
*
* Returns 0 on success.
*/
static int
pqPipelineFlush(PGconn *conn)
{
if ((conn->pipelineStatus != PQ_PIPELINE_ON) ||
(conn->outCount >= OUTBUFFER_THRESHOLD))
return pqFlush(conn);
return 0;
}
/*
* PQfreemem - safely frees memory allocated
......
......@@ -158,6 +158,18 @@ pqParseInput3(PGconn *conn)
if (conn->asyncStatus != PGASYNC_IDLE)
return;
/*
* We're also notionally not-IDLE when in pipeline mode the state
* says "idle" (so we have completed receiving the results of one
* query from the server and dispatched them to the application)
* but another query is queued; yield back control to caller so
* that they can initiate processing of the next query in the
* queue.
*/
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
conn->cmd_queue_head != NULL)
return;
/*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
......@@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn)
}
else
{
/* Any other case is unexpected and we summarily skip it */
pqInternalNotice(&conn->noticeHooks,
"message type 0x%02x arrived from server while idle",
id);
......@@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn)
return;
conn->asyncStatus = PGASYNC_READY;
break;
case 'Z': /* backend is ready for new query */
case 'Z': /* sync response, backend is ready for new
* query */
if (getReadyForQuery(conn))
return;
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
conn->result = PQmakeEmptyPGresult(conn,
PGRES_PIPELINE_SYNC);
if (!conn->result)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("out of memory"));
pqSaveErrorResult(conn);
}
else
{
conn->pipelineStatus = PQ_PIPELINE_ON;
conn->asyncStatus = PGASYNC_READY;
}
}
else
{
/*
* In simple query protocol, advance the command queue
* (see PQgetResult).
*/
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
pqCommandQueueAdvance(conn);
conn->asyncStatus = PGASYNC_IDLE;
}
break;
case 'I': /* empty query */
if (conn->result == NULL)
......@@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn)
break;
case '1': /* Parse Complete */
/* If we're doing PQprepare, we're done; else ignore */
if (conn->queryclass == PGQUERY_PREPARE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
{
if (conn->result == NULL)
{
......@@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn)
conn->inCursor += msgLength;
}
else if (conn->result == NULL ||
conn->queryclass == PGQUERY_DESCRIBE)
(conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
/* First 'T' in a query sequence */
if (getRowDescriptions(conn, msgLength))
......@@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn)
* instead of PGRES_TUPLES_OK. Otherwise we can just
* ignore this message.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
{
if (conn->result == NULL)
{
......@@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
id, msgLength);
/* build an error result holding the error message */
pqSaveErrorResult(conn);
conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */
/* flush input data since we're giving up on processing it */
pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; /* No more connection to backend */
......@@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* PGresult created by getParamDescriptions, and we should fill data into
* that. Otherwise, create a new, empty PGresult.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
if (conn->result)
result = conn->result;
......@@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* If we're doing a Describe, we're done, and ready to pass the result
* back to the client.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
if ((!conn->cmd_queue_head) ||
(conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
conn->asyncStatus = PGASYNC_READY;
return 0;
......@@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
PQExpBufferData workBuf;
char id;
/* If in pipeline mode, set error indicator for it */
if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
conn->pipelineStatus = PQ_PIPELINE_ABORTED;
/*
* If this is an error message, pre-emptively clear any incomplete query
* result we may have. We'd just throw it away below anyway, and
......@@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
* might need it for an error cursor display, which is only true if there
* is a PG_DIAG_STATEMENT_POSITION field.
*/
if (have_position && conn->last_query && res)
res->errQuery = pqResultStrdup(res, conn->last_query);
if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
/*
* Now build the "overall" error message for PQresultErrorMessage.
......@@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
if (conn->queryclass != PGQUERY_SIMPLE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
......@@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
int avail;
int i;
/* already validated by PQfn */
Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
/* PQfn already validated connection state */
if (pqPutMsgStart('F', conn) < 0 || /* function call msg */
......
......@@ -96,7 +96,10 @@ typedef enum
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
} ExecStatusType;
typedef enum
......@@ -136,6 +139,16 @@ typedef enum
PQPING_NO_ATTEMPT /* connection not attempted (bad params) */
} PGPing;
/*
* PGpipelineStatus - Current status of pipeline mode
*/
typedef enum
{
PQ_PIPELINE_OFF,
PQ_PIPELINE_ON,
PQ_PIPELINE_ABORTED
} PGpipelineStatus;
/* PGconn encapsulates a connection to the backend.
* The contents of this struct are not supposed to be known to applications.
*/
......@@ -327,6 +340,7 @@ extern int PQserverVersion(const PGconn *conn);
extern char *PQerrorMessage(const PGconn *conn);
extern int PQsocket(const PGconn *conn);
extern int PQbackendPID(const PGconn *conn);
extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
extern int PQconnectionNeedsPassword(const PGconn *conn);
extern int PQconnectionUsedPassword(const PGconn *conn);
extern int PQclientEncoding(const PGconn *conn);
......@@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn);
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
/* Routines for pipeline mode management */
extern int PQenterPipelineMode(PGconn *conn);
extern int PQexitPipelineMode(PGconn *conn);
extern int PQpipelineSync(PGconn *conn);
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
......
......@@ -217,21 +217,16 @@ typedef enum
{
PGASYNC_IDLE, /* nothing's happening, dude */
PGASYNC_BUSY, /* query in progress */
PGASYNC_READY, /* result ready for PQgetResult */
PGASYNC_READY, /* query done, waiting for client to fetch
* result */
PGASYNC_READY_MORE, /* query done, waiting for client to fetch
* result, more results expected from this
* query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
typedef enum
{
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE /* Describe Statement or Portal */
} PGQueryClass;
/* Target server type (decoded value of target_session_attrs) */
typedef enum
{
......@@ -305,6 +300,29 @@ typedef enum pg_conn_host_type
CHT_UNIX_SOCKET
} pg_conn_host_type;
/*
* PGQueryClass tracks which query protocol is in use for each command queue
* entry, or special operation in execution
*/
typedef enum
{
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
PGQUERY_SYNC /* Sync (at end of a pipeline) */
} PGQueryClass;
/*
* An entry in the pending command queue.
*/
typedef struct PGcmdQueueEntry
{
PGQueryClass queryclass; /* Query type */
char *query; /* SQL command, or NULL if none/unknown/OOM */
struct PGcmdQueueEntry *next; /* list link */
} PGcmdQueueEntry;
/*
* pg_conn_host stores all information about each of possibly several hosts
* mentioned in the connection string. Most fields are derived by splitting
......@@ -389,12 +407,11 @@ struct pg_conn
ConnStatusType status;
PGAsyncStatusType asyncStatus;
PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
PGQueryClass queryclass;
char *last_query; /* last SQL command, or NULL if unknown */
char last_sqlstate[6]; /* last reported SQLSTATE */
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
......@@ -407,6 +424,19 @@ struct pg_conn
pg_conn_host *connhost; /* details about each named host */
char *connip; /* IP address for current network connection */
/*
* The pending command queue as a singly-linked list. Head is the command
* currently in execution, tail is where new commands are added.
*/
PGcmdQueueEntry *cmd_queue_head;
PGcmdQueueEntry *cmd_queue_tail;
/*
* To save malloc traffic, we don't free entries right away; instead we
* save them in this list for possible reuse.
*/
PGcmdQueueEntry *cmd_queue_recycle;
/* Connection data */
pgsocket sock; /* FD for socket, PGINVALID_SOCKET if
* unconnected */
......@@ -622,6 +652,7 @@ extern void pqSaveMessageField(PGresult *res, char code,
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
const char *value);
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
extern void pqCommandQueueAdvance(PGconn *conn);
extern int PQsendQueryContinue(PGconn *conn, const char *query);
/* === in fe-protocol3.c === */
......@@ -795,6 +826,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
*/
#define pqIsnonblocking(conn) ((conn)->nonblocking)
/*
* Connection's outbuffer threshold, for pipeline mode.
*/
#define OUTBUFFER_THRESHOLD 65536
#ifdef ENABLE_NLS
extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
......
......@@ -10,6 +10,7 @@ SUBDIRS = \
delay_execution \
dummy_index_am \
dummy_seclabel \
libpq_pipeline \
plsample \
snapshot_too_old \
test_bloomfilter \
......
# Generated subdirectories
/log/
/results/
/tmp_check/
/libpq_pipeline
# src/test/modules/libpq_pipeline/Makefile
PROGRAM = libpq_pipeline
OBJS = libpq_pipeline.o
PG_CPPFLAGS = -I$(libpq_srcdir)
PG_LIBS_INTERNAL += $(libpq_pgport)
TAP_TESTS = 1
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/libpq_pipeline
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
Test programs and libraries for libpq
/*-------------------------------------------------------------------------
*
* libpq_pipeline.c
* Verify libpq pipeline execution functionality
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/test/modules/libpq_pipeline/libpq_pipeline.c
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/time.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include "catalog/pg_type_d.h"
#include "common/fe_memutils.h"
#include "libpq-fe.h"
#include "portability/instr_time.h"
static void exit_nicely(PGconn *conn);
const char *const progname = "libpq_pipeline";
#define DEBUG
#ifdef DEBUG
#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
#else
#define pg_debug(...)
#endif
static const char *const drop_table_sql =
"DROP TABLE IF EXISTS pq_pipeline_demo";
static const char *const create_table_sql =
"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer);";
static const char *const insert_sql =
"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1);";
/* max char length of an int32, plus sign and null terminator */
#define MAXINTLEN 12
static void
exit_nicely(PGconn *conn)
{
PQfinish(conn);
exit(1);
}
/*
* Print an error to stderr and terminate the program.
*/
#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
static void
pg_fatal_impl(int line, const char *fmt,...)
{
va_list args;
fflush(stdout);
fprintf(stderr, "\n%s:%d: ", progname, line);
va_start(args, fmt);
vfprintf(stderr, fmt, args);
va_end(args);
Assert(fmt[strlen(fmt) - 1] != '\n');
fprintf(stderr, "\n");
exit(1);
}
static void
test_disallowed_in_pipeline(PGconn *conn)
{
PGresult *res = NULL;
fprintf(stderr, "test error cases... ");
if (PQisnonblocking(conn))
pg_fatal("Expected blocking connection mode");
if (PQenterPipelineMode(conn) != 1)
pg_fatal("Unable to enter pipeline mode");
if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
pg_fatal("Pipeline mode not activated properly");
/* PQexec should fail in pipeline mode */
res = PQexec(conn, "SELECT 1");
if (PQresultStatus(res) != PGRES_FATAL_ERROR)
pg_fatal("PQexec should fail in pipeline mode but succeeded");
/* Entering pipeline mode when already in pipeline mode is OK */
if (PQenterPipelineMode(conn) != 1)
pg_fatal("re-entering pipeline mode should be a no-op but failed");
if (PQisBusy(conn) != 0)
pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
/* ok, back to normal command mode */
if (PQexitPipelineMode(conn) != 1)
pg_fatal("couldn't exit idle empty pipeline mode");
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("Pipeline mode not terminated properly");
/* exiting pipeline mode when not in pipeline mode should be a no-op */
if (PQexitPipelineMode(conn) != 1)
pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
/* can now PQexec again */
res = PQexec(conn, "SELECT 1");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
PQerrorMessage(conn));
fprintf(stderr, "ok\n");
}
static void
test_multi_pipelines(PGconn *conn)
{
PGresult *res = NULL;
const char *dummy_params[1] = {"1"};
Oid dummy_param_oids[1] = {INT4OID};
fprintf(stderr, "multi pipeline... ");
/*
* Queue up a couple of small pipelines and process each without returning
* to command mode first.
*/
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
/* OK, start processing the results */
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("Unexpected result code %s from first pipeline item",
PQresStatus(PQresultStatus(res)));
PQclear(res);
res = NULL;
if (PQgetResult(conn) != NULL)
pg_fatal("PQgetResult returned something extra after first result");
if (PQexitPipelineMode(conn) != 0)
pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when sync result expected: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code %s instead of sync result, error: %s",
PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
PQclear(res);
/* second pipeline */
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("Unexpected result code %s from second pipeline item",
PQresStatus(PQresultStatus(res)));
res = PQgetResult(conn);
if (res != NULL)
pg_fatal("Expected null result, got %s",
PQresStatus(PQresultStatus(res)));
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code %s from second pipeline sync",
PQresStatus(PQresultStatus(res)));
/* We're still in pipeline mode ... */
if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
pg_fatal("Fell out of pipeline mode somehow");
/* until we end it, which we can safely do now */
if (PQexitPipelineMode(conn) != 1)
pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
PQerrorMessage(conn));
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("exiting pipeline mode didn't seem to work");
fprintf(stderr, "ok\n");
}
/*
* When an operation in a pipeline fails the rest of the pipeline is flushed. We
* still have to get results for each pipeline item, but the item will just be
* a PGRES_PIPELINE_ABORTED code.
*
* This intentionally doesn't use a transaction to wrap the pipeline. You should
* usually use an xact, but in this case we want to observe the effects of each
* statement.
*/
static void
test_pipeline_abort(PGconn *conn)
{
PGresult *res = NULL;
const char *dummy_params[1] = {"1"};
Oid dummy_param_oids[1] = {INT4OID};
int i;
bool goterror;
fprintf(stderr, "aborted pipeline... ");
res = PQexec(conn, drop_table_sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
res = PQexec(conn, create_table_sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
/*
* Queue up a couple of small pipelines and process each without returning
* to command mode first. Make sure the second operation in the first
* pipeline ERRORs.
*/
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
dummy_params[0] = "1";
if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
1, dummy_param_oids, dummy_params,
NULL, NULL, 0) != 1)
pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
dummy_params[0] = "2";
if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
dummy_params[0] = "3";
if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
dummy_params, NULL, NULL, 0) != 1)
pg_fatal("dispatching second-pipeline insert failed: %s",
PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
/*
* OK, start processing the pipeline results.
*
* We should get a command-ok for the first query, then a fatal error and
* a pipeline aborted message for the second insert, a pipeline-end, then
* a command-ok and a pipeline-ok for the second pipeline operation.
*/
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("Unexpected result status %s: %s",
PQresStatus(PQresultStatus(res)),
PQresultErrorMessage(res));
PQclear(res);
/* NULL result to signal end-of-results for this command */
if ((res = PQgetResult(conn)) != NULL)
pg_fatal("Expected null result, got %s",
PQresStatus(PQresultStatus(res)));
/* Second query caused error, so we expect an error next */
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_FATAL_ERROR)
pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
PQresStatus(PQresultStatus(res)));
PQclear(res);
/* NULL result to signal end-of-results for this command */
if ((res = PQgetResult(conn)) != NULL)
pg_fatal("Expected null result, got %s",
PQresStatus(PQresultStatus(res)));
/*
* pipeline should now be aborted.
*
* Note that we could still queue more queries at this point if we wanted;
* they'd get added to a new third pipeline since we've already sent a
* second. The aborted flag relates only to the pipeline being received.
*/
if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
pg_fatal("pipeline should be flagged as aborted but isn't");
/* third query in pipeline, the second insert */
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
PQresStatus(PQresultStatus(res)));
PQclear(res);
/* NULL result to signal end-of-results for this command */
if ((res = PQgetResult(conn)) != NULL)
pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
pg_fatal("pipeline should be flagged as aborted but isn't");
/* Ensure we're still in pipeline */
if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
pg_fatal("Fell out of pipeline mode somehow");
/*
* The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
*
* (This is so clients know to start processing results normally again and
* can tell the difference between skipped commands and the sync.)
*/
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code from first pipeline sync\n"
"Expected PGRES_PIPELINE_SYNC, got %s",
PQresStatus(PQresultStatus(res)));
PQclear(res);
if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
pg_fatal("sync should've cleared the aborted flag but didn't");
/* We're still in pipeline mode... */
if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
pg_fatal("Fell out of pipeline mode somehow");
/* the insert from the second pipeline */
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("Unexpected result code %s from first item in second pipeline",
PQresStatus(PQresultStatus(res)));
PQclear(res);
/* Read the NULL result at the end of the command */
if ((res = PQgetResult(conn)) != NULL)
pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
/* the second pipeline sync */
if ((res = PQgetResult(conn)) == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code %s from second pipeline sync",
PQresStatus(PQresultStatus(res)));
PQclear(res);
if ((res = PQgetResult(conn)) != NULL)
pg_fatal("Expected null result, got %s: %s",
PQresStatus(PQresultStatus(res)),
PQerrorMessage(conn));
/* Try to send two queries in one command */
if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
pg_fatal("failed to send query: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
goterror = false;
while ((res = PQgetResult(conn)) != NULL)
{
switch (PQresultStatus(res))
{
case PGRES_FATAL_ERROR:
if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
pg_fatal("expected error about multiple commands, got %s",
PQerrorMessage(conn));
printf("got expected %s", PQerrorMessage(conn));
goterror = true;
break;
default:
pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
break;
}
}
if (!goterror)
pg_fatal("did not get cannot-insert-multiple-commands error");
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("got NULL result");
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code %s from pipeline sync",
PQresStatus(PQresultStatus(res)));
/* Test single-row mode with an error partways */
if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
pg_fatal("failed to send query: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
PQsetSingleRowMode(conn);
goterror = false;
while ((res = PQgetResult(conn)) != NULL)
{
switch (PQresultStatus(res))
{
case PGRES_SINGLE_TUPLE:
printf("got row: %s\n", PQgetvalue(res, 0, 0));
break;
case PGRES_FATAL_ERROR:
if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
pg_fatal("expected division-by-zero, got: %s (%s)",
PQerrorMessage(conn),
PQresultErrorField(res, PG_DIAG_SQLSTATE));
printf("got expected division-by-zero\n");
goterror = true;
break;
default:
pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
}
PQclear(res);
}
if (!goterror)
pg_fatal("did not get division-by-zero error");
/* the third pipeline sync */
if ((res = PQgetResult(conn)) == NULL)
pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code %s from third pipeline sync",
PQresStatus(PQresultStatus(res)));
PQclear(res);
/* We're still in pipeline mode... */
if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
pg_fatal("Fell out of pipeline mode somehow");
/* until we end it, which we can safely do now */
if (PQexitPipelineMode(conn) != 1)
pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
PQerrorMessage(conn));
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("exiting pipeline mode didn't seem to work");
fprintf(stderr, "ok\n");
/*-
* Since we fired the pipelines off without a surrounding xact, the results
* should be:
*
* - Implicit xact started by server around 1st pipeline
* - First insert applied
* - Second statement aborted xact
* - Third insert skipped
* - Sync rolled back first implicit xact
* - Implicit xact created by server around 2nd pipeline
* - insert applied from 2nd pipeline
* - Sync commits 2nd xact
*
* So we should only have the value 3 that we inserted.
*/
res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("Expected tuples, got %s: %s",
PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
if (PQntuples(res) != 1)
pg_fatal("expected 1 result, got %d", PQntuples(res));
for (i = 0; i < PQntuples(res); i++)
{
const char *val = PQgetvalue(res, i, 0);
if (strcmp(val, "3") != 0)
pg_fatal("expected only insert with value 3, got %s", val);
}
PQclear(res);
}
/* State machine enum for test_pipelined_insert */
enum PipelineInsertStep
{
BI_BEGIN_TX,
BI_DROP_TABLE,
BI_CREATE_TABLE,
BI_PREPARE,
BI_INSERT_ROWS,
BI_COMMIT_TX,
BI_SYNC,
BI_DONE
};
static void
test_pipelined_insert(PGconn *conn, int n_rows)
{
const char *insert_params[1];
Oid insert_param_oids[1] = {INT4OID};
char insert_param_0[MAXINTLEN];
enum PipelineInsertStep send_step = BI_BEGIN_TX,
recv_step = BI_BEGIN_TX;
int rows_to_send,
rows_to_receive;
insert_params[0] = &insert_param_0[0];
rows_to_send = rows_to_receive = n_rows;
/*
* Do a pipelined insert into a table created at the start of the pipeline
*/
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
while (send_step != BI_PREPARE)
{
const char *sql;
switch (send_step)
{
case BI_BEGIN_TX:
sql = "BEGIN TRANSACTION";
send_step = BI_DROP_TABLE;
break;
case BI_DROP_TABLE:
sql = drop_table_sql;
send_step = BI_CREATE_TABLE;
break;
case BI_CREATE_TABLE:
sql = create_table_sql;
send_step = BI_PREPARE;
break;
default:
pg_fatal("invalid state");
}
pg_debug("sending: %s\n", sql);
if (PQsendQueryParams(conn, sql,
0, NULL, NULL, NULL, NULL, 0) != 1)
pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
}
Assert(send_step == BI_PREPARE);
pg_debug("sending: %s\n", insert_sql);
if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1)
pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
send_step = BI_INSERT_ROWS;
/*
* Now we start inserting. We'll be sending enough data that we could fill
* our output buffer, so to avoid deadlocking we need to enter nonblocking
* mode and consume input while we send more output. As results of each
* query are processed we should pop them to allow processing of the next
* query. There's no need to finish the pipeline before processing
* results.
*/
if (PQsetnonblocking(conn, 1) != 0)
pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
while (recv_step != BI_DONE)
{
int sock;
fd_set input_mask;
fd_set output_mask;
sock = PQsocket(conn);
if (sock < 0)
break; /* shouldn't happen */
FD_ZERO(&input_mask);
FD_SET(sock, &input_mask);
FD_ZERO(&output_mask);
FD_SET(sock, &output_mask);
if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
{
fprintf(stderr, "select() failed: %s\n", strerror(errno));
exit_nicely(conn);
}
/*
* Process any results, so we keep the server's output buffer free
* flowing and it can continue to process input
*/
if (FD_ISSET(sock, &input_mask))
{
PQconsumeInput(conn);
/* Read until we'd block if we tried to read */
while (!PQisBusy(conn) && recv_step < BI_DONE)
{
PGresult *res;
const char *cmdtag;
const char *description = "";
int status;
/*
* Read next result. If no more results from this query,
* advance to the next query
*/
res = PQgetResult(conn);
if (res == NULL)
continue;
status = PGRES_COMMAND_OK;
switch (recv_step)
{
case BI_BEGIN_TX:
cmdtag = "BEGIN";
recv_step++;
break;
case BI_DROP_TABLE:
cmdtag = "DROP TABLE";
recv_step++;
break;
case BI_CREATE_TABLE:
cmdtag = "CREATE TABLE";
recv_step++;
break;
case BI_PREPARE:
cmdtag = "";
description = "PREPARE";
recv_step++;
break;
case BI_INSERT_ROWS:
cmdtag = "INSERT";
rows_to_receive--;
if (rows_to_receive == 0)
recv_step++;
break;
case BI_COMMIT_TX:
cmdtag = "COMMIT";
recv_step++;
break;
case BI_SYNC:
cmdtag = "";
description = "SYNC";
status = PGRES_PIPELINE_SYNC;
recv_step++;
break;
case BI_DONE:
/* unreachable */
description = "";
abort();
}
if (PQresultStatus(res) != status)
pg_fatal("%s reported status %s, expected %s\n"
"Error message: \"%s\"",
description, PQresStatus(PQresultStatus(res)),
PQresStatus(status), PQerrorMessage(conn));
if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
pg_fatal("%s expected command tag '%s', got '%s'",
description, cmdtag, PQcmdStatus(res));
pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
PQclear(res);
}
}
/* Write more rows and/or the end pipeline message, if needed */
if (FD_ISSET(sock, &output_mask))
{
PQflush(conn);
if (send_step == BI_INSERT_ROWS)
{
snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
if (PQsendQueryPrepared(conn, "my_insert",
1, insert_params, NULL, NULL, 0) == 1)
{
pg_debug("sent row %d\n", rows_to_send);
rows_to_send--;
if (rows_to_send == 0)
send_step++;
}
else
{
/*
* in nonblocking mode, so it's OK for an insert to fail
* to send
*/
fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
rows_to_send, PQerrorMessage(conn));
}
}
else if (send_step == BI_COMMIT_TX)
{
if (PQsendQueryParams(conn, "COMMIT",
0, NULL, NULL, NULL, NULL, 0) == 1)
{
pg_debug("sent COMMIT\n");
send_step++;
}
else
{
fprintf(stderr, "WARNING: failed to send commit: %s\n",
PQerrorMessage(conn));
}
}
else if (send_step == BI_SYNC)
{
if (PQpipelineSync(conn) == 1)
{
fprintf(stdout, "pipeline sync sent\n");
send_step++;
}
else
{
fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
PQerrorMessage(conn));
}
}
}
}
/* We've got the sync message and the pipeline should be done */
if (PQexitPipelineMode(conn) != 1)
pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
PQerrorMessage(conn));
if (PQsetnonblocking(conn, 0) != 0)
pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
fprintf(stderr, "ok\n");
}
static void
test_prepared(PGconn *conn)
{
PGresult *res = NULL;
Oid param_oids[1] = {INT4OID};
Oid expected_oids[4];
Oid typ;
fprintf(stderr, "prepared... ");
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
"interval '1 sec'",
1, param_oids) != 1)
pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
expected_oids[0] = INT4OID;
expected_oids[1] = TEXTOID;
expected_oids[2] = NUMERICOID;
expected_oids[3] = INTERVALOID;
if (PQsendDescribePrepared(conn, "select_one") != 1)
pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
PQclear(res);
res = PQgetResult(conn);
if (res != NULL)
pg_fatal("expected NULL result");
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned NULL");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
if (PQnfields(res) != lengthof(expected_oids))
pg_fatal("expected %d columns, got %d",
lengthof(expected_oids), PQnfields(res));
for (int i = 0; i < PQnfields(res); i++)
{
typ = PQftype(res, i);
if (typ != expected_oids[i])
pg_fatal("field %d: expected type %u, got %u",
i, expected_oids[i], typ);
}
PQclear(res);
res = PQgetResult(conn);
if (res != NULL)
pg_fatal("expected NULL result");
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
if (PQexitPipelineMode(conn) != 1)
pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
PQexec(conn, "BEGIN");
PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
PQenterPipelineMode(conn);
if (PQsendDescribePortal(conn, "cursor_one") != 1)
pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("expected NULL result");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
typ = PQftype(res, 0);
if (typ != INT4OID)
pg_fatal("portal: expected type %u, got %u",
INT4OID, typ);
PQclear(res);
res = PQgetResult(conn);
if (res != NULL)
pg_fatal("expected NULL result");
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
if (PQexitPipelineMode(conn) != 1)
pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
fprintf(stderr, "ok\n");
}
static void
test_simple_pipeline(PGconn *conn)
{
PGresult *res = NULL;
const char *dummy_params[1] = {"1"};
Oid dummy_param_oids[1] = {INT4OID};
fprintf(stderr, "simple pipeline... ");
/*
* Enter pipeline mode and dispatch a set of operations, which we'll then
* process the results of as they come in.
*
* For a simple case we should be able to do this without interim
* processing of results since our output buffer will give us enough slush
* to work with and we won't block on sending. So blocking mode is fine.
*/
if (PQisnonblocking(conn))
pg_fatal("Expected blocking connection mode");
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
if (PQsendQueryParams(conn, "SELECT $1",
1, dummy_param_oids, dummy_params,
NULL, NULL, 0) != 1)
pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
if (PQexitPipelineMode(conn) != 0)
pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("Unexpected result code %s from first pipeline item",
PQresStatus(PQresultStatus(res)));
PQclear(res);
res = NULL;
if (PQgetResult(conn) != NULL)
pg_fatal("PQgetResult returned something extra after first query result.");
/*
* Even though we've processed the result there's still a sync to come and
* we can't exit pipeline mode yet
*/
if (PQexitPipelineMode(conn) != 0)
pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
res = PQgetResult(conn);
if (res == NULL)
pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
PQclear(res);
res = NULL;
if (PQgetResult(conn) != NULL)
pg_fatal("PQgetResult returned something extra after pipeline end: %s",
PQresStatus(PQresultStatus(res)));
/* We're still in pipeline mode... */
if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
pg_fatal("Fell out of pipeline mode somehow");
/* ... until we end it, which we can safely do now */
if (PQexitPipelineMode(conn) != 1)
pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
PQerrorMessage(conn));
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("Exiting pipeline mode didn't seem to work");
fprintf(stderr, "ok\n");
}
static void
test_singlerowmode(PGconn *conn)
{
PGresult *res;
int i;
bool pipeline_ended = false;
/* 1 pipeline, 3 queries in it */
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s",
PQerrorMessage(conn));
for (i = 0; i < 3; i++)
{
char *param[1];
param[0] = psprintf("%d", 44 + i);
if (PQsendQueryParams(conn,
"SELECT generate_series(42, $1)",
1,
NULL,
(const char **) param,
NULL,
NULL,
0) != 1)
pg_fatal("failed to send query: %s",
PQerrorMessage(conn));
pfree(param[0]);
}
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
for (i = 0; !pipeline_ended; i++)
{
bool first = true;
bool saw_ending_tuplesok;
bool isSingleTuple = false;
/* Set single row mode for only first 2 SELECT queries */
if (i < 2)
{
if (PQsetSingleRowMode(conn) != 1)
pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
}
/* Consume rows for this query */
saw_ending_tuplesok = false;
while ((res = PQgetResult(conn)) != NULL)
{
ExecStatusType est = PQresultStatus(res);
if (est == PGRES_PIPELINE_SYNC)
{
fprintf(stderr, "end of pipeline reached\n");
pipeline_ended = true;
PQclear(res);
if (i != 3)
pg_fatal("Expected three results, got %d", i);
break;
}
/* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
if (first)
{
if (i <= 1 && est != PGRES_SINGLE_TUPLE)
pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
i, PQresStatus(est));
if (i >= 2 && est != PGRES_TUPLES_OK)
pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
i, PQresStatus(est));
first = false;
}
fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
switch (est)
{
case PGRES_TUPLES_OK:
fprintf(stderr, ", tuples: %d\n", PQntuples(res));
saw_ending_tuplesok = true;
if (isSingleTuple)
{
if (PQntuples(res) == 0)
fprintf(stderr, "all tuples received in query %d\n", i);
else
pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
}
break;
case PGRES_SINGLE_TUPLE:
isSingleTuple = true;
fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
break;
default:
pg_fatal("unexpected");
}
PQclear(res);
}
if (!pipeline_ended && !saw_ending_tuplesok)
pg_fatal("didn't get expected terminating TUPLES_OK");
}
if (PQexitPipelineMode(conn) != 1)
pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
}
/*
* Simple test to verify that a pipeline is discarded as a whole when there's
* an error, ignoring transaction commands.
*/
static void
test_transaction(PGconn *conn)
{
PGresult *res;
bool expect_null;
int num_syncs = 0;
res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
"CREATE TABLE pq_pipeline_tst (id int)");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("failed to create test table: %s",
PQerrorMessage(conn));
PQclear(res);
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode: %s",
PQerrorMessage(conn));
if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
pg_fatal("could not send prepare on pipeline: %s",
PQerrorMessage(conn));
if (PQsendQueryParams(conn,
"BEGIN",
0, NULL, NULL, NULL, NULL, 0) != 1)
pg_fatal("failed to send query: %s",
PQerrorMessage(conn));
if (PQsendQueryParams(conn,
"SELECT 0/0",
0, NULL, NULL, NULL, NULL, 0) != 1)
pg_fatal("failed to send query: %s",
PQerrorMessage(conn));
/*
* send a ROLLBACK using a prepared stmt. Doesn't work because we need to
* get out of the pipeline-aborted state first.
*/
if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
pg_fatal("failed to execute prepared: %s",
PQerrorMessage(conn));
/* This insert fails because we're in pipeline-aborted state */
if (PQsendQueryParams(conn,
"INSERT INTO pq_pipeline_tst VALUES (1)",
0, NULL, NULL, NULL, NULL, 0) != 1)
pg_fatal("failed to send query: %s",
PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
num_syncs++;
/*
* This insert fails even though the pipeline got a SYNC, because we're in
* an aborted transaction
*/
if (PQsendQueryParams(conn,
"INSERT INTO pq_pipeline_tst VALUES (2)",
0, NULL, NULL, NULL, NULL, 0) != 1)
pg_fatal("failed to send query: %s",
PQerrorMessage(conn));
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
num_syncs++;
/*
* Send ROLLBACK using prepared stmt. This one works because we just did
* PQpipelineSync above.
*/
if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
pg_fatal("failed to execute prepared: %s",
PQerrorMessage(conn));
/*
* Now that we're out of a transaction and in pipeline-good mode, this
* insert works
*/
if (PQsendQueryParams(conn,
"INSERT INTO pq_pipeline_tst VALUES (3)",
0, NULL, NULL, NULL, NULL, 0) != 1)
pg_fatal("failed to send query: %s",
PQerrorMessage(conn));
/* Send two syncs now -- match up to SYNC messages below */
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
num_syncs++;
if (PQpipelineSync(conn) != 1)
pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
num_syncs++;
expect_null = false;
for (int i = 0;; i++)
{
ExecStatusType restype;
res = PQgetResult(conn);
if (res == NULL)
{
printf("%d: got NULL result\n", i);
if (!expect_null)
pg_fatal("did not expect NULL here");
expect_null = false;
continue;
}
restype = PQresultStatus(res);
printf("%d: got status %s", i, PQresStatus(restype));
if (expect_null)
pg_fatal("expected NULL");
if (restype == PGRES_FATAL_ERROR)
printf("; error: %s", PQerrorMessage(conn));
else if (restype == PGRES_PIPELINE_ABORTED)
{
printf(": command didn't run because pipeline aborted\n");
}
else
printf("\n");
PQclear(res);
if (restype == PGRES_PIPELINE_SYNC)
num_syncs--;
else
expect_null = true;
if (num_syncs <= 0)
break;
}
if (PQgetResult(conn) != NULL)
pg_fatal("returned something extra after all the syncs: %s",
PQresStatus(PQresultStatus(res)));
if (PQexitPipelineMode(conn) != 1)
pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
/* We expect to find one tuple containing the value "3" */
res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
if (PQntuples(res) != 1)
pg_fatal("did not get 1 tuple");
if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
pg_fatal("did not get expected tuple");
PQclear(res);
fprintf(stderr, "ok\n");
}
static void
usage(const char *progname)
{
fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
fprintf(stderr, "Usage:\n");
fprintf(stderr, " %s tests", progname);
fprintf(stderr, " %s testname [conninfo [number_of_rows]]\n", progname);
}
static void
print_test_list(void)
{
printf("disallowed_in_pipeline\n");
printf("multi_pipelines\n");
printf("pipeline_abort\n");
printf("pipelined_insert\n");
printf("prepared\n");
printf("simple_pipeline\n");
printf("singlerow\n");
printf("transaction\n");
}
int
main(int argc, char **argv)
{
const char *conninfo = "";
PGconn *conn;
int numrows = 10000;
PGresult *res;
if (strcmp(argv[1], "tests") == 0)
{
print_test_list();
exit(0);
}
/*
* The testname parameter is mandatory; it can be followed by a conninfo
* string and number of rows.
*/
if (argc < 2 || argc > 4)
{
usage(argv[0]);
exit(1);
}
if (argc >= 3)
conninfo = pg_strdup(argv[2]);
if (argc >= 4)
{
errno = 0;
numrows = strtol(argv[3], NULL, 10);
if (errno != 0 || numrows <= 0)
{
fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]);
exit(1);
}
}
/* Make a connection to the database */
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s\n",
PQerrorMessage(conn));
exit_nicely(conn);
}
res = PQexec(conn, "SET lc_messages TO \"C\"");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
if (strcmp(argv[1], "disallowed_in_pipeline") == 0)
test_disallowed_in_pipeline(conn);
else if (strcmp(argv[1], "multi_pipelines") == 0)
test_multi_pipelines(conn);
else if (strcmp(argv[1], "pipeline_abort") == 0)
test_pipeline_abort(conn);
else if (strcmp(argv[1], "pipelined_insert") == 0)
test_pipelined_insert(conn, numrows);
else if (strcmp(argv[1], "prepared") == 0)
test_prepared(conn);
else if (strcmp(argv[1], "simple_pipeline") == 0)
test_simple_pipeline(conn);
else if (strcmp(argv[1], "singlerow") == 0)
test_singlerowmode(conn);
else if (strcmp(argv[1], "transaction") == 0)
test_transaction(conn);
else
{
fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]);
usage(argv[0]);
exit(1);
}
/* close the connection to the database and cleanup */
PQfinish(conn);
return 0;
}
use strict;
use warnings;
use Config;
use PostgresNode;
use TestLib;
use Test::More tests => 8;
use Cwd;
my $node = get_new_node('main');
$node->init;
$node->start;
my $numrows = 10000;
$ENV{PATH} = "$ENV{PATH}:" . getcwd();
my ($out, $err) = run_command(['libpq_pipeline', 'tests']);
die "oops: $err" unless $err eq '';
my @tests = split(/\s/, $out);
for my $testname (@tests)
{
$node->command_ok(
[ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ],
"libpq_pipeline $testname");
}
$node->stop('fast');
......@@ -33,10 +33,11 @@ my @unlink_on_exit;
# Set of variables for modules in contrib/ and src/test/modules/
my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' };
my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo');
my @contrib_uselibpgport = ('oid2name', 'vacuumlo');
my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo');
my $contrib_extralibs = undef;
my @contrib_uselibpq =
('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline');
my @contrib_uselibpgport = ('libpq_pipeline', 'oid2name', 'vacuumlo');
my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo');
my $contrib_extralibs = { 'libpq_pipeline' => ['ws2_32.lib'] };
my $contrib_extraincludes = { 'dblink' => ['src/backend'] };
my $contrib_extrasource = {
'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ],
......
......@@ -1563,10 +1563,12 @@ PG_Locale_Strategy
PG_Lock_Status
PG_init_t
PGcancel
PGcmdQueueEntry
PGconn
PGdataValue
PGlobjfuncs
PGnotify
PGpipelineStatus
PGresAttDesc
PGresAttValue
PGresParamDesc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment