rserv.c 7.49 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/* rserv.c
 * Support functions for erServer replication.
 * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
 */

#include "executor/spi.h"		/* this is what you need to work with SPI */
#include "commands/trigger.h"	/* -"- and triggers */
#include "utils/tqual.h"		/* -"- and SnapshotData */
#include <ctype.h>				/* tolower () */

#ifdef PG_FUNCTION_INFO_V1
#define CurrentTriggerData ((TriggerData *) fcinfo->context)
#endif

#ifdef PG_FUNCTION_INFO_V1
PG_FUNCTION_INFO_V1(_rserv_log_);
PG_FUNCTION_INFO_V1(_rserv_sync_);
PG_FUNCTION_INFO_V1(_rserv_debug_);
19 20 21 22
Datum		_rserv_log_(PG_FUNCTION_ARGS);
Datum		_rserv_sync_(PG_FUNCTION_ARGS);
Datum		_rserv_debug_(PG_FUNCTION_ARGS);

23 24 25 26 27 28 29 30
#else
HeapTuple	_rserv_log_(void);
int32		_rserv_sync_(int32);
int32		_rserv_debug_(int32);
#endif

static int	debug = 0;

31
static char *OutputValue(char *key, char *buf, int size);
32 33 34 35 36 37 38 39 40 41 42 43 44 45

#ifdef PG_FUNCTION_INFO_V1
Datum
_rserv_log_(PG_FUNCTION_ARGS)
#else
HeapTuple
_rserv_log_()
#endif
{
	Trigger    *trigger;		/* to get trigger name */
	int			nargs;			/* # of args specified in CREATE TRIGGER */
	char	  **args;			/* argument: argnum */
	Relation	rel;			/* triggered relation */
	HeapTuple	tuple;			/* tuple to return */
46
	HeapTuple	newtuple = NULL;	/* tuple to return */
47 48 49 50 51 52 53 54 55 56 57 58 59
	TupleDesc	tupdesc;		/* tuple description */
	int			keynum;
	char	   *key;
	char	   *okey;
	char	   *newkey = NULL;
	int			deleted;
	char		sql[8192];
	char		outbuf[8192];
	char		oidbuf[64];
	int			ret;

	/* Called by trigger manager ? */
	if (!CurrentTriggerData)
60
		/* internal error */
61 62 63 64
		elog(ERROR, "_rserv_log_: triggers are not initialized");

	/* Should be called for ROW trigger */
	if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
65
		/* internal error */
66 67 68 69 70 71 72 73
		elog(ERROR, "_rserv_log_: can't process STATEMENT events");

	tuple = CurrentTriggerData->tg_trigtuple;

	trigger = CurrentTriggerData->tg_trigger;
	nargs = trigger->tgnargs;
	args = trigger->tgargs;

74
	if (nargs != 1)				/* odd number of arguments! */
75
		/* internal error */
76 77 78 79 80
		elog(ERROR, "_rserv_log_: need in *one* argument");

	keynum = atoi(args[0]);

	if (keynum < 0 && keynum != ObjectIdAttributeNumber)
81
		/* internal error */
82 83 84 85 86
		elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);

	rel = CurrentTriggerData->tg_relation;
	tupdesc = rel->rd_att;

87
	deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
88 89 90 91 92
		1 : 0;

	if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event))
		newtuple = CurrentTriggerData->tg_newtuple;

93
#ifndef PG_FUNCTION_INFO_V1
94

95 96 97 98 99 100
	/*
	 * Setting CurrentTriggerData to NULL prevents direct calls to trigger
	 * functions in queries. Normally, trigger functions have to be called
	 * by trigger manager code only.
	 */
	CurrentTriggerData = NULL;
101
#endif
102 103 104

	/* Connect to SPI manager */
	if ((ret = SPI_connect()) < 0)
105
		/* internal error */
106 107 108 109
		elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);

	if (keynum == ObjectIdAttributeNumber)
	{
110
		snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
111 112 113 114 115 116
		key = oidbuf;
	}
	else
		key = SPI_getvalue(tuple, tupdesc, keynum);

	if (key == NULL)
117 118 119
		ereport(ERROR,
				(errcode(ERRCODE_NOT_NULL_VIOLATION),
				 errmsg("key must be not null")));
120 121 122 123 124

	if (newtuple && keynum != ObjectIdAttributeNumber)
	{
		newkey = SPI_getvalue(newtuple, tupdesc, keynum);
		if (newkey == NULL)
125 126 127
			ereport(ERROR,
					(errcode(ERRCODE_NOT_NULL_VIOLATION),
					 errmsg("key must be not null")));
128 129 130
		if (strcmp(newkey, key) == 0)
			newkey = NULL;
		else
131
			deleted = 1;		/* old key was deleted */
132 133 134 135 136 137 138
	}

	if (strpbrk(key, "\\	\n'"))
		okey = OutputValue(key, outbuf, sizeof(outbuf));
	else
		okey = key;

139
	snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
Bruce Momjian's avatar
Bruce Momjian committed
140 141
			 "deleted = %d where reloid = %u and key = '%s'",
			 GetCurrentTransactionId(), deleted, rel->rd_id, okey);
142 143

	if (debug)
144
		elog(DEBUG4, "sql: %s", sql);
145 146 147 148

	ret = SPI_exec(sql, 0);

	if (ret < 0)
149 150 151
		ereport(ERROR,
				(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
				 errmsg("SPI_exec(update) returned %d", ret)));
152 153 154 155 156

	/*
	 * If no tuple was UPDATEd then do INSERT...
	 */
	if (SPI_processed > 1)
157 158 159 160
		ereport(ERROR,
				(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
				 errmsg("duplicate tuples")));

161 162
	else if (SPI_processed == 0)
	{
163
		snprintf(sql, 8192, "insert into _RSERV_LOG_ "
Bruce Momjian's avatar
Bruce Momjian committed
164 165 166 167
				 "(reloid, logid, logtime, deleted, key) "
				 "values (%u, %d, now(), %d, '%s')",
				 rel->rd_id, GetCurrentTransactionId(),
				 deleted, okey);
168 169

		if (debug)
170
			elog(DEBUG4, "sql: %s", sql);
171 172 173 174

		ret = SPI_exec(sql, 0);

		if (ret < 0)
175 176 177
			ereport(ERROR,
					(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
					 errmsg("SPI_exec(insert) returned %d", ret)));
178 179 180 181 182 183 184 185 186 187 188 189
	}

	if (okey != key && okey != outbuf)
		pfree(okey);

	if (newkey)
	{
		if (strpbrk(newkey, "\\	\n'"))
			okey = OutputValue(newkey, outbuf, sizeof(outbuf));
		else
			okey = newkey;

190
		snprintf(sql, 8192, "insert into _RSERV_LOG_ "
Bruce Momjian's avatar
Bruce Momjian committed
191 192 193
				 "(reloid, logid, logtime, deleted, key) "
				 "values (%u, %d, now(), 0, '%s')",
				 rel->rd_id, GetCurrentTransactionId(), okey);
194 195

		if (debug)
196
			elog(DEBUG4, "sql: %s", sql);
197 198 199 200

		ret = SPI_exec(sql, 0);

		if (ret < 0)
201 202 203
			ereport(ERROR,
					(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
					 errmsg("SPI_exec returned %d", ret)));
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226

		if (okey != newkey && okey != outbuf)
			pfree(okey);
	}

	SPI_finish();

#ifdef PG_FUNCTION_INFO_V1
	return (PointerGetDatum(tuple));
#else
	return (tuple);
#endif
}

#ifdef PG_FUNCTION_INFO_V1
Datum
_rserv_sync_(PG_FUNCTION_ARGS)
#else
int32
_rserv_sync_(int32 server)
#endif
{
#ifdef PG_FUNCTION_INFO_V1
227
	int32		server = PG_GETARG_INT32(0);
228
#endif
229 230 231 232 233
	char		sql[8192];
	char		buf[8192];
	char	   *active = buf;
	uint32		xcnt;
	int			ret;
234 235

	if (SerializableSnapshot == NULL)
236
		/* internal error */
237 238 239 240 241
		elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");

	buf[0] = 0;
	for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
	{
242
		snprintf(buf + strlen(buf), 8192 - strlen(buf),
Bruce Momjian's avatar
Bruce Momjian committed
243 244
				 "%s%u", (xcnt) ? ", " : "",
				 SerializableSnapshot->xip[xcnt]);
245 246 247
	}

	if ((ret = SPI_connect()) < 0)
248
		/* internal error */
249 250
		elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);

251
	snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
Bruce Momjian's avatar
Bruce Momjian committed
252
			 "(server, syncid, synctime, status, minid, maxid, active) "
253
	  "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
Bruce Momjian's avatar
Bruce Momjian committed
254
			 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
255 256 257 258

	ret = SPI_exec(sql, 0);

	if (ret < 0)
259 260 261
		ereport(ERROR,
				(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
				 errmsg("SPI_exec returned %d", ret)));
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276

	SPI_finish();

	return (0);
}

#ifdef PG_FUNCTION_INFO_V1
Datum
_rserv_debug_(PG_FUNCTION_ARGS)
#else
int32
_rserv_debug_(int32 newval)
#endif
{
#ifdef PG_FUNCTION_INFO_V1
277
	int32		newval = PG_GETARG_INT32(0);
278
#endif
279
	int32		oldval = debug;
280 281 282 283 284 285

	debug = newval;

	return (oldval);
}

286
#define ExtendBy	1024
287

288
static char *
289 290 291 292 293 294 295 296
OutputValue(char *key, char *buf, int size)
{
	int			i = 0;
	char	   *out = buf;
	char	   *subst = NULL;
	int			slen = 0;

	size--;
297
	for (;;)
298 299 300
	{
		switch (*key)
		{
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
			case '\\':
				subst = "\\\\";
				slen = 2;
				break;
			case '	':
				subst = "\\011";
				slen = 4;
				break;
			case '\n':
				subst = "\\012";
				slen = 4;
				break;
			case '\'':
				subst = "\\047";
				slen = 4;
				break;
			case '\0':
				out[i] = 0;
				return (out);
			default:
				slen = 1;
				break;
323 324 325 326 327 328
		}

		if (i + slen >= size)
		{
			if (out == buf)
			{
329
				out = (char *) palloc(size + ExtendBy);
330 331 332 333 334
				strncpy(out, buf, i);
				size += ExtendBy;
			}
			else
			{
335
				out = (char *) repalloc(out, size + ExtendBy);
336 337 338 339 340 341 342 343 344 345 346 347 348 349
				size += ExtendBy;
			}
		}

		if (slen == 1)
			out[i++] = *key;
		else
		{
			memcpy(out + i, subst, slen);
			i += slen;
		}
		key++;
	}

350
	return (out);
351 352

}