Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c | 603
1 file changed, 500 insertions(+), 103 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 2035a2158f1..249fffeb061 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 					Buffer newbuf, HeapTuple oldtup,
-					HeapTuple newtup, bool all_visible_cleared,
-					bool new_all_visible_cleared);
+					HeapTuple newtup, HeapTuple old_key_tup,
+					bool all_visible_cleared, bool new_all_visible_cleared);
 static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
-					Bitmapset *hot_attrs, Bitmapset *key_attrs,
-					bool *satisfies_hot, bool *satisfies_key,
-					HeapTuple oldtup, HeapTuple newtup);
+					Bitmapset *hot_attrs,
+					Bitmapset *key_attrs, Bitmapset *id_attrs,
+					bool *satisfies_hot, bool *satisfies_key,
+					bool *satisfies_id,
+					HeapTuple oldtup, HeapTuple newtup);
 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 					uint16 old_infomask2, TransactionId add_to_xmax,
 					LockTupleMode mode, bool is_update,
@@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
 static bool ConditionalMultiXactIdWait(MultiXactId multi,
 					MultiXactStatus status, int *remaining,
 					uint16 infomask);
+static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
+static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified,
+					bool *copy);
 
 
 /*
@@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[3];
+		XLogRecData rdata[4];
 		Page		page = BufferGetPage(buffer);
 		uint8		info = XLOG_HEAP_INSERT;
+		bool		need_tuple_data;
+
+		/*
+		 * For logical decoding, we need the tuple even if we're doing a
+		 * full page write, so make sure to log it separately. (XXX We could
+		 * alternatively store a pointer into the FPW).
+		 *
+		 * Also, if this is a catalog, we need to transmit combocids to
+		 * properly decode, so log that as well.
+		 */
+		need_tuple_data = RelationIsLogicallyLogged(relation);
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+			log_heap_new_cid(relation, heaptup);
 
-		xlrec.all_visible_cleared = all_visible_cleared;
+		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 		xlrec.target.node = relation->rd_node;
 		xlrec.target.tid = heaptup->t_self;
 		rdata[0].data = (char *) &xlrec;
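The hunk above replaces the old boolean all_visible_cleared field with a flags bitmask, which is what makes room for the extra logical-decoding markers this patch introduces. As a rough sketch of the flag set (the names are the ones used throughout this diff; the bit positions below are illustrative, not copied from the WAL header file):

    /* Sketch only: consult the real WAL headers for the actual bit values. */
    #define XLOG_HEAP_ALL_VISIBLE_CLEARED       (1 << 0)
    #define XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED   (1 << 1)
    #define XLOG_HEAP_CONTAINS_OLD_TUPLE        (1 << 2)
    #define XLOG_HEAP_CONTAINS_OLD_KEY          (1 << 3)
    #define XLOG_HEAP_CONTAINS_NEW_TUPLE        (1 << 4)

The redo routines near the end of this diff accordingly test xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED instead of reading a dedicated boolean field.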
@@ -2126,18 +2144,36 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		 */
 		rdata[1].data = (char *) &xlhdr;
 		rdata[1].len = SizeOfHeapHeader;
-		rdata[1].buffer = buffer;
+		rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[1].buffer_std = true;
 		rdata[1].next = &(rdata[2]);
 
 		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 		rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 		rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-		rdata[2].buffer = buffer;
+		rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[2].buffer_std = true;
 		rdata[2].next = NULL;
 
 		/*
+		 * Make a separate rdata entry for the tuple's buffer if we're
+		 * doing logical decoding, so that an eventual FPW doesn't
+		 * remove the tuple's data.
+		 */
+		if (need_tuple_data)
+		{
+			rdata[2].next = &(rdata[3]);
+
+			rdata[3].data = NULL;
+			rdata[3].len = 0;
+			rdata[3].buffer = buffer;
+			rdata[3].buffer_std = true;
+			rdata[3].next = NULL;
+
+			xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+		}
+
+		/*
 		 * If this is the single and first tuple on page, we can reinit the
 		 * page instead of restoring the whole thing.  Set flag, and hide
 		 * buffer references from XLogInsert.
@@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 		{
 			info |= XLOG_HEAP_INIT_PAGE;
-			rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+			rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
 		}
 
 		recptr = XLogInsert(RM_HEAP_ID, info, rdata);
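Under the XLogRecData-chain API used throughout this file, an entry whose buffer field is set may be dropped from the record whenever XLogInsert() decides to emit a full-page image of that buffer. That is why the payload entries above flip to InvalidBuffer when need_tuple_data is set, and why a zero-length entry that does reference the buffer is chained on. A minimal sketch of the pattern, with a hypothetical helper name that is not part of the patch:

    /*
     * Chain a WAL payload so it survives a full-page write: the payload
     * entry carries no buffer reference (always stored verbatim), while a
     * trailing zero-length entry keeps the buffer registered so XLogInsert
     * can still take or skip the full-page image as usual.
     */
    static void
    chain_payload_and_buffer(XLogRecData *rdata, char *payload, int len, Buffer buf)
    {
        rdata[0].data = payload;
        rdata[0].len = len;
        rdata[0].buffer = InvalidBuffer;    /* never elided by a FPW */
        rdata[0].next = &rdata[1];

        rdata[1].data = NULL;               /* contributes no record data */
        rdata[1].len = 0;
        rdata[1].buffer = buf;              /* ...but ties the record to buf */
        rdata[1].buffer_std = true;
        rdata[1].next = NULL;
    }

The cost is that the tuple data is written to WAL even when a full-page image already contains it, which the XXX comment in heap_insert acknowledges.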
@@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	Page		page;
 	bool		needwal;
 	Size		saveFreeSpace;
+	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
+	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
 	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		{
 			XLogRecPtr	recptr;
 			xl_heap_multi_insert *xlrec;
-			XLogRecData rdata[2];
+			XLogRecData rdata[3];
 			uint8		info = XLOG_HEAP2_MULTI_INSERT;
 			char	   *tupledata;
 			int			totaldatalen;
@@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			/* the rest of the scratch space is used for tuple data */
 			tupledata = scratchptr;
 
-			xlrec->all_visible_cleared = all_visible_cleared;
+			xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 			xlrec->node = relation->rd_node;
 			xlrec->blkno = BufferGetBlockNumber(buffer);
 			xlrec->ntuples = nthispage;
@@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 				   datalen);
 				tuphdr->datalen = datalen;
 				scratchptr += datalen;
+
+				/*
+				 * We don't use heap_multi_insert for catalog tuples yet, but
+				 * better be prepared...
+				 */
+				if (need_cids)
+					log_heap_new_cid(relation, heaptup);
 			}
 			totaldatalen = scratchptr - tupledata;
 			Assert((scratchptr - scratch) < BLCKSZ);
@@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 
 			rdata[1].data = tupledata;
 			rdata[1].len = totaldatalen;
-			rdata[1].buffer = buffer;
+			rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 			rdata[1].buffer_std = true;
 			rdata[1].next = NULL;
 
 			/*
+			 * Make a separate rdata entry for the tuple's buffer if
+			 * we're doing logical decoding, so that an eventual FPW
+			 * doesn't remove the tuple's data.
+			 */
+			if (need_tuple_data)
+			{
+				rdata[1].next = &(rdata[2]);
+
+				rdata[2].data = NULL;
+				rdata[2].len = 0;
+				rdata[2].buffer = buffer;
+				rdata[2].buffer_std = true;
+				rdata[2].next = NULL;
+				xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+			}
+
+			/*
 			 * If we're going to reinitialize the whole page using the WAL
 			 * record, hide buffer reference from XLogInsert.
 			 */
 			if (init)
 			{
-				rdata[1].buffer = InvalidBuffer;
+				rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
 				info |= XLOG_HEAP_INIT_PAGE;
 			}
 
@@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		have_tuple_lock = false;
 	bool		iscombo;
 	bool		all_visible_cleared = false;
+	HeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
+	bool		old_key_copied = false;
 
 	Assert(ItemPointerIsValid(tid));
 
@@ -2734,6 +2798,12 @@ l1:
 	/* replace cid with a combo cid if necessary */
 	HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2786,9 +2856,13 @@ l1:
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[2];
+		XLogRecData rdata[4];
+
+		/* For logical decoding we need combocids to properly decode the catalog */
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+			log_heap_new_cid(relation, &tp);
 
-		xlrec.all_visible_cleared = all_visible_cleared;
+		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 		xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
 											  tp.t_data->t_infomask2);
 		xlrec.target.node = relation->rd_node;
@@ -2805,6 +2879,37 @@ l1:
 		rdata[1].buffer_std = true;
 		rdata[1].next = NULL;
 
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_heap_header xlhdr;
+
+			xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			rdata[1].next = &(rdata[2]);
+			rdata[2].data = (char *) &xlhdr;
+			rdata[2].len = SizeOfHeapHeader;
+			rdata[2].buffer = InvalidBuffer;
+			rdata[2].next = NULL;
+
+			rdata[2].next = &(rdata[3]);
+			rdata[3].data = (char *) old_key_tuple->t_data
+				+ offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].len = old_key_tuple->t_len
+				- offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].buffer = InvalidBuffer;
+			rdata[3].next = NULL;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+		}
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
 
 		PageSetLSN(page, recptr);
@@ -2850,6 +2955,9 @@ l1:
 
 	pgstat_count_heap_delete(relation);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		heap_freetuple(old_key_tuple);
+
 	return HeapTupleMayBeUpdated;
 }
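Because the old-key header and data are appended with buffer = InvalidBuffer, an XLOG_HEAP_DELETE record is now variable-length: xl_heap_delete, optionally followed by an xl_heap_header and the key tuple's data, with the CONTAINS_OLD_TUPLE/CONTAINS_OLD_KEY flags saying which variant was logged. A hedged sketch of how a reader could walk that layout (hypothetical function name; the real consumers are the redo and decoding routines, and this assumes rdata chunks are concatenated without padding):

    /* Sketch: find the optional old-key tuple behind an xl_heap_delete. */
    static void
    read_delete_record(xl_heap_delete *xlrec, uint32 xl_len)
    {
        if (xlrec->flags & (XLOG_HEAP_CONTAINS_OLD_TUPLE |
                            XLOG_HEAP_CONTAINS_OLD_KEY))
        {
            xl_heap_header hdr;
            char       *tupdata;
            uint32      tuplen;

            /* the header may sit unaligned in the record, hence the memcpy */
            memcpy(&hdr, (char *) xlrec + SizeOfHeapDelete, SizeOfHeapHeader);
            tupdata = (char *) xlrec + SizeOfHeapDelete + SizeOfHeapHeader;
            tuplen = xl_len - SizeOfHeapDelete - SizeOfHeapHeader;

            /*
             * tupdata/tuplen now cover the null bitmap [+ padding] [+ oid]
             * + data, i.e. everything past offsetof(HeapTupleHeaderData,
             * t_bits) of the old key tuple.
             */
            (void) tupdata;     /* silence unused warnings in this sketch */
            (void) tuplen;
        }
    }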
@@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	TransactionId xid = GetCurrentTransactionId();
 	Bitmapset  *hot_attrs;
 	Bitmapset  *key_attrs;
+	Bitmapset  *id_attrs;
 	ItemId		lp;
 	HeapTupleData oldtup;
 	HeapTuple	heaptup;
+	HeapTuple	old_key_tuple = NULL;
+	bool		old_key_copied = false;
 	Page		page;
 	BlockNumber block;
 	MultiXactStatus mxact_status;
@@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	bool		iscombo;
 	bool		satisfies_hot;
 	bool		satisfies_key;
+	bool		satisfies_id;
 	bool		use_hot_update = false;
 	bool		key_intact;
 	bool		all_visible_cleared = false;
@@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	 * Note that we get a copy here, so we need not worry about relcache flush
 	 * happening midway through.
 	 */
-	hot_attrs = RelationGetIndexAttrBitmap(relation, false);
-	key_attrs = RelationGetIndexAttrBitmap(relation, true);
+	hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);
+	key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
+	id_attrs = RelationGetIndexAttrBitmap(relation,
+										  INDEX_ATTR_BITMAP_IDENTITY_KEY);
 
 	block = ItemPointerGetBlockNumber(otid);
 	buffer = ReadBuffer(relation, block);
@@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	 * is updates that don't manipulate key columns, not those that
 	 * serendipitously arrive at the same key values.
 	 */
-	HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs,
+	HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs,
 								 &satisfies_hot, &satisfies_key,
-								 &oldtup, newtup);
+								 &satisfies_id, &oldtup, newtup);
 	if (satisfies_key)
 	{
 		*lockmode = LockTupleNoKeyExclusive;
@@ -3514,6 +3628,14 @@ l2:
 		PageSetFull(page);
 	}
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 * ExtractReplicaIdentity() will return NULL if nothing needs to be
+	 * logged.
+	 */
+	old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -3589,11 +3711,23 @@ l2:
 	/* XLOG stuff */
 	if (RelationNeedsWAL(relation))
 	{
-		XLogRecPtr	recptr = log_heap_update(relation, buffer,
-											 newbuf, &oldtup, heaptup,
-											 all_visible_cleared,
-											 all_visible_cleared_new);
+		XLogRecPtr	recptr;
+
+		/*
+		 * For logical decoding we need combocids to properly decode the
+		 * catalog.
+		 */
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+		{
+			log_heap_new_cid(relation, &oldtup);
+			log_heap_new_cid(relation, heaptup);
+		}
+
+		recptr = log_heap_update(relation, buffer,
+								 newbuf, &oldtup, heaptup,
+								 old_key_tuple,
+								 all_visible_cleared,
+								 all_visible_cleared_new);
 		if (newbuf != buffer)
 		{
 			PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -3644,6 +3778,9 @@ l2:
 		heap_freetuple(heaptup);
 	}
 
+	if (old_key_tuple != NULL && old_key_copied)
+		heap_freetuple(old_key_tuple);
+
 	bms_free(hot_attrs);
 	bms_free(key_attrs);
 
@@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
 /*
  * Check which columns are being updated.
  *
- * This simultaneously checks conditions for HOT updates and for FOR KEY
- * SHARE updates.  Since much of the time they will be checking very similar
- * sets of columns, and doing the same tests on them, it makes sense to
- * optimize and do them together.
+ * This simultaneously checks conditions for HOT updates, for FOR KEY
+ * SHARE updates, and REPLICA IDENTITY concerns.  Since much of the time they
+ * will be checking very similar sets of columns, and doing the same tests on
+ * them, it makes sense to optimize and do them together.
  *
- * We receive two bitmapsets comprising the two sets of columns we're
+ * We receive three bitmapsets comprising the three sets of columns we're
  * interested in.  Note these are destructively modified; that is OK since
  * this is invoked at most once in heap_update.
 *
 * hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not
- * modify indexed columns); key_result is set to TRUE if the update does not
- * modify columns used in the key.
+ * modify indexed columns); key_result is set to TRUE if the update does not
+ * modify columns used in the key; id_result is set to TRUE if the update does
+ * not modify columns in any index marked as the REPLICA IDENTITY.
 */
 static void
-HeapSatisfiesHOTandKeyUpdate(Relation relation,
-							 Bitmapset *hot_attrs, Bitmapset *key_attrs,
+HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs,
+							 Bitmapset *key_attrs, Bitmapset *id_attrs,
 							 bool *satisfies_hot, bool *satisfies_key,
+							 bool *satisfies_id,
 							 HeapTuple oldtup, HeapTuple newtup)
 {
 	int			next_hot_attnum;
 	int			next_key_attnum;
+	int			next_id_attnum;
 	bool		hot_result = true;
 	bool		key_result = true;
-	bool		key_done = false;
-	bool		hot_done = false;
+	bool		id_result = true;
 
-	next_hot_attnum = bms_first_member(hot_attrs);
-	if (next_hot_attnum == -1)
-		hot_done = true;
-	else
-		/* Adjust for system attributes */
-		next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+	/* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */
+	Assert(bms_is_subset(id_attrs, key_attrs));
+	Assert(bms_is_subset(key_attrs, hot_attrs));
 
-	next_key_attnum = bms_first_member(key_attrs);
-	if (next_key_attnum == -1)
-		key_done = true;
-	else
-		/* Adjust for system attributes */
-		next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+	/*
+	 * If one of these sets contains no remaining bits, bms_first_member will
+	 * return -1, and after adding FirstLowInvalidHeapAttributeNumber (which
+	 * is negative!) we'll get an attribute number that can't possibly be
+	 * real, and thus won't match any actual attribute number.
+	 */
+	next_hot_attnum = bms_first_member(hot_attrs);
+	next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+	next_key_attnum = bms_first_member(key_attrs);
+	next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+	next_id_attnum = bms_first_member(id_attrs);
+	next_id_attnum += FirstLowInvalidHeapAttributeNumber;
 
 	for (;;)
 	{
-		int			check_now;
 		bool		changed;
+		int			check_now;
 
-		/* both bitmapsets are now empty */
-		if (key_done && hot_done)
-			break;
-
-		/* XXX there's probably an easier way ... */
-		if (hot_done)
-			check_now = next_key_attnum;
-		if (key_done)
+		/*
+		 * Since the HOT attributes are a superset of the key attributes and
+		 * the key attributes are a superset of the id attributes, this logic
+		 * is guaranteed to identify the next column that needs to be
+		 * checked.
+		 */
+		if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber)
 			check_now = next_hot_attnum;
+		else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber)
+			check_now = next_key_attnum;
+		else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber)
+			check_now = next_id_attnum;
 		else
-			check_now = Min(next_hot_attnum, next_key_attnum);
+			break;
 
+		/* See whether it changed. */
 		changed = !heap_tuple_attr_equals(RelationGetDescr(relation),
 										  check_now, oldtup, newtup);
 		if (changed)
@@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation,
 				hot_result = false;
 			if (check_now == next_key_attnum)
 				key_result = false;
-		}
+			if (check_now == next_id_attnum)
+				id_result = false;
 
-		/* if both are false now, we can stop checking */
-		if (!hot_result && !key_result)
-			break;
+			/* if all are false now, we can stop checking */
+			if (!hot_result && !key_result && !id_result)
+				break;
+		}
 
-		if (check_now == next_hot_attnum)
+		/*
+		 * Advance the next attribute numbers for the sets that contain
+		 * the attribute we just checked.  As we work our way through the
+		 * columns, the next_attnum values will rise; but when each set
+		 * becomes empty, bms_first_member() will return -1 and the attribute
+		 * number will end up with a value less than
+		 * FirstLowInvalidHeapAttributeNumber.
+		 */
+		if (hot_result && check_now == next_hot_attnum)
 		{
 			next_hot_attnum = bms_first_member(hot_attrs);
-			if (next_hot_attnum == -1)
-				hot_done = true;
-			else
-				/* Adjust for system attributes */
-				next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+			next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
 		}
-		if (check_now == next_key_attnum)
+		if (key_result && check_now == next_key_attnum)
 		{
 			next_key_attnum = bms_first_member(key_attrs);
-			if (next_key_attnum == -1)
-				key_done = true;
-			else
-				/* Adjust for system attributes */
-				next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+			next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+		}
+		if (id_result && check_now == next_id_attnum)
+		{
+			next_id_attnum = bms_first_member(id_attrs);
+			next_id_attnum += FirstLowInvalidHeapAttributeNumber;
 		}
 	}
 
 	*satisfies_hot = hot_result;
 	*satisfies_key = key_result;
+	*satisfies_id = id_result;
 }
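The attribute-number arithmetic in this function leans on how the bitmapsets were built: a Bitmapset cannot hold negative members, and system attributes have negative attnos, so each column is stored as attno - FirstLowInvalidHeapAttributeNumber. A compact restatement of the convention (FirstLowInvalidHeapAttributeNumber is a negative constant, -8 in trees of this vintage, but treat that value as an assumption):

    /* Sketch: decode the next column number from an index-attr bitmapset. */
    static int
    next_attnum(Bitmapset *attrs)
    {
        /*
         * bms_first_member() pops and returns the smallest member, or -1
         * once the set is empty.  Adding the (negative) offset back either
         * yields a real attno, or a value below every valid attno -- the
         * exhaustion test the loop above relies on.
         */
        return bms_first_member(attrs) + FirstLowInvalidHeapAttributeNumber;
    }

For example, user column 1 is stored as bit 1 - (-8) = 9, and an empty set decodes to -1 + (-8) = -9, which can never match a real column.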
 
 /*
@@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
+				HeapTuple old_key_tuple,
 				bool all_visible_cleared, bool new_all_visible_cleared)
 {
 	xl_heap_update xlrec;
-	xl_heap_header xlhdr;
+	xl_heap_header_len xlhdr;
+	xl_heap_header_len xlhdr_idx;
 	uint8		info;
 	XLogRecPtr	recptr;
-	XLogRecData rdata[4];
+	XLogRecData rdata[7];
 	Page		page = BufferGetPage(newbuf);
+	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
 
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
@@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
 											  oldtup->t_data->t_infomask2);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
-	xlrec.all_visible_cleared = all_visible_cleared;
+	xlrec.flags = 0;
+	if (all_visible_cleared)
+		xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
 	xlrec.newtid = newtup->t_self;
-	xlrec.new_all_visible_cleared = new_all_visible_cleared;
+	if (new_all_visible_cleared)
+		xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapUpdate;
@@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	rdata[1].buffer_std = true;
 	rdata[1].next = &(rdata[2]);
 
-	xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
-	xlhdr.t_infomask = newtup->t_data->t_infomask;
-	xlhdr.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
+	xlhdr.header.t_infomask = newtup->t_data->t_infomask;
+	xlhdr.header.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
 
 	/*
-	 * As with insert records, we need not store the rdata[2] segment if we
-	 * decide to store the whole buffer instead.
+	 * As with insert records, we need not store the rdata[2] segment
+	 * if we decide to store the whole buffer instead, unless we're
+	 * doing logical decoding.
 	 */
 	rdata[2].data = (char *) &xlhdr;
-	rdata[2].len = SizeOfHeapHeader;
-	rdata[2].buffer = newbuf;
+	rdata[2].len = SizeOfHeapHeaderLen;
+	rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[2].buffer_std = true;
 	rdata[2].next = &(rdata[3]);
 
 	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+	rdata[3].data = (char *) newtup->t_data
+		+ offsetof(HeapTupleHeaderData, t_bits);
 	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-	rdata[3].buffer = newbuf;
+	rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[3].buffer_std = true;
 	rdata[3].next = NULL;
 
+	/*
+	 * Separate storage for the FPW buffer reference of the new page in the
+	 * wal_level >= logical case.
+	 */
+	if (need_tuple_data)
+	{
+		rdata[3].next = &(rdata[4]);
+
+		rdata[4].data = NULL;
+		rdata[4].len = 0;
+		rdata[4].buffer = newbuf;
+		rdata[4].buffer_std = true;
+		rdata[4].next = NULL;
+		xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+
+		/* We need to log a tuple identity */
+		if (old_key_tuple)
+		{
+			/* don't really need this, but it's more comfy to decode */
+			xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
+			xlhdr_idx.t_len = old_key_tuple->t_len;
+
+			rdata[4].next = &(rdata[5]);
+			rdata[5].data = (char *) &xlhdr_idx;
+			rdata[5].len = SizeOfHeapHeaderLen;
+			rdata[5].buffer = InvalidBuffer;
+			rdata[5].next = &(rdata[6]);
+
+			/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+			rdata[6].data = (char *) old_key_tuple->t_data
+				+ offsetof(HeapTupleHeaderData, t_bits);
+			rdata[6].len = old_key_tuple->t_len
+				- offsetof(HeapTupleHeaderData, t_bits);
+			rdata[6].buffer = InvalidBuffer;
+			rdata[6].next = NULL;
+
+			if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+		}
+	}
+
 	/* If new tuple is the single and first tuple on page... */
 	if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
 		PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 	{
+		XLogRecData *rcur = &rdata[2];
+
 		info |= XLOG_HEAP_INIT_PAGE;
-		rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+		while (rcur != NULL)
+		{
+			rcur->buffer = InvalidBuffer;
+			rcur = rcur->next;
+		}
 	}
 
 	recptr = XLogInsert(RM_HEAP_ID, info, rdata);
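Previously redo could reconstruct the new tuple's length as record->xl_len - hsize, since the tuple bytes were always the record's tail; once an old-key tuple may follow them, the length has to travel explicitly, which is what xl_heap_header_len adds. A sketch of the struct implied by the accesses above (the field order is an assumption; the real definition lives with the other WAL record types):

    /* xl_heap_header plus an explicit payload length, per the usage above. */
    typedef struct xl_heap_header_len
    {
        uint16          t_len;   /* length of the tuple data that follows */
        xl_heap_header  header;  /* t_infomask2, t_infomask, t_hoff */
    } xl_heap_header_len;

    #define SizeOfHeapHeaderLen \
        (offsetof(xl_heap_header_len, header) + SizeOfHeapHeader)

The matching redo change near the end of this diff reads newlen = xlhdr.t_len instead of deriving the length from xl_len.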
@@ -6340,6 +6553,184 @@ log_newpage_buffer(Buffer buffer, bool page_std)
 }
 
 /*
+ * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record
+ *
+ * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
+ * tuples.
+ */
+static XLogRecPtr
+log_heap_new_cid(Relation relation, HeapTuple tup)
+{
+	xl_heap_new_cid xlrec;
+
+	XLogRecPtr	recptr;
+	XLogRecData rdata[1];
+	HeapTupleHeader hdr = tup->t_data;
+
+	Assert(ItemPointerIsValid(&tup->t_self));
+	Assert(tup->t_tableOid != InvalidOid);
+
+	xlrec.top_xid = GetTopTransactionId();
+	xlrec.target.node = relation->rd_node;
+	xlrec.target.tid = tup->t_self;
+
+	/*
+	 * If the tuple got inserted & deleted in the same TX we definitely have a
+	 * combocid, set cmin and cmax.
+	 */
+	if (hdr->t_infomask & HEAP_COMBOCID)
+	{
+		Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
+		Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID));
+		xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
+		xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
+		xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
+	}
+	/* No combocid, so only cmin or cmax can be set by this TX */
+	else
+	{
+		/*
+		 * Tuple inserted.
+		 *
+		 * We need to check for LOCK ONLY because multixacts might be
+		 * transferred to the new tuple in case of FOR KEY SHARE updates, in
+		 * which case there will be an xmax, although the tuple just got
+		 * inserted.
+		 */
+		if (hdr->t_infomask & HEAP_XMAX_INVALID ||
+			HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
+		{
+			xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
+			xlrec.cmax = InvalidCommandId;
+		}
+		/* Tuple from a different tx updated or deleted. */
+		else
+		{
+			xlrec.cmin = InvalidCommandId;
+			xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
+		}
+		xlrec.combocid = InvalidCommandId;
+	}
+
+	rdata[0].data = (char *) &xlrec;
+	rdata[0].len = SizeOfHeapNewCid;
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].next = NULL;
+
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+
+	return recptr;
+}
+
+/*
+ * Build a heap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in an UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log an identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static HeapTuple
+ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Relation	idx_rel;
+	TupleDesc	idx_desc;
+	char		replident = relation->rd_rel->relreplident;
+	HeapTuple	key_tuple = NULL;
+	bool		copy_oid = false;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. If so, force them to be inlined.
+		 */
+		if (HeapTupleHasExternal(tp))
+		{
+			*copy = true;
+			tp = toast_flatten_tuple(tp, RelationGetDescr(relation));
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* needs to already have been fetched? */
+	if (relation->rd_indexvalid == 0)
+		RelationGetIndexList(relation);
+
+	if (!OidIsValid(relation->rd_replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(relation->rd_replidindex);
+	idx_desc = RelationGetDescr(idx_rel);
+
+	/* deform tuple, so we have fast access to columns */
+	heap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < idx_desc->natts; natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno == ObjectIdAttributeNumber)
+			copy_oid = true;
+		else if (attno < 0)
+			elog(ERROR, "system column in index");
+		else
+			nulls[attno - 1] = false;
+	}
+
+	key_tuple = heap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/* XXX: we could also do this unconditionally, the space is used anyway */
+	if (copy_oid)
+		HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp));
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, force them to be inlined.  This is somewhat unlikely
+	 * since there are limits on the size of indexed columns, so we don't
+	 * duplicate toast_flatten_tuple()'s functionality in the above loop over
+	 * the indexed columns, even if it would be more efficient.
+	 */
+	if (HeapTupleHasExternal(key_tuple))
+	{
+		HeapTuple	oldtup = key_tuple;
+
+		key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation));
+		heap_freetuple(oldtup);
+	}
+
+	return key_tuple;
+}
+
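Both callers run ExtractReplicaIdentity() shortly before START_CRIT_SECTION() for the reason the comments give: inside a critical section every elog(ERROR), including the out-of-memory error that heap_form_tuple() or toast_flatten_tuple() may raise, escalates to a PANIC. Reduced to a skeleton (names from this patch, everything else elided):

    /* Caller pattern used by heap_delete and heap_update above. */
    old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);

    START_CRIT_SECTION();
    /* ... page modifications and XLogInsert(); no allocation-heavy work ... */
    END_CRIT_SECTION();

    if (old_key_tuple != NULL && old_key_copied)
        heap_freetuple(old_key_tuple);  /* free only if we got a fresh copy */

Note that *copy stays false when REPLICA IDENTITY FULL hands back the original tuple untouched, which is why the free is conditional on old_key_copied.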
+/*
  * Handles CLEANUP_INFO
  */
 static void
@@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/* Make sure there is no forward chain link in t_ctid */
@@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	MarkBufferDirty(buffer);
@@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	MarkBufferDirty(buffer);
@@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 		HeapTupleHeaderData hdr;
 		char		data[MaxHeapTupleSize];
 	}			tbuf;
-	xl_heap_header xlhdr;
+	xl_heap_header_len xlhdr;
 	int			hsize;
 	uint32		newlen;
 	Size		freespace;
@@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/*
@@ -7164,7 +7555,7 @@ newt:;
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@@ -7222,13 +7613,13 @@ newsame:;
 	if (PageGetMaxOffsetNumber(page) + 1 < offnum)
 		elog(PANIC, "heap_update_redo: invalid max offset number");
 
-	hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
+	hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
 
-	newlen = record->xl_len - hsize;
-	Assert(newlen <= MaxHeapTupleSize);
 	memcpy((char *) &xlhdr,
 		   (char *) xlrec + SizeOfHeapUpdate,
-		   SizeOfHeapHeader);
+		   SizeOfHeapHeaderLen);
+	newlen = xlhdr.t_len;
+	Assert(newlen <= MaxHeapTupleSize);
 	htup = &tbuf.hdr;
 	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
 	/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
@@ -7236,9 +7627,9 @@ newsame:;
 		   (char *) xlrec + hsize,
 		   newlen);
 	newlen += offsetof(HeapTupleHeaderData, t_bits);
-	htup->t_infomask2 = xlhdr.t_infomask2;
-	htup->t_infomask = xlhdr.t_infomask;
-	htup->t_hoff = xlhdr.t_hoff;
+	htup->t_infomask2 = xlhdr.header.t_infomask2;
+	htup->t_infomask = xlhdr.header.t_infomask;
+	htup->t_hoff = xlhdr.header.t_hoff;
 
 	HeapTupleHeaderSetXmin(htup, record->xl_xid);
 	HeapTupleHeaderSetCmin(htup, FirstCommandId);
@@ -7250,7 +7641,7 @@ newsame:;
 	if (offnum == InvalidOffsetNumber)
 		elog(PANIC, "heap_update_redo: failed to add tuple");
 
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
@@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 		case XLOG_HEAP2_LOCK_UPDATED:
 			heap_xlog_lock_updated(lsn, record);
 			break;
+		case XLOG_HEAP2_NEW_CID:
+
+			/*
+			 * Nothing to do on a real replay, only used during logical
+			 * decoding.
+			 */
+			break;
 		default:
 			elog(PANIC, "heap2_redo: unknown op code %u", info);
 	}