rserv.c 6.98 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/* rserv.c
 * Support functions for erServer replication.
 * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
 */

#include "executor/spi.h"		/* this is what you need to work with SPI */
#include "commands/trigger.h"	/* -"- and triggers */
#include "utils/tqual.h"		/* -"- and SnapshotData */
#include <ctype.h>				/* tolower () */

#ifdef PG_FUNCTION_INFO_V1
#define CurrentTriggerData ((TriggerData *) fcinfo->context)
#endif

#ifdef PG_FUNCTION_INFO_V1
PG_FUNCTION_INFO_V1(_rserv_log_);
PG_FUNCTION_INFO_V1(_rserv_sync_);
PG_FUNCTION_INFO_V1(_rserv_debug_);
19 20 21 22
Datum		_rserv_log_(PG_FUNCTION_ARGS);
Datum		_rserv_sync_(PG_FUNCTION_ARGS);
Datum		_rserv_debug_(PG_FUNCTION_ARGS);

23 24 25 26 27 28 29 30
#else
HeapTuple	_rserv_log_(void);
int32		_rserv_sync_(int32);
int32		_rserv_debug_(int32);
#endif

static int	debug = 0;

31
static char *OutputValue(char *key, char *buf, int size);
32 33 34 35 36 37 38 39 40 41 42 43 44 45

#ifdef PG_FUNCTION_INFO_V1
Datum
_rserv_log_(PG_FUNCTION_ARGS)
#else
HeapTuple
_rserv_log_()
#endif
{
	Trigger    *trigger;		/* to get trigger name */
	int			nargs;			/* # of args specified in CREATE TRIGGER */
	char	  **args;			/* argument: argnum */
	Relation	rel;			/* triggered relation */
	HeapTuple	tuple;			/* tuple to return */
46
	HeapTuple	newtuple = NULL;	/* tuple to return */
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	TupleDesc	tupdesc;		/* tuple description */
	int			keynum;
	char	   *key;
	char	   *okey;
	char	   *newkey = NULL;
	int			deleted;
	char		sql[8192];
	char		outbuf[8192];
	char		oidbuf[64];
	int			ret;

	/* Called by trigger manager ? */
	if (!CurrentTriggerData)
		elog(ERROR, "_rserv_log_: triggers are not initialized");

	/* Should be called for ROW trigger */
	if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
		elog(ERROR, "_rserv_log_: can't process STATEMENT events");

	tuple = CurrentTriggerData->tg_trigtuple;

	trigger = CurrentTriggerData->tg_trigger;
	nargs = trigger->tgnargs;
	args = trigger->tgargs;

72
	if (nargs != 1)				/* odd number of arguments! */
73 74 75 76 77 78 79 80 81 82
		elog(ERROR, "_rserv_log_: need in *one* argument");

	keynum = atoi(args[0]);

	if (keynum < 0 && keynum != ObjectIdAttributeNumber)
		elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);

	rel = CurrentTriggerData->tg_relation;
	tupdesc = rel->rd_att;

83
	deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
84 85 86 87 88
		1 : 0;

	if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event))
		newtuple = CurrentTriggerData->tg_newtuple;

89
#ifndef PG_FUNCTION_INFO_V1
90

91 92 93 94 95 96
	/*
	 * Setting CurrentTriggerData to NULL prevents direct calls to trigger
	 * functions in queries. Normally, trigger functions have to be called
	 * by trigger manager code only.
	 */
	CurrentTriggerData = NULL;
97
#endif
98 99 100 101 102 103 104

	/* Connect to SPI manager */
	if ((ret = SPI_connect()) < 0)
		elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);

	if (keynum == ObjectIdAttributeNumber)
	{
105 106 107 108
		snprintf(oidbuf, "%u", 64,
				 rel->rd_rel->relhasoids
		         ? HeapTupleGetOid(tuple)
		         : InvalidOid);
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
		key = oidbuf;
	}
	else
		key = SPI_getvalue(tuple, tupdesc, keynum);

	if (key == NULL)
		elog(ERROR, "_rserv_log_: key must be not null");

	if (newtuple && keynum != ObjectIdAttributeNumber)
	{
		newkey = SPI_getvalue(newtuple, tupdesc, keynum);
		if (newkey == NULL)
			elog(ERROR, "_rserv_log_: key must be not null");
		if (strcmp(newkey, key) == 0)
			newkey = NULL;
		else
125
			deleted = 1;		/* old key was deleted */
126 127 128 129 130 131 132
	}

	if (strpbrk(key, "\\	\n'"))
		okey = OutputValue(key, outbuf, sizeof(outbuf));
	else
		okey = key;

133
	snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
134
			"deleted = %d where reloid = %u and key = '%s'",
135 136 137
			GetCurrentTransactionId(), deleted, rel->rd_id, okey);

	if (debug)
Bruce Momjian's avatar
Bruce Momjian committed
138
		elog(DEBUG3, sql);
139 140 141 142 143 144 145 146 147 148 149 150 151

	ret = SPI_exec(sql, 0);

	if (ret < 0)
		elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);

	/*
	 * If no tuple was UPDATEd then do INSERT...
	 */
	if (SPI_processed > 1)
		elog(ERROR, "_rserv_log_: duplicate tuples");
	else if (SPI_processed == 0)
	{
152
		snprintf(sql, 8192, "insert into _RSERV_LOG_ "
153 154
				"(reloid, logid, logtime, deleted, key) "
				"values (%u, %d, now(), %d, '%s')",
155
				rel->rd_id, GetCurrentTransactionId(),
156 157 158
				deleted, okey);

		if (debug)
Bruce Momjian's avatar
Bruce Momjian committed
159
			elog(DEBUG3, sql);
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176

		ret = SPI_exec(sql, 0);

		if (ret < 0)
			elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
	}

	if (okey != key && okey != outbuf)
		pfree(okey);

	if (newkey)
	{
		if (strpbrk(newkey, "\\	\n'"))
			okey = OutputValue(newkey, outbuf, sizeof(outbuf));
		else
			okey = newkey;

177
		snprintf(sql, 8192, "insert into _RSERV_LOG_ "
178
				"(reloid, logid, logtime, deleted, key) "
179
				"values (%u, %d, now(), 0, '%s')",
180 181 182
				rel->rd_id, GetCurrentTransactionId(), okey);

		if (debug)
Bruce Momjian's avatar
Bruce Momjian committed
183
			elog(DEBUG3, sql);
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211

		ret = SPI_exec(sql, 0);

		if (ret < 0)
			elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);

		if (okey != newkey && okey != outbuf)
			pfree(okey);
	}

	SPI_finish();

#ifdef PG_FUNCTION_INFO_V1
	return (PointerGetDatum(tuple));
#else
	return (tuple);
#endif
}

#ifdef PG_FUNCTION_INFO_V1
Datum
_rserv_sync_(PG_FUNCTION_ARGS)
#else
int32
_rserv_sync_(int32 server)
#endif
{
#ifdef PG_FUNCTION_INFO_V1
212
	int32		server = PG_GETARG_INT32(0);
213
#endif
214 215 216 217 218
	char		sql[8192];
	char		buf[8192];
	char	   *active = buf;
	uint32		xcnt;
	int			ret;
219 220 221 222 223 224 225

	if (SerializableSnapshot == NULL)
		elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");

	buf[0] = 0;
	for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
	{
226 227
		snprintf(buf + strlen(buf), 8192 - strlen(buf),
				"%s%u", (xcnt) ? ", " : "",
228
				SerializableSnapshot->xip[xcnt]);
229 230 231 232 233
	}

	if ((ret = SPI_connect()) < 0)
		elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);

234
	snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
235
			"(server, syncid, synctime, status, minid, maxid, active) "
236
	  "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
			server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);

	ret = SPI_exec(sql, 0);

	if (ret < 0)
		elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);

	SPI_finish();

	return (0);
}

#ifdef PG_FUNCTION_INFO_V1
Datum
_rserv_debug_(PG_FUNCTION_ARGS)
#else
int32
_rserv_debug_(int32 newval)
#endif
{
#ifdef PG_FUNCTION_INFO_V1
258
	int32		newval = PG_GETARG_INT32(0);
259
#endif
260
	int32		oldval = debug;
261 262 263 264 265 266

	debug = newval;

	return (oldval);
}

267
#define ExtendBy	1024
268

269
static char *
270 271 272 273 274 275 276 277
OutputValue(char *key, char *buf, int size)
{
	int			i = 0;
	char	   *out = buf;
	char	   *subst = NULL;
	int			slen = 0;

	size--;
278
	for (;;)
279 280 281
	{
		switch (*key)
		{
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
			case '\\':
				subst = "\\\\";
				slen = 2;
				break;
			case '	':
				subst = "\\011";
				slen = 4;
				break;
			case '\n':
				subst = "\\012";
				slen = 4;
				break;
			case '\'':
				subst = "\\047";
				slen = 4;
				break;
			case '\0':
				out[i] = 0;
				return (out);
			default:
				slen = 1;
				break;
304 305 306 307 308 309
		}

		if (i + slen >= size)
		{
			if (out == buf)
			{
310
				out = (char *) palloc(size + ExtendBy);
311 312 313 314 315
				strncpy(out, buf, i);
				size += ExtendBy;
			}
			else
			{
316
				out = (char *) repalloc(out, size + ExtendBy);
317 318 319 320 321 322 323 324 325 326 327 328 329 330
				size += ExtendBy;
			}
		}

		if (slen == 1)
			out[i++] = *key;
		else
		{
			memcpy(out + i, subst, slen);
			i += slen;
		}
		key++;
	}

331
	return (out);
332 333

}