/*-------------------------------------------------------------------------
 *
 * pquery.c
 *	  POSTGRES process query command code
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.102 2006/06/27 02:51:39 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "commands/prepare.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "tcop/tcopprot.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/guc.h"
#include "utils/memutils.h"


/*
 * ActivePortal is the currently executing Portal (the most closely nested,
 * if there are several).
 */
Portal		ActivePortal = NULL;


/*
 * Prototypes for routines private to this file.
 */
static void ProcessQuery(Query *parsetree,
			 Plan *plan,
			 ParamListInfo params,
			 DestReceiver *dest,
			 char *completionTag);
static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
			 DestReceiver *dest);
static long PortalRunSelect(Portal portal, bool forward, long count,
				DestReceiver *dest);
static void PortalRunUtility(Portal portal, Query *query,
				 DestReceiver *dest, char *completionTag);
static void PortalRunMulti(Portal portal,
			   DestReceiver *dest, DestReceiver *altdest,
			   char *completionTag);
static long DoPortalRunFetch(Portal portal,
				 FetchDirection fdirection,
				 long count,
				 DestReceiver *dest);
static void DoPortalRewind(Portal portal);


/*
 * CreateQueryDesc
59
 */
60
QueryDesc *
61 62
CreateQueryDesc(Query *parsetree,
				Plan *plantree,
63 64
				Snapshot snapshot,
				Snapshot crosscheck_snapshot,
65
				DestReceiver *dest,
66 67
				ParamListInfo params,
				bool doInstrument)
68
{
69
	QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
70 71 72 73

	qd->operation = parsetree->commandType;		/* operation */
	qd->parsetree = parsetree;	/* parse tree */
	qd->plantree = plantree;	/* plan */
74
	qd->snapshot = snapshot;	/* snapshot */
75
	qd->crosscheck_snapshot = crosscheck_snapshot;		/* RI check snapshot */
76
	qd->dest = dest;			/* output dest */
77
	qd->params = params;		/* parameter values passed into query */
Bruce Momjian's avatar
Bruce Momjian committed
78
	qd->doInstrument = doInstrument;	/* instrumentation wanted? */
79

80 81 82 83
	/* null these fields until set by ExecutorStart */
	qd->tupDesc = NULL;
	qd->estate = NULL;
	qd->planstate = NULL;
84

85
	return qd;
86 87
}

88 89 90 91 92 93 94 95 96 97 98 99
/*
 * FreeQueryDesc
 *		Release a QueryDesc made by CreateQueryDesc.
 *
 * The descriptor must not belong to a live query: its estate field is
 * asserted to be NULL.  Only the QueryDesc struct itself is pfree'd; the
 * objects it points to (parse tree, plan, snapshot, and so on) are left
 * alone.
 */
void
FreeQueryDesc(QueryDesc *qdesc)
{
	/* Can't be a live query */
	Assert(qdesc->estate == NULL);
	/* Only the QueryDesc itself need be freed */
	pfree(qdesc);
}


/*
 * ProcessQuery
103
 *		Execute a single plannable query within a PORTAL_MULTI_QUERY portal
104
 *
105 106
 *	parsetree: the query tree
 *	plan: the plan tree for the query
107
 *	params: any parameters needed
108 109 110 111 112
 *	dest: where to send results
 *	completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
 *		in which to store a command completion status string.
 *
 * completionTag may be NULL if caller doesn't want a status string.
113 114 115
 *
 * Must be called in a memory context that will be reset or deleted on
 * error; otherwise the executor's memory usage will be leaked.
116
 */
117
static void
118 119
ProcessQuery(Query *parsetree,
			 Plan *plan,
120
			 ParamListInfo params,
121
			 DestReceiver *dest,
122
			 char *completionTag)
123
{
124 125
	int			operation = parsetree->commandType;
	QueryDesc  *queryDesc;
126

127 128 129
	ereport(DEBUG3,
			(errmsg_internal("ProcessQuery")));

130
	/*
131
	 * Check for special-case destinations
132 133 134
	 */
	if (operation == CMD_SELECT)
	{
135
		if (parsetree->into != NULL)
136
		{
137 138 139 140
			/*
			 * SELECT INTO table (a/k/a CREATE AS ... SELECT).
			 *
			 * Override the normal communication destination; execMain.c
141 142
			 * special-cases this case.  (Perhaps would be cleaner to have an
			 * additional destination type?)
143
			 */
144
			dest = None_Receiver;
145
		}
146
	}
147

148
	/*
149 150
	 * Must always set snapshot for plannable queries.	Note we assume that
	 * caller will take care of restoring ActiveSnapshot on exit/error.
151 152 153
	 */
	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

154
	/*
155
	 * Create the QueryDesc object
156
	 */
157 158 159
	queryDesc = CreateQueryDesc(parsetree, plan,
								ActiveSnapshot, InvalidSnapshot,
								dest, params, false);
160

161 162 163 164 165
	/*
	 * Set up to collect AFTER triggers
	 */
	AfterTriggerBeginQuery();

166
	/*
167
	 * Call ExecutorStart to prepare the plan for execution
168
	 */
169
	ExecutorStart(queryDesc, 0);
170

171
	/*
172
	 * Run the plan to completion.
173
	 */
174
	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
175

176
	/*
177
	 * Build command completion status string, if caller wants one.
178
	 */
179 180
	if (completionTag)
	{
Bruce Momjian's avatar
Bruce Momjian committed
181
		Oid			lastOid;
182 183 184 185 186 187 188

		switch (operation)
		{
			case CMD_SELECT:
				strcpy(completionTag, "SELECT");
				break;
			case CMD_INSERT:
189 190
				if (queryDesc->estate->es_processed == 1)
					lastOid = queryDesc->estate->es_lastoid;
191 192 193
				else
					lastOid = InvalidOid;
				snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
194
				   "INSERT %u %u", lastOid, queryDesc->estate->es_processed);
195 196 197
				break;
			case CMD_UPDATE:
				snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
198
						 "UPDATE %u", queryDesc->estate->es_processed);
199 200 201
				break;
			case CMD_DELETE:
				snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
202
						 "DELETE %u", queryDesc->estate->es_processed);
203 204 205 206 207 208
				break;
			default:
				strcpy(completionTag, "???");
				break;
		}
	}
209

210 211 212
	/* Now take care of any queued AFTER triggers */
	AfterTriggerEndQuery(queryDesc->estate);

213
	/*
214
	 * Now, we close down all the scans and free allocated resources.
215
	 */
216
	ExecutorEnd(queryDesc);
217 218

	FreeQueryDesc(queryDesc);
219 220 221

	FreeSnapshot(ActiveSnapshot);
	ActiveSnapshot = NULL;
222
}
223

224 225 226 227 228 229 230 231 232 233 234 235 236
/*
 * ChoosePortalStrategy
 *		Select portal execution strategy given the intended query list.
 *
 *	parseTrees: list of Query nodes the portal will execute
 *
 * The result is PORTAL_MULTI_QUERY unless the list holds exactly one query
 * and that query is either
 *	- a canSetTag SELECT without INTO (PORTAL_ONE_SELECT), or
 *	- a canSetTag utility statement for which UtilityReturnsTuples() is
 *	  true (PORTAL_UTIL_SELECT).
 *
 * See the comments in portal.h.
 */
PortalStrategy
ChoosePortalStrategy(List *parseTrees)
{
	PortalStrategy strategy;

	strategy = PORTAL_MULTI_QUERY;		/* default assumption */

	if (list_length(parseTrees) == 1)
	{
		Query	   *query = (Query *) linitial(parseTrees);

		if (query->commandType == CMD_SELECT &&
			query->canSetTag &&
			query->into == NULL)
			strategy = PORTAL_ONE_SELECT;
		else if (query->commandType == CMD_UTILITY &&
				 query->canSetTag &&
				 query->utilityStmt != NULL)
		{
			if (UtilityReturnsTuples(query->utilityStmt))
				strategy = PORTAL_UTIL_SELECT;
		}
	}
	return strategy;
}

/*
 * FetchPortalTargetList
 *		Given a portal that returns tuples, extract the query targetlist.
 *		Returns NIL if the portal doesn't have a determinable targetlist.
 *
 * For a PORTAL_ONE_SELECT portal this is the targetList of its single
 * query.  For a PORTAL_UTIL_SELECT portal, FETCH is resolved by recursing
 * into the portal it names, and EXECUTE by asking the prepared statement
 * it names; any other utility statement yields NIL.
 *
 * Note: do not modify the result.
 *
 * XXX be careful to keep this in sync with FetchPreparedStatementTargetList,
 * and with UtilityReturnsTuples.
 */
List *
FetchPortalTargetList(Portal portal)
{
	if (portal->strategy == PORTAL_ONE_SELECT)
		return ((Query *) linitial(portal->parseTrees))->targetList;
	if (portal->strategy == PORTAL_UTIL_SELECT)
	{
		Node	   *utilityStmt;

		utilityStmt = ((Query *) linitial(portal->parseTrees))->utilityStmt;
		switch (nodeTag(utilityStmt))
		{
			case T_FetchStmt:
				{
					FetchStmt  *substmt = (FetchStmt *) utilityStmt;
					Portal		subportal;

					/* a MOVE would not have been given this strategy */
					Assert(!substmt->ismove);
					subportal = GetPortalByName(substmt->portalname);
					Assert(PortalIsValid(subportal));
					return FetchPortalTargetList(subportal);
				}

			case T_ExecuteStmt:
				{
					ExecuteStmt *substmt = (ExecuteStmt *) utilityStmt;
					PreparedStatement *entry;

					Assert(!substmt->into);
					entry = FetchPreparedStatement(substmt->name, true);
					return FetchPreparedStatementTargetList(entry);
				}

			default:
				break;
		}
	}
	return NIL;
}

/*
 * PortalStart
 *		Prepare a portal for execution.
 *
 * Caller must already have created the portal, done PortalDefineQuery(),
 * and adjusted portal options if needed.  If parameters are needed by
 * the query, they must be passed in here (caller is responsible for
 * giving them appropriate lifetime).
 *
315 316 317 318 319
 * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
 * for the normal behavior of setting a new snapshot.  This parameter is
 * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
 * to be used for cursors).
 *
320 321 322 323
 * On return, portal is ready to accept PortalRun() calls, and the result
 * tupdesc (if any) is known.
 */
void
324
PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot)
325
{
326
	Portal		saveActivePortal;
327
	Snapshot	saveActiveSnapshot;
328 329
	ResourceOwner saveResourceOwner;
	MemoryContext savePortalContext;
330 331
	MemoryContext oldContext;
	QueryDesc  *queryDesc;
332
	int			eflags;
333 334

	AssertArg(PortalIsValid(portal));
Bruce Momjian's avatar
Bruce Momjian committed
335
	AssertState(portal->queryContext != NULL);	/* query defined? */
336 337 338
	AssertState(portal->status == PORTAL_NEW);	/* else extra PortalStart */

	/*
339
	 * Set up global portal context pointers.  (Should we set QueryContext?)
340 341
	 */
	saveActivePortal = ActivePortal;
342
	saveActiveSnapshot = ActiveSnapshot;
343 344
	saveResourceOwner = CurrentResourceOwner;
	savePortalContext = PortalContext;
345
	PG_TRY();
346
	{
347
		ActivePortal = portal;
348
		ActiveSnapshot = NULL;	/* will be set later */
349 350
		CurrentResourceOwner = portal->resowner;
		PortalContext = PortalGetHeapMemory(portal);
Bruce Momjian's avatar
Bruce Momjian committed
351

352
		oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
Bruce Momjian's avatar
Bruce Momjian committed
353

354 355
		/* Must remember portal param list, if any */
		portal->portalParams = params;
Bruce Momjian's avatar
Bruce Momjian committed
356

357 358 359 360
		/*
		 * Determine the portal execution strategy
		 */
		portal->strategy = ChoosePortalStrategy(portal->parseTrees);
Bruce Momjian's avatar
Bruce Momjian committed
361

362 363 364 365 366 367 368 369
		/*
		 * Fire her up according to the strategy
		 */
		switch (portal->strategy)
		{
			case PORTAL_ONE_SELECT:

				/*
370
				 * Must set snapshot before starting executor.	Be sure to
371
				 * copy it into the portal's context.
372
				 */
373 374 375 376
				if (snapshot)
					ActiveSnapshot = CopySnapshot(snapshot);
				else
					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
377 378

				/*
379
				 * Create QueryDesc in portal's context; for the moment, set
380
				 * the destination to DestNone.
381 382
				 */
				queryDesc = CreateQueryDesc((Query *) linitial(portal->parseTrees),
383
										(Plan *) linitial(portal->planTrees),
384 385
											ActiveSnapshot,
											InvalidSnapshot,
386 387 388 389
											None_Receiver,
											params,
											false);

390
				/*
391 392 393 394
				 * We do *not* call AfterTriggerBeginQuery() here.	We assume
				 * that a SELECT cannot queue any triggers.  It would be messy
				 * to support triggers since the execution of the portal may
				 * be interleaved with other queries.
395 396
				 */

397
				/*
398 399
				 * If it's a scrollable cursor, executor needs to support
				 * REWIND and backwards scan.
400
				 */
401 402 403 404 405 406 407 408 409
				if (portal->cursorOptions & CURSOR_OPT_SCROLL)
					eflags = EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
				else
					eflags = 0;		/* default run-to-completion flags */

				/*
				 * Call ExecutorStart to prepare the plan for execution
				 */
				ExecutorStart(queryDesc, eflags);
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424

				/*
				 * This tells PortalCleanup to shut down the executor
				 */
				portal->queryDesc = queryDesc;

				/*
				 * Remember tuple descriptor (computed by ExecutorStart)
				 */
				portal->tupDesc = queryDesc->tupDesc;

				/*
				 * Reset cursor position data to "start of query"
				 */
				portal->atStart = true;
Bruce Momjian's avatar
Bruce Momjian committed
425
				portal->atEnd = false;	/* allow fetches */
426 427 428
				portal->portalPos = 0;
				portal->posOverflow = false;
				break;
Bruce Momjian's avatar
Bruce Momjian committed
429

430
			case PORTAL_UTIL_SELECT:
Bruce Momjian's avatar
Bruce Momjian committed
431

432
				/*
433 434
				 * We don't set snapshot here, because PortalRunUtility will
				 * take care of it if needed.
435 436 437
				 */
				portal->tupDesc =
					UtilityTupleDescriptor(((Query *) linitial(portal->parseTrees))->utilityStmt);
438

439 440 441 442
				/*
				 * Reset cursor position data to "start of query"
				 */
				portal->atStart = true;
Bruce Momjian's avatar
Bruce Momjian committed
443
				portal->atEnd = false;	/* allow fetches */
444 445 446
				portal->portalPos = 0;
				portal->posOverflow = false;
				break;
Bruce Momjian's avatar
Bruce Momjian committed
447

448 449 450 451 452 453 454 455 456 457
			case PORTAL_MULTI_QUERY:
				/* Need do nothing now */
				portal->tupDesc = NULL;
				break;
		}
	}
	PG_CATCH();
	{
		/* Uncaught error while executing portal: mark it dead */
		portal->status = PORTAL_FAILED;
Bruce Momjian's avatar
Bruce Momjian committed
458

459 460
		/* Restore global vars and propagate error */
		ActivePortal = saveActivePortal;
461
		ActiveSnapshot = saveActiveSnapshot;
462 463 464 465
		CurrentResourceOwner = saveResourceOwner;
		PortalContext = savePortalContext;

		PG_RE_THROW();
466
	}
467
	PG_END_TRY();
468 469 470

	MemoryContextSwitchTo(oldContext);

471
	ActivePortal = saveActivePortal;
472
	ActiveSnapshot = saveActiveSnapshot;
473 474 475 476
	CurrentResourceOwner = saveResourceOwner;
	PortalContext = savePortalContext;

	portal->status = PORTAL_READY;
477 478
}

479 480 481 482 483
/*
 * PortalSetResultFormat
 *		Select the format codes for a portal's output.
 *
 * This must be run after PortalStart for a portal that will be read by
 * a DestRemote or DestRemoteExecute destination.  It is not presently needed
 * for other destination types.
 *
 *	portal: the portal; nothing is done if it has no tupDesc
 *	nFormats: number of entries in formats[]
 *	formats: the client format request, as per Bind message conventions
 *
 * portal->formats is allocated in the portal's heap memory with one entry
 * per result column and filled as follows:
 *	nFormats > 1:  one code per column; nFormats must equal the column
 *				   count, else a protocol-violation error is raised
 *	nFormats == 1: formats[0] is used for every column
 *	nFormats <= 0: every column gets the default format code 0
 */
void
PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
{
	int			natts;
	int			i;

	/* Do nothing if portal won't return tuples */
	if (portal->tupDesc == NULL)
		return;
	natts = portal->tupDesc->natts;
	portal->formats = (int16 *)
		MemoryContextAlloc(PortalGetHeapMemory(portal),
						   natts * sizeof(int16));
	if (nFormats > 1)
	{
		/* format specified for each column */
		if (nFormats != natts)
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("bind message has %d result formats but query has %d columns",
							nFormats, natts)));
		memcpy(portal->formats, formats, natts * sizeof(int16));
	}
	else if (nFormats > 0)
	{
		/* single format specified, use for all columns */
		int16		format1 = formats[0];

		for (i = 0; i < natts; i++)
			portal->formats[i] = format1;
	}
	else
	{
		/* use default format for all columns */
		for (i = 0; i < natts; i++)
			portal->formats[i] = 0;
	}
}

/*
 * PortalRun
 *		Run a portal's query or queries.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  Note that count is ignored in multi-query
 * situations, where we always run the portal to completion.
 *
 * dest: where to send output of primary (canSetTag) query
 *
 * altdest: where to send output of non-primary queries
 *
 * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
 *		in which to store a command completion status string.
 *		May be NULL if caller doesn't want a status string.
 *
 * Returns TRUE if the portal's execution is complete, FALSE if it was
 * suspended due to exhaustion of the count parameter.
 */
bool
549 550
PortalRun(Portal portal, long count,
		  DestReceiver *dest, DestReceiver *altdest,
551 552 553
		  char *completionTag)
{
	bool		result;
554 555
	ResourceOwner saveTopTransactionResourceOwner;
	MemoryContext saveTopTransactionContext;
556
	Portal		saveActivePortal;
557
	Snapshot	saveActiveSnapshot;
558
	ResourceOwner saveResourceOwner;
559 560
	MemoryContext savePortalContext;
	MemoryContext saveQueryContext;
561
	MemoryContext saveMemoryContext;
562 563 564 565 566 567 568

	AssertArg(PortalIsValid(portal));

	/* Initialize completion tag to empty string */
	if (completionTag)
		completionTag[0] = '\0';

569
	if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
570 571
	{
		ereport(DEBUG3,
Bruce Momjian's avatar
Bruce Momjian committed
572
				(errmsg_internal("PortalRun")));
573
		/* PORTAL_MULTI_QUERY logs its own stats per query */
574
		ResetUsage();
575
	}
Bruce Momjian's avatar
Bruce Momjian committed
576

577 578 579
	/*
	 * Check for improper portal use, and mark portal active.
	 */
580
	if (portal->status != PORTAL_READY)
581 582
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
583 584
				 errmsg("portal \"%s\" cannot be run", portal->name)));
	portal->status = PORTAL_ACTIVE;
585 586

	/*
587
	 * Set up global portal context pointers.
588 589 590
	 *
	 * We have to play a special game here to support utility commands like
	 * VACUUM and CLUSTER, which internally start and commit transactions.
591 592 593 594 595 596 597 598
	 * When we are called to execute such a command, CurrentResourceOwner will
	 * be pointing to the TopTransactionResourceOwner --- which will be
	 * destroyed and replaced in the course of the internal commit and
	 * restart.  So we need to be prepared to restore it as pointing to the
	 * exit-time TopTransactionResourceOwner.  (Ain't that ugly?  This idea of
	 * internally starting whole new transactions is not good.)
	 * CurrentMemoryContext has a similar problem, but the other pointers we
	 * save here will be NULL or pointing to longer-lived objects.
599
	 */
600 601
	saveTopTransactionResourceOwner = TopTransactionResourceOwner;
	saveTopTransactionContext = TopTransactionContext;
602
	saveActivePortal = ActivePortal;
603
	saveActiveSnapshot = ActiveSnapshot;
604
	saveResourceOwner = CurrentResourceOwner;
605 606
	savePortalContext = PortalContext;
	saveQueryContext = QueryContext;
607
	saveMemoryContext = CurrentMemoryContext;
608
	PG_TRY();
609
	{
610
		ActivePortal = portal;
611
		ActiveSnapshot = NULL;	/* will be set later */
612 613 614
		CurrentResourceOwner = portal->resowner;
		PortalContext = PortalGetHeapMemory(portal);
		QueryContext = portal->queryContext;
615

616
		MemoryContextSwitchTo(PortalContext);
Bruce Momjian's avatar
Bruce Momjian committed
617

618 619 620 621 622 623 624 625 626 627 628 629
		switch (portal->strategy)
		{
			case PORTAL_ONE_SELECT:
				(void) PortalRunSelect(portal, true, count, dest);
				/* we know the query is supposed to set the tag */
				if (completionTag && portal->commandTag)
					strcpy(completionTag, portal->commandTag);

				/* Mark portal not active */
				portal->status = PORTAL_READY;

				/*
630
				 * Since it's a forward fetch, say DONE iff atEnd is now true.
631 632 633
				 */
				result = portal->atEnd;
				break;
Bruce Momjian's avatar
Bruce Momjian committed
634

635 636 637 638 639 640 641 642 643 644 645
			case PORTAL_UTIL_SELECT:

				/*
				 * If we have not yet run the utility statement, do so,
				 * storing its results in the portal's tuplestore.
				 */
				if (!portal->portalUtilReady)
				{
					DestReceiver *treceiver;

					PortalCreateHoldStore(portal);
646
					treceiver = CreateDestReceiver(DestTuplestore, portal);
647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668
					PortalRunUtility(portal, linitial(portal->parseTrees),
									 treceiver, NULL);
					(*treceiver->rDestroy) (treceiver);
					portal->portalUtilReady = true;
				}

				/*
				 * Now fetch desired portion of results.
				 */
				(void) PortalRunSelect(portal, true, count, dest);

				/*
				 * We know the query is supposed to set the tag; we assume
				 * only the default tag is needed.
				 */
				if (completionTag && portal->commandTag)
					strcpy(completionTag, portal->commandTag);

				/* Mark portal not active */
				portal->status = PORTAL_READY;

				/*
669
				 * Since it's a forward fetch, say DONE iff atEnd is now true.
670 671 672
				 */
				result = portal->atEnd;
				break;
Bruce Momjian's avatar
Bruce Momjian committed
673

674 675
			case PORTAL_MULTI_QUERY:
				PortalRunMulti(portal, dest, altdest, completionTag);
676

677 678
				/* Prevent portal's commands from being re-executed */
				portal->status = PORTAL_DONE;
679

680 681 682
				/* Always complete at end of RunMulti */
				result = true;
				break;
683

684 685 686
			default:
				elog(ERROR, "unrecognized portal strategy: %d",
					 (int) portal->strategy);
Bruce Momjian's avatar
Bruce Momjian committed
687
				result = false; /* keep compiler quiet */
688 689 690 691 692 693 694
				break;
		}
	}
	PG_CATCH();
	{
		/* Uncaught error while executing portal: mark it dead */
		portal->status = PORTAL_FAILED;
695

696
		/* Restore global vars and propagate error */
697 698 699 700
		if (saveMemoryContext == saveTopTransactionContext)
			MemoryContextSwitchTo(TopTransactionContext);
		else
			MemoryContextSwitchTo(saveMemoryContext);
701
		ActivePortal = saveActivePortal;
702
		ActiveSnapshot = saveActiveSnapshot;
703 704 705 706
		if (saveResourceOwner == saveTopTransactionResourceOwner)
			CurrentResourceOwner = TopTransactionResourceOwner;
		else
			CurrentResourceOwner = saveResourceOwner;
707 708
		PortalContext = savePortalContext;
		QueryContext = saveQueryContext;
709

710
		PG_RE_THROW();
711
	}
712
	PG_END_TRY();
713

714 715 716 717
	if (saveMemoryContext == saveTopTransactionContext)
		MemoryContextSwitchTo(TopTransactionContext);
	else
		MemoryContextSwitchTo(saveMemoryContext);
718
	ActivePortal = saveActivePortal;
719
	ActiveSnapshot = saveActiveSnapshot;
720 721 722 723
	if (saveResourceOwner == saveTopTransactionResourceOwner)
		CurrentResourceOwner = TopTransactionResourceOwner;
	else
		CurrentResourceOwner = saveResourceOwner;
724 725 726
	PortalContext = savePortalContext;
	QueryContext = saveQueryContext;

727 728 729
	if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
		ShowUsage("EXECUTOR STATISTICS");

730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748
	return result;
}

/*
 * PortalRunSelect
 *		Execute a portal's query in SELECT cases (also UTIL_SELECT).
 *
 * This handles simple N-rows-forward-or-backward cases.  For more complex
 * nonsequential access to a portal, see PortalRunFetch.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".
 *
 * Caller must already have validated the Portal and done appropriate
 * setup (cf. PortalRun).
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
749
static long
750 751 752
PortalRunSelect(Portal portal,
				bool forward,
				long count,
753
				DestReceiver *dest)
754 755 756 757 758 759
{
	QueryDesc  *queryDesc;
	ScanDirection direction;
	uint32		nprocessed;

	/*
760 761
	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
	 * completed utility query; can't use it in that path.
762 763 764 765 766 767 768
	 */
	queryDesc = PortalGetQueryDesc(portal);

	/* Caller messed up if we have neither a ready query nor held data. */
	Assert(queryDesc || portal->holdStore);

	/*
Bruce Momjian's avatar
Bruce Momjian committed
769
	 * Force the queryDesc destination to the right thing.	This supports
770 771
	 * MOVE, for example, which will pass in dest = DestNone.  This is okay to
	 * change as long as we do it on every fetch.  (The Executor must not
772 773 774 775 776 777
	 * assume that dest never changes.)
	 */
	if (queryDesc)
		queryDesc->dest = dest;

	/*
778 779 780 781 782 783 784 785 786
	 * Determine which direction to go in, and check to see if we're already
	 * at the end of the available tuples in that direction.  If so, set the
	 * direction to NoMovement to avoid trying to fetch any tuples.  (This
	 * check exists because not all plan node types are robust about being
	 * called again if they've already returned NULL once.)  Then call the
	 * executor (we must not skip this, because the destination needs to see a
	 * setup and shutdown even if no tuples are available).  Finally, update
	 * the portal position state depending on the number of tuples that were
	 * retrieved.
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
	 */
	if (forward)
	{
		if (portal->atEnd || count <= 0)
			direction = NoMovementScanDirection;
		else
			direction = ForwardScanDirection;

		/* In the executor, zero count processes all rows */
		if (count == FETCH_ALL)
			count = 0;

		if (portal->holdStore)
			nprocessed = RunFromStore(portal, direction, count, dest);
		else
		{
803
			ActiveSnapshot = queryDesc->snapshot;
804 805 806 807
			ExecutorRun(queryDesc, direction, count);
			nprocessed = queryDesc->estate->es_processed;
		}

808
		if (!ScanDirectionIsNoMovement(direction))
809
		{
Bruce Momjian's avatar
Bruce Momjian committed
810
			long		oldPos;
811 812

			if (nprocessed > 0)
Bruce Momjian's avatar
Bruce Momjian committed
813
				portal->atStart = false;		/* OK to go backward now */
814 815
			if (count == 0 ||
				(unsigned long) nprocessed < (unsigned long) count)
Bruce Momjian's avatar
Bruce Momjian committed
816
				portal->atEnd = true;	/* we retrieved 'em all */
817 818 819 820 821 822 823 824 825 826
			oldPos = portal->portalPos;
			portal->portalPos += nprocessed;
			/* portalPos doesn't advance when we fall off the end */
			if (portal->portalPos < oldPos)
				portal->posOverflow = true;
		}
	}
	else
	{
		if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
827 828 829 830
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cursor can only scan forward"),
					 errhint("Declare it with SCROLL option to enable backward scan.")));
831 832 833 834 835 836 837 838 839 840 841 842 843 844

		if (portal->atStart || count <= 0)
			direction = NoMovementScanDirection;
		else
			direction = BackwardScanDirection;

		/* In the executor, zero count processes all rows */
		if (count == FETCH_ALL)
			count = 0;

		if (portal->holdStore)
			nprocessed = RunFromStore(portal, direction, count, dest);
		else
		{
845
			ActiveSnapshot = queryDesc->snapshot;
846 847 848 849
			ExecutorRun(queryDesc, direction, count);
			nprocessed = queryDesc->estate->es_processed;
		}

850
		if (!ScanDirectionIsNoMovement(direction))
851 852 853
		{
			if (nprocessed > 0 && portal->atEnd)
			{
Bruce Momjian's avatar
Bruce Momjian committed
854 855
				portal->atEnd = false;	/* OK to go forward now */
				portal->portalPos++;	/* adjust for endpoint case */
856 857 858 859
			}
			if (count == 0 ||
				(unsigned long) nprocessed < (unsigned long) count)
			{
Bruce Momjian's avatar
Bruce Momjian committed
860
				portal->atStart = true; /* we retrieved 'em all */
861 862 863 864 865
				portal->portalPos = 0;
				portal->posOverflow = false;
			}
			else
			{
Bruce Momjian's avatar
Bruce Momjian committed
866
				long		oldPos;
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893

				oldPos = portal->portalPos;
				portal->portalPos -= nprocessed;
				if (portal->portalPos > oldPos ||
					portal->portalPos <= 0)
					portal->posOverflow = true;
			}
		}
	}

	return nprocessed;
}

/*
 * RunFromStore
 *		Fetch tuples from the portal's tuple store.
 *
 * Calling conventions are similar to ExecutorRun, except that we
 * do not depend on having a queryDesc or estate.  Therefore we return the
 * number of tuples processed as the result, not in estate->es_processed.
 *
 * One difference from ExecutorRun is that the destination receiver functions
 * are run in the caller's memory context (since we have no estate).  Watch
 * out for memory leaks.
 */
static uint32
RunFromStore(Portal portal, ScanDirection direction, long count,
894
			 DestReceiver *dest)
895 896
{
	long		current_tuple_count = 0;
897 898 899
	TupleTableSlot *slot;

	slot = MakeSingleTupleTableSlot(portal->tupDesc);
900

901
	(*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);
902

903
	if (ScanDirectionIsNoMovement(direction))
904 905 906 907 908
	{
		/* do nothing except start/stop the destination */
	}
	else
	{
909
		bool		forward = ScanDirectionIsForward(direction);
910 911 912 913

		for (;;)
		{
			MemoryContext oldcontext;
914
			bool		ok;
915 916 917

			oldcontext = MemoryContextSwitchTo(portal->holdContext);

918
			ok = tuplestore_gettupleslot(portal->holdStore, forward, slot);
919 920 921

			MemoryContextSwitchTo(oldcontext);

922
			if (!ok)
923 924
				break;

925
			(*dest->receiveSlot) (slot, dest);
926

927
			ExecClearTuple(slot);
928 929

			/*
930 931 932
			 * check our tuple count.. if we've processed the proper number
			 * then quit, else loop again and process more tuples. Zero count
			 * means no limit.
933 934 935 936 937 938 939
			 */
			current_tuple_count++;
			if (count && count == current_tuple_count)
				break;
		}
	}

940
	(*dest->rShutdown) (dest);
941

942 943
	ExecDropSingleTupleTableSlot(slot);

944 945 946 947 948 949 950 951 952
	return (uint32) current_tuple_count;
}

/*
 * PortalRunUtility
 *		Execute a utility statement inside a portal.
 */
static void
PortalRunUtility(Portal portal, Query *query,
953
				 DestReceiver *dest, char *completionTag)
954
{
Bruce Momjian's avatar
Bruce Momjian committed
955
	Node	   *utilityStmt = query->utilityStmt;
956

957 958
	ereport(DEBUG3,
			(errmsg_internal("ProcessUtility")));
959 960

	/*
961 962 963 964 965 966 967 968 969
	 * Set snapshot if utility stmt needs one.	Most reliable way to do this
	 * seems to be to enumerate those that do not need one; this is a short
	 * list.  Transaction control, LOCK, and SET must *not* set a snapshot
	 * since they need to be executable at the start of a serializable
	 * transaction without freezing a snapshot.  By extension we allow SHOW
	 * not to set a snapshot.  The other stmts listed are just efficiency
	 * hacks.  Beware of listing anything that can modify the database --- if,
	 * say, it has to update an index with expressions that invoke
	 * user-defined functions, then it had better have a snapshot.
970
	 *
971 972
	 * Note we assume that caller will take care of restoring ActiveSnapshot
	 * on exit/error.
973
	 */
Bruce Momjian's avatar
Bruce Momjian committed
974 975 976 977 978 979 980 981 982 983 984 985
	if (!(IsA(utilityStmt, TransactionStmt) ||
		  IsA(utilityStmt, LockStmt) ||
		  IsA(utilityStmt, VariableSetStmt) ||
		  IsA(utilityStmt, VariableShowStmt) ||
		  IsA(utilityStmt, VariableResetStmt) ||
		  IsA(utilityStmt, ConstraintsSetStmt) ||
	/* efficiency hacks from here down */
		  IsA(utilityStmt, FetchStmt) ||
		  IsA(utilityStmt, ListenStmt) ||
		  IsA(utilityStmt, NotifyStmt) ||
		  IsA(utilityStmt, UnlistenStmt) ||
		  IsA(utilityStmt, CheckPointStmt)))
986 987 988
		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
	else
		ActiveSnapshot = NULL;
989 990 991 992

	if (query->canSetTag)
	{
		/* utility statement can override default tag string */
993
		ProcessUtility(utilityStmt, portal->portalParams, dest, completionTag);
994
		if (completionTag && completionTag[0] == '\0' && portal->commandTag)
Bruce Momjian's avatar
Bruce Momjian committed
995
			strcpy(completionTag, portal->commandTag);	/* use the default */
996 997 998 999
	}
	else
	{
		/* utility added by rewrite cannot set tag */
1000
		ProcessUtility(utilityStmt, portal->portalParams, dest, NULL);
1001 1002 1003 1004
	}

	/* Some utility statements may change context on us */
	MemoryContextSwitchTo(PortalGetHeapMemory(portal));
1005 1006 1007 1008

	if (ActiveSnapshot)
		FreeSnapshot(ActiveSnapshot);
	ActiveSnapshot = NULL;
1009 1010 1011 1012 1013 1014 1015 1016
}

/*
 * PortalRunMulti
 *		Execute a portal's queries in the general case (multi queries).
 *
 *	portal: portal whose parseTrees and planTrees lists are walked in
 *		parallel, one query/plan pair per iteration
 *	dest: receiver used for queries that have canSetTag set
 *	altdest: receiver used for queries that do not have canSetTag set
 *	completionTag: buffer receiving the command tag, or NULL
 */
static void
PortalRunMulti(Portal portal,
			   DestReceiver *dest, DestReceiver *altdest,
			   char *completionTag)
{
	ListCell   *querylist_item;
	ListCell   *planlist_item;

	/*
	 * If the destination is DestRemoteExecute, change to DestNone.  The
	 * reason is that the client won't be expecting any tuples, and indeed has
	 * no way to know what they are, since there is no provision for Describe
	 * to send a RowDescription message when this portal execution strategy is
	 * in effect.  This presently will only affect SELECT commands added to
	 * non-SELECT queries by rewrite rules: such commands will be executed,
	 * but the results will be discarded unless you use "simple Query"
	 * protocol.
	 */
	if (dest->mydest == DestRemoteExecute)
		dest = None_Receiver;
	if (altdest->mydest == DestRemoteExecute)
		altdest = None_Receiver;

	/*
	 * Loop to handle the individual queries generated from a single parsetree
	 * by analysis and rewrite.
	 */
	forboth(querylist_item, portal->parseTrees,
			planlist_item, portal->planTrees)
	{
		Query	   *query = (Query *) lfirst(querylist_item);
		Plan	   *plan = (Plan *) lfirst(planlist_item);

		/*
		 * If we got a cancel signal in prior command, quit
		 */
		CHECK_FOR_INTERRUPTS();

		if (query->commandType == CMD_UTILITY)
		{
			/*
			 * process utility functions (create, destroy, etc..)
			 */
			Assert(plan == NULL);

			PortalRunUtility(portal, query,
							 query->canSetTag ? dest : altdest,
							 completionTag);
		}
		else
		{
			/*
			 * process a plannable query.
			 */
			if (log_executor_stats)
				ResetUsage();

			if (query->canSetTag)
			{
				/* statement can set tag string */
				ProcessQuery(query, plan,
							 portal->portalParams,
							 dest, completionTag);
			}
			else
			{
				/* stmt added by rewrite cannot set tag */
				ProcessQuery(query, plan,
							 portal->portalParams,
							 altdest, NULL);
			}

			if (log_executor_stats)
				ShowUsage("EXECUTOR STATISTICS");
		}

		/*
		 * Increment command counter between queries, but not after the last
		 * one.
		 */
		if (lnext(planlist_item) != NULL)
			CommandCounterIncrement();

		/*
		 * Clear subsidiary contexts to recover temporary memory.
		 */
		Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);

		MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
	}

	/*
	 * If a command completion tag was supplied, use it.  Otherwise use the
	 * portal's commandTag as the default completion tag.
	 *
	 * Exception: clients will expect INSERT/UPDATE/DELETE tags to have
	 * counts, so fake something up if necessary.  (This could happen if the
	 * original query was replaced by a DO INSTEAD rule.)
	 */
	if (completionTag && completionTag[0] == '\0')
	{
		if (portal->commandTag)
			strcpy(completionTag, portal->commandTag);
		if (strcmp(completionTag, "INSERT") == 0)
			strcpy(completionTag, "INSERT 0 0");
		else if (strcmp(completionTag, "UPDATE") == 0)
			strcpy(completionTag, "UPDATE 0");
		else if (strcmp(completionTag, "DELETE") == 0)
			strcpy(completionTag, "DELETE 0");
	}
}

/*
 * PortalRunFetch
 *		Variant form of PortalRun that supports SQL FETCH directions.
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
long
PortalRunFetch(Portal portal,
			   FetchDirection fdirection,
			   long count,
1137
			   DestReceiver *dest)
1138 1139
{
	long		result;
1140
	Portal		saveActivePortal;
1141
	Snapshot	saveActiveSnapshot;
1142
	ResourceOwner saveResourceOwner;
1143 1144 1145 1146 1147 1148 1149 1150 1151
	MemoryContext savePortalContext;
	MemoryContext saveQueryContext;
	MemoryContext oldContext;

	AssertArg(PortalIsValid(portal));

	/*
	 * Check for improper portal use, and mark portal active.
	 */
1152
	if (portal->status != PORTAL_READY)
1153 1154
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1155 1156
				 errmsg("portal \"%s\" cannot be run", portal->name)));
	portal->status = PORTAL_ACTIVE;
1157 1158

	/*
1159
	 * Set up global portal context pointers.
1160
	 */
1161
	saveActivePortal = ActivePortal;
1162
	saveActiveSnapshot = ActiveSnapshot;
1163
	saveResourceOwner = CurrentResourceOwner;
1164 1165
	savePortalContext = PortalContext;
	saveQueryContext = QueryContext;
1166 1167 1168
	PG_TRY();
	{
		ActivePortal = portal;
1169
		ActiveSnapshot = NULL;	/* will be set later */
1170 1171 1172
		CurrentResourceOwner = portal->resowner;
		PortalContext = PortalGetHeapMemory(portal);
		QueryContext = portal->queryContext;
1173

1174
		oldContext = MemoryContextSwitchTo(PortalContext);
1175

1176 1177 1178 1179 1180 1181
		switch (portal->strategy)
		{
			case PORTAL_ONE_SELECT:
				result = DoPortalRunFetch(portal, fdirection, count, dest);
				break;

1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192
			case PORTAL_UTIL_SELECT:

				/*
				 * If we have not yet run the utility statement, do so,
				 * storing its results in the portal's tuplestore.
				 */
				if (!portal->portalUtilReady)
				{
					DestReceiver *treceiver;

					PortalCreateHoldStore(portal);
1193
					treceiver = CreateDestReceiver(DestTuplestore, portal);
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205
					PortalRunUtility(portal, linitial(portal->parseTrees),
									 treceiver, NULL);
					(*treceiver->rDestroy) (treceiver);
					portal->portalUtilReady = true;
				}

				/*
				 * Now fetch desired portion of results.
				 */
				result = DoPortalRunFetch(portal, fdirection, count, dest);
				break;

1206 1207
			default:
				elog(ERROR, "unsupported portal strategy");
Bruce Momjian's avatar
Bruce Momjian committed
1208
				result = 0;		/* keep compiler quiet */
1209 1210 1211 1212
				break;
		}
	}
	PG_CATCH();
1213
	{
1214 1215 1216 1217 1218
		/* Uncaught error while executing portal: mark it dead */
		portal->status = PORTAL_FAILED;

		/* Restore global vars and propagate error */
		ActivePortal = saveActivePortal;
1219
		ActiveSnapshot = saveActiveSnapshot;
1220 1221 1222 1223 1224
		CurrentResourceOwner = saveResourceOwner;
		PortalContext = savePortalContext;
		QueryContext = saveQueryContext;

		PG_RE_THROW();
1225
	}
1226
	PG_END_TRY();
1227 1228 1229 1230

	MemoryContextSwitchTo(oldContext);

	/* Mark portal not active */
1231
	portal->status = PORTAL_READY;
1232

1233
	ActivePortal = saveActivePortal;
1234
	ActiveSnapshot = saveActiveSnapshot;
1235
	CurrentResourceOwner = saveResourceOwner;
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251
	PortalContext = savePortalContext;
	QueryContext = saveQueryContext;

	return result;
}

/*
 * DoPortalRunFetch
 *		Guts of PortalRunFetch --- the portal context is already set up
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
static long
DoPortalRunFetch(Portal portal,
				 FetchDirection fdirection,
				 long count,
1252
				 DestReceiver *dest)
1253 1254 1255
{
	bool		forward;

1256 1257
	Assert(portal->strategy == PORTAL_ONE_SELECT ||
		   portal->strategy == PORTAL_UTIL_SELECT);
1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280

	switch (fdirection)
	{
		case FETCH_FORWARD:
			if (count < 0)
			{
				fdirection = FETCH_BACKWARD;
				count = -count;
			}
			/* fall out of switch to share code with FETCH_BACKWARD */
			break;
		case FETCH_BACKWARD:
			if (count < 0)
			{
				fdirection = FETCH_FORWARD;
				count = -count;
			}
			/* fall out of switch to share code with FETCH_FORWARD */
			break;
		case FETCH_ABSOLUTE:
			if (count > 0)
			{
				/*
1281 1282 1283 1284 1285
				 * Definition: Rewind to start, advance count-1 rows, return
				 * next row (if any).  In practice, if the goal is less than
				 * halfway back to the start, it's better to scan from where
				 * we are.	In any case, we arrange to fetch the target row
				 * going forwards.
1286 1287
				 */
				if (portal->posOverflow || portal->portalPos == LONG_MAX ||
Bruce Momjian's avatar
Bruce Momjian committed
1288
					count - 1 <= portal->portalPos / 2)
1289 1290 1291
				{
					DoPortalRewind(portal);
					if (count > 1)
Bruce Momjian's avatar
Bruce Momjian committed
1292
						PortalRunSelect(portal, true, count - 1,
1293
										None_Receiver);
1294 1295 1296 1297 1298 1299 1300 1301
				}
				else
				{
					long		pos = portal->portalPos;

					if (portal->atEnd)
						pos++;	/* need one extra fetch if off end */
					if (count <= pos)
Bruce Momjian's avatar
Bruce Momjian committed
1302
						PortalRunSelect(portal, false, pos - count + 1,
1303
										None_Receiver);
Bruce Momjian's avatar
Bruce Momjian committed
1304 1305
					else if (count > pos + 1)
						PortalRunSelect(portal, true, count - pos - 1,
1306
										None_Receiver);
1307 1308 1309 1310 1311 1312 1313
				}
				return PortalRunSelect(portal, true, 1L, dest);
			}
			else if (count < 0)
			{
				/*
				 * Definition: Advance to end, back up abs(count)-1 rows,
1314 1315 1316 1317
				 * return prior row (if any).  We could optimize this if we
				 * knew in advance where the end was, but typically we won't.
				 * (Is it worth considering case where count > half of size of
				 * query?  We could rewind once we know the size ...)
1318
				 */
1319
				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1320
				if (count < -1)
Bruce Momjian's avatar
Bruce Momjian committed
1321
					PortalRunSelect(portal, false, -count - 1, None_Receiver);
1322 1323
				return PortalRunSelect(portal, false, 1L, dest);
			}
Bruce Momjian's avatar
Bruce Momjian committed
1324
			else
1325
			{
1326
				/* count == 0 */
1327 1328 1329 1330 1331 1332 1333 1334 1335
				/* Rewind to start, return zero rows */
				DoPortalRewind(portal);
				return PortalRunSelect(portal, true, 0L, dest);
			}
			break;
		case FETCH_RELATIVE:
			if (count > 0)
			{
				/*
1336
				 * Definition: advance count-1 rows, return next row (if any).
1337 1338
				 */
				if (count > 1)
Bruce Momjian's avatar
Bruce Momjian committed
1339
					PortalRunSelect(portal, true, count - 1, None_Receiver);
1340 1341 1342 1343 1344
				return PortalRunSelect(portal, true, 1L, dest);
			}
			else if (count < 0)
			{
				/*
1345 1346
				 * Definition: back up abs(count)-1 rows, return prior row (if
				 * any).
1347 1348
				 */
				if (count < -1)
Bruce Momjian's avatar
Bruce Momjian committed
1349
					PortalRunSelect(portal, false, -count - 1, None_Receiver);
1350 1351
				return PortalRunSelect(portal, false, 1L, dest);
			}
Bruce Momjian's avatar
Bruce Momjian committed
1352
			else
1353
			{
1354
				/* count == 0 */
1355 1356 1357 1358 1359
				/* Same as FETCH FORWARD 0, so fall out of switch */
				fdirection = FETCH_FORWARD;
			}
			break;
		default:
1360
			elog(ERROR, "bogus direction");
1361 1362 1363 1364
			break;
	}

	/*
1365 1366
	 * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
	 * >= 0.
1367 1368 1369 1370 1371 1372 1373 1374
	 */
	forward = (fdirection == FETCH_FORWARD);

	/*
	 * Zero count means to re-fetch the current row, if any (per SQL92)
	 */
	if (count == 0)
	{
Bruce Momjian's avatar
Bruce Momjian committed
1375
		bool		on_row;
1376 1377 1378 1379

		/* Are we sitting on a row? */
		on_row = (!portal->atStart && !portal->atEnd);

1380
		if (dest->mydest == DestNone)
1381 1382 1383 1384 1385 1386 1387
		{
			/* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
			return on_row ? 1L : 0L;
		}
		else
		{
			/*
1388 1389 1390 1391 1392
			 * If we are sitting on a row, back up one so we can re-fetch it.
			 * If we are not sitting on a row, we still have to start up and
			 * shut down the executor so that the destination is initialized
			 * and shut down correctly; so keep going.	To PortalRunSelect,
			 * count == 0 means we will retrieve no row.
1393 1394 1395
			 */
			if (on_row)
			{
1396
				PortalRunSelect(portal, false, 1L, None_Receiver);
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406
				/* Set up to fetch one row forward */
				count = 1;
				forward = true;
			}
		}
	}

	/*
	 * Optimize MOVE BACKWARD ALL into a Rewind.
	 */
1407
	if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1408
	{
Bruce Momjian's avatar
Bruce Momjian committed
1409
		long		result = portal->portalPos;
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442

		if (result > 0 && !portal->atEnd)
			result--;
		DoPortalRewind(portal);
		/* result is bogus if pos had overflowed, but it's best we can do */
		return result;
	}

	return PortalRunSelect(portal, forward, count, dest);
}

/*
 * DoPortalRewind
 *		Reposition a Portal at its starting point.
 *
 * Rescans the portal's hold store when it has one, rewinds the executor
 * when the portal has a query descriptor, and then resets the portal's
 * position-tracking fields.
 */
static void
DoPortalRewind(Portal portal)
{
	if (portal->holdStore != NULL)
	{
		/* the rescan is done while in the hold store's own context */
		MemoryContext savedContext = MemoryContextSwitchTo(portal->holdContext);

		tuplestore_rescan(portal->holdStore);
		MemoryContextSwitchTo(savedContext);
	}

	if (PortalGetQueryDesc(portal) != NULL)
		ExecutorRewind(PortalGetQueryDesc(portal));

	/* Position bookkeeping: back at row zero, before the first tuple */
	portal->portalPos = 0;
	portal->posOverflow = false;
	portal->atEnd = false;
	portal->atStart = true;
}