/*-------------------------------------------------------------------------
 *
 * walsender.c
 *
 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
 * charge of sending the XLOG stream from the primary server. It is started
 * by the postmaster when the walreceiver in the standby server connects to
 * the primary server and requests XLOG streaming replication; i.e., unlike
 * an auxiliary process, it is not an always-running process.
 * It attempts to keep reading XLOG records from the disk and sending them
 * to the standby server, as long as the connection is alive (i.e., like
 * any backend, there is a one-to-one relationship between a connection
 * and a walsender process).
 *
 * Normal termination is by SIGTERM, which instructs the walsender to
 * close the connection and exit(0) at next convenient moment. Emergency
 * termination is by SIGQUIT; like any backend, the walsender will simply
 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
 * are treated as not a crash but approximately normal termination;
 * the walsender will exit quickly without sending any more XLOG records.
 *
 * If the server is shut down, the postmaster sends us SIGUSR2 after all
 * regular backends have exited and the shutdown checkpoint has been written.
 * This instructs the walsender to send any outstanding WAL, including the
 * shutdown checkpoint record, and then exit.
 *
 * Note that there can be more than one walsender process concurrently.
 *
 * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"

56

57 58 59 60 61 62 63
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;

/* My slot in the shared memory array */
static WalSnd *MyWalSnd = NULL;

/* Global state */
Bruce Momjian's avatar
Bruce Momjian committed
64
bool		am_walsender = false;		/* Am I a walsender process ? */
65 66

/* User-settable parameters for walsender */
67
int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
Bruce Momjian's avatar
Bruce Momjian committed
68
int			WalSndDelay = 200;	/* max sleep time between some actions */
69

70
#define NAPTIME_PER_CYCLE 100000L	/* max sleep time between cycles (100ms) */
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98

/*
 * These variables are used similarly to openLogFile/Id/Seg/Off,
 * but for walsender to read the XLOG.
 */
static int	sendFile = -1;
static uint32 sendId = 0;
static uint32 sendSeg = 0;
static uint32 sendOff = 0;

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr.
 */
static XLogRecPtr sentPtr = {0, 0};

/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
static volatile sig_atomic_t ready_to_stop = false;

/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);

/* Prototypes for private functions */
static int	WalSndLoop(void);
Bruce Momjian's avatar
Bruce Momjian committed
99 100 101
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
102
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
103
static bool XLogSend(StringInfo outMsg, bool *caughtup);
104 105 106 107
static void CheckClosedConnection(void);

/*
 * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
108 109 110 111 112 113
 *
 * We don't have a good idea of what a good value would be; there's some
 * overhead per message in both walsender and walreceiver, but on the other
 * hand sending large batches makes walsender less responsive to signals
 * because signals are checked only between messages. 128kB seems like
 * a reasonable guess for now.
114
 */
115
#define MAX_SEND_SIZE (128 * 1024)
116 117 118 119 120 121 122

/* Main entry point for walsender process */
int
WalSenderMain(void)
{
	MemoryContext walsnd_context;

123 124 125 126 127
	if (RecoveryInProgress())
		ereport(FATAL,
				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
				 errmsg("recovery is still in progress, can't accept WAL streaming connections")));

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
	/* Create a per-walsender data structure in shared memory */
	InitWalSnd();

	/*
	 * Create a memory context that we will do all our work in.  We do this so
	 * that we can reset the context during error recovery and thereby avoid
	 * possible memory leaks.  Formerly this code just ran in
	 * TopMemoryContext, but resetting that would be a really bad idea.
	 *
	 * XXX: we don't actually attempt error recovery in walsender, we just
	 * close the connection and exit.
	 */
	walsnd_context = AllocSetContextCreate(TopMemoryContext,
										   "Wal Sender",
										   ALLOCSET_DEFAULT_MINSIZE,
										   ALLOCSET_DEFAULT_INITSIZE,
										   ALLOCSET_DEFAULT_MAXSIZE);
	MemoryContextSwitchTo(walsnd_context);

	/* Unblock signals (they were blocked when the postmaster forked us) */
	PG_SETMASK(&UnBlockSig);

	/* Tell the standby that walsender is ready for receiving commands */
	ReadyForQuery(DestRemote);

	/* Handle handshake messages before streaming */
	WalSndHandshake();

	/* Main loop of walsender */
	return WalSndLoop();
}

static void
WalSndHandshake(void)
{
	StringInfoData input_message;
Bruce Momjian's avatar
Bruce Momjian committed
164
	bool		replication_started = false;
165 166 167 168 169

	initStringInfo(&input_message);

	while (!replication_started)
	{
Bruce Momjian's avatar
Bruce Momjian committed
170
		int			firstchar;
171 172 173 174 175 176 177 178 179 180 181 182 183 184

		/* Wait for a command to arrive */
		firstchar = pq_getbyte();

		/*
		 * Check for any other interesting events that happened while we
		 * slept.
		 */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

185
		if (firstchar != EOF)
186 187 188 189 190 191
		{
			/*
			 * Read the message contents. This is expected to be done without
			 * blocking because we've been able to get message type code.
			 */
			if (pq_getmessage(&input_message, 0))
Bruce Momjian's avatar
Bruce Momjian committed
192
				firstchar = EOF;	/* suitable message already logged */
193 194 195 196 197
		}

		/* Handle the very limited subset of commands expected in this phase */
		switch (firstchar)
		{
Bruce Momjian's avatar
Bruce Momjian committed
198
			case 'Q':			/* Query message */
199
				{
Bruce Momjian's avatar
Bruce Momjian committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
					const char *query_string;
					XLogRecPtr	recptr;

					query_string = pq_getmsgstring(&input_message);
					pq_getmsgend(&input_message);

					if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
					{
						StringInfoData buf;
						char		sysid[32];
						char		tli[11];

						/*
						 * Reply with a result set with one row, two columns.
						 * First col is system ID, and second if timeline ID
						 */

						snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
								 GetSystemIdentifier());
						snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);

						/* Send a RowDescription message */
						pq_beginmessage(&buf, 'T');
						pq_sendint(&buf, 2, 2); /* 2 fields */

						/* first field */
						pq_sendstring(&buf, "systemid");		/* col name */
						pq_sendint(&buf, 0, 4); /* table oid */
						pq_sendint(&buf, 0, 2); /* attnum */
						pq_sendint(&buf, TEXTOID, 4);	/* type oid */
						pq_sendint(&buf, -1, 2);		/* typlen */
						pq_sendint(&buf, 0, 4); /* typmod */
						pq_sendint(&buf, 0, 2); /* format code */

						/* second field */
						pq_sendstring(&buf, "timeline");		/* col name */
						pq_sendint(&buf, 0, 4); /* table oid */
						pq_sendint(&buf, 0, 2); /* attnum */
						pq_sendint(&buf, INT4OID, 4);	/* type oid */
						pq_sendint(&buf, 4, 2); /* typlen */
						pq_sendint(&buf, 0, 4); /* typmod */
						pq_sendint(&buf, 0, 2); /* format code */
						pq_endmessage(&buf);

						/* Send a DataRow message */
						pq_beginmessage(&buf, 'D');
						pq_sendint(&buf, 2, 2); /* # of columns */
						pq_sendint(&buf, strlen(sysid), 4);		/* col1 len */
						pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
						pq_sendint(&buf, strlen(tli), 4);		/* col2 len */
						pq_sendbytes(&buf, (char *) tli, strlen(tli));
						pq_endmessage(&buf);

						/* Send CommandComplete and ReadyForQuery messages */
						EndCommand("SELECT", DestRemote);
						ReadyForQuery(DestRemote);
					}
					else if (sscanf(query_string, "START_REPLICATION %X/%X",
									&recptr.xlogid, &recptr.xrecoff) == 2)
					{
						StringInfoData buf;

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
						/*
						 * Check that we're logging enough information in the
						 * WAL for log-shipping.
						 *
						 * NOTE: This only checks the current value of
						 * wal_level. Even if the current setting is not
						 * 'minimal', there can be old WAL in the pg_xlog
						 * directory that was created with 'minimal'.
						 * So this is not bulletproof, the purpose is
						 * just to give a user-friendly error message that
						 * hints how to configure the system correctly.
						 */
						if (wal_level == WAL_LEVEL_MINIMAL)
							ereport(FATAL,
									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
277
									 errmsg("standby connections not allowed because wal_level=\"minimal\"")));
278

Bruce Momjian's avatar
Bruce Momjian committed
279 280 281 282 283
						/* Send a CopyOutResponse message, and start streaming */
						pq_beginmessage(&buf, 'H');
						pq_sendbyte(&buf, 0);
						pq_sendint(&buf, 0, 2);
						pq_endmessage(&buf);
284
						pq_flush();
Bruce Momjian's avatar
Bruce Momjian committed
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301

						/*
						 * Initialize position to the received one, then the
						 * xlog records begin to be shipped from that position
						 */
						sentPtr = recptr;

						/* break out of the loop */
						replication_started = true;
					}
					else
					{
						ereport(FATAL,
								(errcode(ERRCODE_PROTOCOL_VIOLATION),
								 errmsg("invalid standby query string: %s", query_string)));
					}
					break;
302 303 304
				}

			case 'X':
305
				/* standby is closing the connection */
306 307 308
				proc_exit(0);

			case EOF:
309 310
				/* standby disconnected unexpectedly */
				ereport(COMMERROR,
311 312
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("unexpected EOF on standby connection")));
313
				proc_exit(0);
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329

			default:
				ereport(FATAL,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("invalid standby handshake message type %d", firstchar)));
		}
	}
}

/*
 * Check if the remote end has closed the connection.
 */
static void
CheckClosedConnection(void)
{
	unsigned char firstchar;
Bruce Momjian's avatar
Bruce Momjian committed
330
	int			r;
331 332 333 334

	r = pq_getbyte_if_available(&firstchar);
	if (r < 0)
	{
335
		/* unexpected error or EOF */
336
		ereport(COMMERROR,
337 338 339
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("unexpected EOF on standby connection")));
		proc_exit(0);
340 341 342
	}
	if (r == 0)
	{
343 344
		/* no data available without blocking */
		return;
345 346 347 348 349
	}

	/* Handle the very limited subset of commands expected in this phase */
	switch (firstchar)
	{
Bruce Momjian's avatar
Bruce Momjian committed
350 351 352
			/*
			 * 'X' means that the standby is closing down the socket.
			 */
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
		case 'X':
			proc_exit(0);

		default:
			ereport(FATAL,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid standby closing message type %d",
							firstchar)));
	}
}

/* Main loop of walsender process */
static int
WalSndLoop(void)
{
	StringInfoData output_message;
369
	bool		caughtup = false;
370 371 372 373 374 375

	initStringInfo(&output_message);

	/* Loop forever */
	for (;;)
	{
376
		long	remain;		/* remaining time (us) */
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive(true))
			exit(1);
		/* Process any requests or signals received recently */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/*
		 * When SIGUSR2 arrives, we send all outstanding logs up to the
		 * shutdown checkpoint record (i.e., the latest record) and exit.
		 */
		if (ready_to_stop)
		{
397
			XLogSend(&output_message, &caughtup);
398 399 400 401 402 403 404 405 406 407 408 409 410 411
			shutdown_requested = true;
		}

		/* Normal exit from the walsender is here */
		if (shutdown_requested)
		{
			/* Inform the standby that XLOG streaming was done */
			pq_puttextmessage('C', "COPY 0");
			pq_flush();

			proc_exit(0);
		}

		/*
412 413
		 * If we had sent all accumulated WAL in last round, nap for the
		 * configured time before retrying.
414 415 416
		 *
		 * On some platforms, signals won't interrupt the sleep.  To ensure we
		 * respond reasonably promptly when someone signals us, break down the
417
		 * sleep into NAPTIME_PER_CYCLE increments, and check for
418 419
		 * interrupts after each nap.
		 */
420
		if (caughtup)
421
		{
422 423 424 425 426 427
			remain = WalSndDelay * 1000L;
			while (remain > 0)
			{
				/* Check for interrupts */
				if (got_SIGHUP || shutdown_requested || ready_to_stop)
					break;
428

429 430 431
				/* Sleep and check that the connection is still alive */
				pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
				CheckClosedConnection();
432

433 434
				remain -= NAPTIME_PER_CYCLE;
			}
435 436
		}
		/* Attempt to send the log once every loop */
437
		if (!XLogSend(&output_message, &caughtup))
438 439 440 441 442 443 444
			goto eof;
	}

	/* can't get here because the above loop never exits */
	return 1;

eof:
Bruce Momjian's avatar
Bruce Momjian committed
445

446
	/*
Bruce Momjian's avatar
Bruce Momjian committed
447 448
	 * Reset whereToSendOutput to prevent ereport from attempting to send any
	 * more messages to the standby.
449 450 451 452 453
	 */
	if (whereToSendOutput == DestRemote)
		whereToSendOutput = DestNone;

	proc_exit(0);
Bruce Momjian's avatar
Bruce Momjian committed
454
	return 1;					/* keep the compiler quiet */
455 456 457 458 459 460 461
}

/* Initialize a per-walsender data structure for this walsender process */
static void
InitWalSnd(void)
{
	/* use volatile pointer to prevent code rearrangement */
Bruce Momjian's avatar
Bruce Momjian committed
462
	int			i;
463 464 465 466 467

	/*
	 * WalSndCtl should be set up already (we inherit this by fork() or
	 * EXEC_BACKEND mechanism from the postmaster).
	 */
468
	Assert(WalSndCtl != NULL);
469 470 471 472 473 474
	Assert(MyWalSnd == NULL);

	/*
	 * Find a free walsender slot and reserve it. If this fails, we must be
	 * out of WalSnd structures.
	 */
475
	for (i = 0; i < max_wal_senders; i++)
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
	{
		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

		SpinLockAcquire(&walsnd->mutex);

		if (walsnd->pid != 0)
		{
			SpinLockRelease(&walsnd->mutex);
			continue;
		}
		else
		{
			/* found */
			MyWalSnd = (WalSnd *) walsnd;
			walsnd->pid = MyProcPid;
			MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
			SpinLockRelease(&walsnd->mutex);
			break;
		}
	}
	if (MyWalSnd == NULL)
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
499 500 501
				 errmsg("number of requested standby connections "
					"exceeds max_wal_senders (currently %d)",
					max_wal_senders)));
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525

	/* Arrange to clean up at walsender exit */
	on_shmem_exit(WalSndKill, 0);
}

/*
 * on_shmem_exit callback registered by InitWalSnd(): hand this process's
 * WalSnd slot back so another walsender can claim it.  Both arguments are
 * ignored.
 */
static void
WalSndKill(int code, Datum arg)
{
	WalSnd	   *slot = MyWalSnd;

	Assert(slot != NULL);

	/*
	 * A zero pid is what marks a slot as free.  The code takes no spinlock
	 * for this store and relies on it being safe without one.
	 */
	slot->pid = 0;

	/* The slot no longer belongs to us */
	MyWalSnd = NULL;
}

/*
 * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
 */
526
static void
527 528
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
{
529
	XLogRecPtr	startRecPtr = recptr;
Bruce Momjian's avatar
Bruce Momjian committed
530
	char		path[MAXPGPATH];
531 532 533 534
	uint32		lastRemovedLog;
	uint32		lastRemovedSeg;
	uint32		log;
	uint32		seg;
535 536 537

	while (nbytes > 0)
	{
538
		uint32		startoff;
Bruce Momjian's avatar
Bruce Momjian committed
539 540
		int			segbytes;
		int			readbytes;
541 542 543 544 545 546 547 548 549 550 551 552 553 554

		startoff = recptr.xrecoff % XLogSegSize;

		if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
		{
			/* Switch to another logfile segment */
			if (sendFile >= 0)
				close(sendFile);

			XLByteToSeg(recptr, sendId, sendSeg);
			XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);

			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
			if (sendFile < 0)
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
			{
				/*
				 * If the file is not found, assume it's because the
				 * standby asked for a too old WAL segment that has already
				 * been removed or recycled.
				 */
				if (errno == ENOENT)
				{
					char filename[MAXFNAMELEN];
					XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("requested WAL segment %s has already been removed",
									filename)));
				}
				else
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
									path, sendId, sendSeg)));
			}
576 577 578 579 580 581 582
			sendOff = 0;
		}

		/* Need to seek in the file? */
		if (sendOff != startoff)
		{
			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
583
				ereport(ERROR,
584 585 586 587 588 589 590 591 592 593 594 595 596 597
						(errcode_for_file_access(),
						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
								sendId, sendSeg, startoff)));
			sendOff = startoff;
		}

		/* How many bytes are within this segment? */
		if (nbytes > (XLogSegSize - startoff))
			segbytes = XLogSegSize - startoff;
		else
			segbytes = nbytes;

		readbytes = read(sendFile, buf, segbytes);
		if (readbytes <= 0)
598
			ereport(ERROR,
599
					(errcode_for_file_access(),
Bruce Momjian's avatar
Bruce Momjian committed
600 601 602
			errmsg("could not read from log file %u, segment %u, offset %u, "
				   "length %lu: %m",
				   sendId, sendSeg, sendOff, (unsigned long) segbytes)));
603 604 605 606 607 608 609 610

		/* Update state for read */
		XLByteAdvance(recptr, readbytes);

		sendOff += readbytes;
		nbytes -= readbytes;
		buf += readbytes;
	}
611 612 613 614 615 616 617 618 619

	/*
	 * After reading into the buffer, check that what we read was valid.
	 * We do this after reading, because even though the segment was present
	 * when we opened it, it might get recycled or removed while we read it.
	 * The read() succeeds in that case, but the data we tried to read might
	 * already have been overwritten with new WAL records.
	 */
	XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
620
	XLByteToSeg(startRecPtr, log, seg);
621 622 623 624 625 626 627 628 629 630
	if (log < lastRemovedLog ||
		(log == lastRemovedLog && seg <= lastRemovedSeg))
	{
		char filename[MAXFNAMELEN];
		XLogFileName(filename, ThisTimeLineID, log, seg);
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("requested WAL segment %s has already been removed",
						filename)));
	}
631 632 633
}

/*
634 635 636 637
 * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
 * but not yet sent to the client, and send it. If there is no unsent WAL,
 * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
 * to false.
638 639 640 641
 *
 * Returns true if OK, false if trouble.
 */
static bool
642
XLogSend(StringInfo outMsg, bool *caughtup)
643 644
{
	XLogRecPtr	SendRqstPtr;
645 646 647
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;
Bruce Momjian's avatar
Bruce Momjian committed
648 649
	char		activitymsg[50];

650 651 652
	/* use volatile pointer to prevent code rearrangement */
	volatile WalSnd *walsnd = MyWalSnd;

Heikki Linnakangas's avatar
Heikki Linnakangas committed
653
	/* Attempt to send all records flushed to the disk already */
654 655 656 657
	SendRqstPtr = GetWriteRecPtr();

	/* Quick exit if nothing to do */
	if (!XLByteLT(sentPtr, SendRqstPtr))
658 659
	{
		*caughtup = true;
660
		return true;
661 662 663 664 665 666 667 668
	}
	/*
	 * Otherwise let the caller know that we're not fully caught up. Unless
	 * there's a huge backlog, we'll be caught up to the current WriteRecPtr
	 * after we've sent everything below, but more WAL could accumulate while
	 * we're busy sending.
	 */
	*caughtup = false;
669 670

	/*
671 672 673 674 675 676 677 678 679 680
	 * Figure out how much to send in one message. If there's less than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round to page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver
	 * relies on the fact that we never split a WAL record across two
	 * messages. Since a long WAL record is split at page boundary into
	 * continuation records, page boundary is always a safe cut-off point.
	 * We also assume that SendRqstPtr never points in the middle of a WAL
	 * record.
681
	 */
682 683
	startptr = sentPtr;
	if (startptr.xrecoff >= XLogFileSize)
684 685
	{
		/*
686 687
		 * crossing a logid boundary, skip the non-existent last log
		 * segment in previous logical log file.
688
		 */
689 690 691
		startptr.xlogid += 1;
		startptr.xrecoff = 0;
	}
692

693 694 695 696 697 698 699
	endptr = startptr;
	XLByteAdvance(endptr, MAX_SEND_SIZE);
	/* round down to page boundary. */
	endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
	/* if we went beyond SendRqstPtr, back off */
	if (XLByteLT(SendRqstPtr, endptr))
		endptr = SendRqstPtr;
700

701 702 703 704 705 706 707 708 709 710
	/*
	 * OK to read and send the slice.
	 *
	 * We don't need to convert the xlogid/xrecoff from host byte order to
	 * network byte order because the both server can be expected to have
	 * the same byte order. If they have different byte order, we don't
	 * reach here.
	 */
	pq_sendbyte(outMsg, 'w');
	pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
711

712 713 714 715 716 717 718
	if (endptr.xlogid != startptr.xlogid)
	{
		Assert(endptr.xlogid == startptr.xlogid + 1);
		nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
	}
	else
		nbytes = endptr.xrecoff - startptr.xrecoff;
719

720
	sentPtr = endptr;
721

722 723 724 725 726
	/*
	 * Read the log directly into the output buffer to prevent extra
	 * memcpy calls.
	 */
	enlargeStringInfo(outMsg, nbytes);
727

728 729 730
	XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
	outMsg->len += nbytes;
	outMsg->data[outMsg->len] = '\0';
731

732 733
	pq_putmessage('d', outMsg->data, outMsg->len);
	resetStringInfo(outMsg);
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791

	/* Update shared memory status */
	SpinLockAcquire(&walsnd->mutex);
	walsnd->sentPtr = sentPtr;
	SpinLockRelease(&walsnd->mutex);

	/* Flush pending output */
	if (pq_flush())
		return false;

	/* Report progress of XLOG streaming in PS display */
	snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
			 sentPtr.xlogid, sentPtr.xrecoff);
	set_ps_display(activitymsg, false);

	return true;
}

/*
 * SIGHUP handler.  Only raises got_SIGHUP; the configuration file is
 * actually re-read later, outside the handler.
 */
static void
WalSndSigHupHandler(SIGNAL_ARGS)
{
	got_SIGHUP = true;
}

/*
 * SIGTERM handler.  Records the shutdown request in shutdown_requested;
 * the main loop notices the flag and performs the orderly exit.
 */
static void
WalSndShutdownHandler(SIGNAL_ARGS)
{
	shutdown_requested = true;
}

/*
 * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
 *
 * Some backend has bought the farm,
 * so we need to stop what we're doing and exit.
 */
static void
WalSndQuickDieHandler(SIGNAL_ARGS)
{
	PG_SETMASK(&BlockSig);

	/*
	 * We DO NOT want to run proc_exit() callbacks -- we're here because
	 * shared memory may be corrupted, so we don't want to try to clean up our
	 * transaction.  Just nail the windows shut and get out of town.  Now that
	 * there's an atexit callback to prevent third-party code from breaking
	 * things by calling exit() directly, we have to reset the callbacks
	 * explicitly to make this work as intended.
	 */
	on_exit_reset();

	/*
	 * Note we do exit(2) not exit(0).  This is to force the postmaster into a
	 * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
	 * backend.  This is necessary precisely because we don't clean up our
	 * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
	 * should ensure the postmaster sees this as a crash, too, but no harm in
	 * being doubly sure.)
	 */
	exit(2);
}

/*
 * SIGUSR2 handler.  Sets ready_to_stop so that the main loop sends the
 * remaining WAL and then shuts down.
 */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
	ready_to_stop = true;
}

/* Set up signal handlers */
void
WalSndSignals(void)
{
	/* Set up signal handlers */
Bruce Momjian's avatar
Bruce Momjian committed
810 811
	pqsignal(SIGHUP, WalSndSigHupHandler);		/* set flag to read config
												 * file */
812 813 814 815 816
	pqsignal(SIGINT, SIG_IGN);	/* not used */
	pqsignal(SIGTERM, WalSndShutdownHandler);	/* request shutdown */
	pqsignal(SIGQUIT, WalSndQuickDieHandler);	/* hard crash time */
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
Bruce Momjian's avatar
Bruce Momjian committed
817 818 819
	pqsignal(SIGUSR1, SIG_IGN); /* not used */
	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
												 * shutdown */
820 821 822 823 824 825 826 827 828 829 830 831 832

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
}

/* Report shared-memory space needed by WalSndShmemInit */
Size
WalSndShmemSize(void)
{
Bruce Momjian's avatar
Bruce Momjian committed
833
	Size		size = 0;
834 835

	size = offsetof(WalSndCtlData, walsnds);
836
	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
837 838 839 840 841 842 843 844

	return size;
}

/* Allocate and initialize walsender-related shared memory */
void
WalSndShmemInit(void)
{
Bruce Momjian's avatar
Bruce Momjian committed
845 846
	bool		found;
	int			i;
847 848 849 850

	WalSndCtl = (WalSndCtlData *)
		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);

851
	if (!found)
852
	{
853 854 855 856 857 858
		/* First time through, so initialize */
		MemSet(WalSndCtl, 0, WalSndShmemSize());

		for (i = 0; i < max_wal_senders; i++)
		{
			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
Bruce Momjian's avatar
Bruce Momjian committed
859

860 861
			SpinLockInit(&walsnd->mutex);
		}
862 863 864
	}
}

/*
 * This isn't currently used for anything. Monitoring tools might be
 * interested in the future, and we'll need something like this in the
 * future for synchronous replication.
 */
#ifdef NOT_USED
/*
 * Returns the oldest Send position among walsenders, or {0, 0}
 * (InvalidXLogRecPtr) if none.  Slots with no pid, and slots that have not
 * advertised a position yet (sentPtr still {0, 0}), are skipped.
 */
XLogRecPtr
GetOldestWALSendPointer(void)
{
	XLogRecPtr	oldest = {0, 0};
	int			i;
	bool		found = false;

	for (i = 0; i < max_wal_senders; i++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
		XLogRecPtr	recptr;

		if (walsnd->pid == 0)
			continue;

		SpinLockAcquire(&walsnd->mutex);
		recptr = walsnd->sentPtr;
		SpinLockRelease(&walsnd->mutex);

		if (recptr.xlogid == 0 && recptr.xrecoff == 0)
			continue;

		if (!found || XLByteLT(recptr, oldest))
			oldest = recptr;
		found = true;
	}
	return oldest;
}
#endif