Commit 12c5cad7 authored by Michael Paquier's avatar Michael Paquier

Handle logical decoding in multi-insert for catalog tuples

The code path for multi-insert decoding is not stressed yet for
catalogs (a future patch may introduce this capability), so no
back-patch is needed.

Author: Daniel Gustafsson
Discussion: https://postgr.es/m/9690D72F-5C4F-4016-9572-6D16684E1D87@yesql.se
parent 84ec9b23
...@@ -891,6 +891,13 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -891,6 +891,13 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
/*
* Ignore insert records without new tuples. This happens when a
* multi_insert is done on a catalog or on a non-persistent relation.
*/
if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
return;
/* only interested in our database */ /* only interested in our database */
XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL); XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
if (rnode.dbNode != ctx->slot->data.database) if (rnode.dbNode != ctx->slot->data.database)
...@@ -901,8 +908,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -901,8 +908,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return; return;
/* /*
* As multi_insert is not used for catalogs yet, the block should always * We know that this multi_insert isn't for a catalog, so the block should
* have data even if a full-page write of it is taken. * always have data even if a full-page write of it is taken.
*/ */
tupledata = XLogRecGetBlockData(r, 0, &tuplelen); tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
Assert(tupledata != NULL); Assert(tupledata != NULL);
...@@ -914,6 +921,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -914,6 +921,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_multi_insert_tuple *xlhdr; xl_multi_insert_tuple *xlhdr;
int datalen; int datalen;
ReorderBufferTupleBuf *tuple; ReorderBufferTupleBuf *tuple;
HeapTupleHeader header;
change = ReorderBufferGetChange(ctx->reorder); change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT; change->action = REORDER_BUFFER_CHANGE_INSERT;
...@@ -925,43 +933,30 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ...@@ -925,43 +933,30 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
data = ((char *) xlhdr) + SizeOfMultiInsertTuple; data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
datalen = xlhdr->datalen; datalen = xlhdr->datalen;
/* change->data.tp.newtuple =
* CONTAINS_NEW_TUPLE will always be set currently as multi_insert ReorderBufferGetTupleBuf(ctx->reorder, datalen);
* isn't used for catalogs, but better be future proof.
*
* We decode the tuple in pretty much the same way as DecodeXLogTuple,
* but since the layout is slightly different, we can't use it here.
*/
if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
HeapTupleHeader header;
change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple; tuple = change->data.tp.newtuple;
header = tuple->tuple.t_data; header = tuple->tuple.t_data;
/* not a disk based tuple */ /* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self); ItemPointerSetInvalid(&tuple->tuple.t_self);
/* /*
* We can only figure this out after reassembling the * We can only figure this out after reassembling the transactions.
* transactions. */
*/ tuple->tuple.t_tableOid = InvalidOid;
tuple->tuple.t_tableOid = InvalidOid;
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
memset(header, 0, SizeofHeapTupleHeader); memset(header, 0, SizeofHeapTupleHeader);
memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
(char *) data, (char *) data,
datalen); datalen);
header->t_infomask = xlhdr->t_infomask; header->t_infomask = xlhdr->t_infomask;
header->t_infomask2 = xlhdr->t_infomask2; header->t_infomask2 = xlhdr->t_infomask2;
header->t_hoff = xlhdr->t_hoff; header->t_hoff = xlhdr->t_hoff;
}
/* /*
* Reset toast reassembly state only after the last row in the last * Reset toast reassembly state only after the last row in the last
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment