diff options
Diffstat (limited to 'src/backend/access/heap/rewriteheap.c')
-rw-r--r-- | src/backend/access/heap/rewriteheap.c | 606 |
1 files changed, 605 insertions, 1 deletions
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index c34ab9865f8..239c7dad0c9 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -102,17 +102,34 @@ */ #include "postgres.h" +#include <sys/stat.h> +#include <unistd.h> + +#include "miscadmin.h" + #include "access/heapam.h" #include "access/heapam_xlog.h" #include "access/rewriteheap.h" #include "access/transam.h" #include "access/tuptoaster.h" +#include "access/xact.h" + +#include "catalog/catalog.h" + +#include "lib/ilist.h" + +#include "replication/logical.h" +#include "replication/slot.h" + #include "storage/bufmgr.h" +#include "storage/fd.h" #include "storage/smgr.h" + #include "utils/memutils.h" #include "utils/rel.h" #include "utils/tqual.h" +#include "storage/procarray.h" /* * State associated with a rewrite operation. This is opaque to the user @@ -120,21 +137,28 @@ */ typedef struct RewriteStateData { + Relation rs_old_rel; /* source heap */ Relation rs_new_rel; /* destination heap */ Page rs_buffer; /* page currently being built */ BlockNumber rs_blockno; /* block where page will go */ bool rs_buffer_valid; /* T if any tuples in buffer */ bool rs_use_wal; /* must we WAL-log inserts? */ + bool rs_logical_rewrite; /* do we need to do logical rewriting */ TransactionId rs_oldest_xmin; /* oldest xmin used by caller to * determine tuple visibility */ TransactionId rs_freeze_xid;/* Xid that will be used as freeze cutoff * point */ + TransactionId rs_logical_xmin; /* Xid that will be used as cutoff + * point for logical rewrites */ MultiXactId rs_cutoff_multi;/* MultiXactId that will be used as cutoff * point for multixacts */ MemoryContext rs_cxt; /* for hash tables and entries and tuples in * them */ + XLogRecPtr rs_begin_lsn; /* XLogInsertLsn when starting the rewrite */ HTAB *rs_unresolved_tups; /* unmatched A tuples */ HTAB *rs_old_new_tid_map; /* unmatched B tuples */ + HTAB *rs_logical_mappings; /* logical remapping files */ + uint32 rs_num_rewrite_mappings; /* # in memory mappings */ } RewriteStateData; /* @@ -169,14 +193,45 @@ typedef struct typedef OldToNewMappingData *OldToNewMapping; +/* + * In-Memory data for a xid that might need logical remapping entries + * to be logged. + */ +typedef struct RewriteMappingFile +{ + TransactionId xid; /* xid that might need to see the row */ + int vfd; /* fd of mappings file */ + off_t off; /* how far have we written yet */ + uint32 num_mappings; /* number of in-memory mappings */ + dlist_head mappings; /* list of in-memory mappings */ + char path[MAXPGPATH]; /* path, for error messages */ +} RewriteMappingFile; + +/* + * A single In-Memeory logical rewrite mapping, hanging of + * RewriteMappingFile->mappings. + */ +typedef struct RewriteMappingDataEntry +{ + LogicalRewriteMappingData map; /* map between old and new location of + * the tuple */ + dlist_node node; +} RewriteMappingDataEntry; + /* prototypes for internal functions */ static void raw_heap_insert(RewriteState state, HeapTuple tup); +/* internal logical remapping prototypes */ +static void logical_begin_heap_rewrite(RewriteState state); +static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple); +static void logical_end_heap_rewrite(RewriteState state); + /* * Begin a rewrite of a table * + * old_heap old, locked heap relation tuples will be read from * new_heap new, locked heap relation to insert tuples to * oldest_xmin xid used by the caller to determine which tuples are dead * freeze_xid xid before which tuples will be frozen @@ -187,7 +242,7 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup); * to be used in subsequent calls to the other functions. */ RewriteState -begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin, +begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal) { @@ -210,6 +265,7 @@ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin, /* Create and fill in the state struct */ state = palloc0(sizeof(RewriteStateData)); + state->rs_old_rel = old_heap; state->rs_new_rel = new_heap; state->rs_buffer = (Page) palloc(BLCKSZ); /* new_heap needn't be empty, just locked */ @@ -244,6 +300,8 @@ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin, MemoryContextSwitchTo(old_cxt); + logical_begin_heap_rewrite(state); + return state; } @@ -301,6 +359,8 @@ end_heap_rewrite(RewriteState state) if (RelationNeedsWAL(state->rs_new_rel)) heap_sync(state->rs_new_rel); + logical_end_heap_rewrite(state); + /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); } @@ -429,6 +489,8 @@ rewrite_heap_tuple(RewriteState state, raw_heap_insert(state, new_tuple); new_tid = new_tuple->t_self; + logical_rewrite_heap_tuple(state, old_tid, new_tuple); + /* * If the tuple is the updated version of a row, and the prior version * wouldn't be DEAD yet, then we need to either resolve the prior @@ -678,3 +740,545 @@ raw_heap_insert(RewriteState state, HeapTuple tup) if (heaptup != tup) heap_freetuple(heaptup); } + +/* ------------------------------------------------------------------------ + * Logical rewrite support + * + * When doing logical decoding - which relies on using cmin/cmax of catalog + * tuples, via xl_heap_new_cid records - heap rewrites have to log enough + * information to allow the decoding backend to updates its internal mapping + * of (relfilenode,ctid) => (cmin, cmax) to be correct for the rewritten heap. + * + * For that, every time we find a tuple that's been modified in a catalog + * relation within the xmin horizon of any decoding slot, we log a mapping + * from the old to the new location. + * + * To deal with rewrites that abort the filename of a mapping file contains + * the xid of the transaction performing the rewrite, which then can be + * checked before being read in. + * + * For efficiency we don't immediately spill every single map mapping for a + * row to disk but only do so in batches when we've collected several of them + * in memory or when end_heap_rewrite() has been called. + * + * Crash-Safety: This module diverts from the usual patterns of doing WAL + * since it cannot rely on checkpoint flushing out all buffers and thus + * waiting for exlusive locks on buffers. Usually the XLogInsert() covering + * buffer modifications is performed while the buffer(s) that are being + * modified are exlusively locked guaranteeing that both the WAL record and + * the modified heap are on either side of the checkpoint. But since the + * mapping files we log aren't in shared_buffers that interlock doesn't work. + * + * Instead we simply write the mapping files out to disk, *before* the + * XLogInsert() is performed. That guarantees that either the XLogInsert() is + * inserted after the checkpoint's redo pointer or that the checkpoint (via + * LogicalRewriteHeapCheckpoint()) has flushed the (partial) mapping file to + * disk. That leaves the tail end that has not yet been flushed open to + * corruption, which is solved by including the current offset in the + * xl_heap_rewrite_mapping records and truncating the mapping file to it + * during replay. Every time a rewrite is finished all generated mapping files + * are synced to disk. + * + * Note that if we were only concerned about crash safety we wouldn't have to + * deal with WAL logging at all - an fsync() at the end of a rewrite would be + * sufficient for crash safety. Any mapping that hasn't been safely flushed to + * disk has to be by an aborted (explicitly or via a crash) transaction and is + * ignored by virtue of the xid in it's name being subject to a + * TransactionDidCommit() check. But we want to support having standbys via + * physical replication, both for availability and to to do logical decoding + * there. + * ------------------------------------------------------------------------ + */ + +/* + * Do preparations for logging logical mappings during a rewrite if + * necessary. If we detect that we don't need to log anything we'll prevent + * any further action by the various logical rewrite functions. + */ +static void +logical_begin_heap_rewrite(RewriteState state) +{ + HASHCTL hash_ctl; + TransactionId logical_xmin; + + /* + * We only need to persist these mappings if the rewritten table can be + * accessed during logical decoding, if not, we can skip doing any + * additional work. + */ + state->rs_logical_rewrite = + RelationIsAccessibleInLogicalDecoding(state->rs_old_rel); + + if (!state->rs_logical_rewrite) + return; + + Assert(ReplicationSlotCtl != NULL); + + ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin); + + /* + * If there are no logical slots in progress we don't need to do anything, + * there cannot be any remappings for relevant rows yet. The relation's + * lock protects us against races. + */ + if (logical_xmin == InvalidTransactionId) + { + state->rs_logical_rewrite = false; + return; + } + + state->rs_logical_xmin = logical_xmin; + state->rs_begin_lsn = GetXLogInsertRecPtr(); + state->rs_num_rewrite_mappings = 0; + + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(RewriteMappingFile); + hash_ctl.hcxt = state->rs_cxt; + hash_ctl.hash = tag_hash; + + state->rs_logical_mappings = + hash_create("Logical rewrite mapping", + 128, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); +} + +/* + * Flush all logical in-memory mappings to disk, but don't fsync them yet. + */ +static void +logical_heap_rewrite_flush_mappings(RewriteState state) +{ + HASH_SEQ_STATUS seq_status; + RewriteMappingFile *src; + dlist_mutable_iter iter; + + Assert(state->rs_logical_rewrite); + + /* no logical rewrite in progress, no need to iterate over mappings */ + if (state->rs_num_rewrite_mappings == 0) + return; + + elog(DEBUG1, "flushing %u logical rewrite mapping entries", + state->rs_num_rewrite_mappings); + + hash_seq_init(&seq_status, state->rs_logical_mappings); + while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL) + { + XLogRecData rdata[2]; + char *waldata; + char *waldata_start; + xl_heap_rewrite_mapping xlrec; + Oid dboid; + uint32 len; + int written; + + /* this file hasn't got any new mappings */ + if (src->num_mappings == 0) + continue; + + if (state->rs_old_rel->rd_rel->relisshared) + dboid = InvalidOid; + else + dboid = MyDatabaseId; + + xlrec.num_mappings = src->num_mappings; + xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel); + xlrec.mapped_xid = src->xid; + xlrec.mapped_db = dboid; + xlrec.offset = src->off; + xlrec.start_lsn = state->rs_begin_lsn; + + rdata[0].data = (char *) (&xlrec); + rdata[0].len = sizeof(xlrec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + /* write all mappings consecutively */ + len = src->num_mappings * sizeof(LogicalRewriteMappingData); + waldata = palloc(len); + waldata_start = waldata; + + /* + * collect data we need to write out, but don't modify ondisk data yet + */ + dlist_foreach_modify(iter, &src->mappings) + { + RewriteMappingDataEntry *pmap; + + pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur); + + memcpy(waldata, &pmap->map, sizeof(pmap->map)); + waldata += sizeof(pmap->map); + + /* remove from the list and free */ + dlist_delete(&pmap->node); + pfree(pmap); + + /* update bookkeeping */ + state->rs_num_rewrite_mappings--; + src->num_mappings--; + } + + /* + * Note that we deviate from the usual WAL coding practices here, + * check the above "Logical rewrite support" comment for reasoning. + */ + written = FileWrite(src->vfd, waldata_start, len); + if (written != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path, + written, len))); + src->off += len; + + Assert(src->num_mappings == 0); + + rdata[1].data = waldata_start; + rdata[1].len = len; + rdata[1].buffer = InvalidBuffer; + rdata[1].next = NULL; + + /* write xlog record */ + XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE, rdata); + + } + Assert(state->rs_num_rewrite_mappings == 0); +} + +/* + * Logical remapping part of end_heap_rewrite(). + */ +static void +logical_end_heap_rewrite(RewriteState state) +{ + HASH_SEQ_STATUS seq_status; + RewriteMappingFile *src; + + /* done, no logical rewrite in progress */ + if (!state->rs_logical_rewrite) + return; + + /* writeout remaining in-memory entries */ + if (state->rs_num_rewrite_mappings > 0 ) + logical_heap_rewrite_flush_mappings(state); + + /* Iterate over all mappings we have written and fsync the files. */ + hash_seq_init(&seq_status, state->rs_logical_mappings); + while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL) + { + if(FileSync(src->vfd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", src->path))); + FileClose(src->vfd); + } + /* memory context cleanup will deal with the rest */ +} + +/* + * Log a single (old->new) mapping for 'xid'. + */ +static void +logical_rewrite_log_mapping(RewriteState state, TransactionId xid, + LogicalRewriteMappingData *map) +{ + RewriteMappingFile *src; + RewriteMappingDataEntry *pmap; + Oid relid; + bool found; + + relid = RelationGetRelid(state->rs_old_rel); + + /* look for existing mappings for this 'mapped' xid */ + src = hash_search(state->rs_logical_mappings, &xid, + HASH_ENTER, &found); + + /* + * We haven't yet had the need to map anything for this xid, create + * per-xid data structures. + */ + if (!found) + { + char path[MAXPGPATH]; + Oid dboid; + + if (state->rs_old_rel->rd_rel->relisshared) + dboid = InvalidOid; + else + dboid = MyDatabaseId; + + snprintf(path, MAXPGPATH, + "pg_llog/mappings/" LOGICAL_REWRITE_FORMAT, + dboid, relid, + (uint32) (state->rs_begin_lsn >> 32), + (uint32) state->rs_begin_lsn, + xid, GetCurrentTransactionId()); + + dlist_init(&src->mappings); + src->num_mappings = 0; + src->off = 0; + memcpy(src->path, path, sizeof(path)); + src->vfd = PathNameOpenFile(path, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (src->vfd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", path))); + } + + pmap = MemoryContextAlloc(state->rs_cxt, + sizeof(RewriteMappingDataEntry)); + memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData)); + dlist_push_tail(&src->mappings, &pmap->node); + src->num_mappings++; + state->rs_num_rewrite_mappings++; + + /* + * Write out buffer every time we've too many in-memory entries across all + * mapping files. + */ + if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */) + logical_heap_rewrite_flush_mappings(state); +} + +/* + * Perform logical remapping for a tuple that's mapped from old_tid to + * new_tuple->t_self by rewrite_heap_tuple() iff necessary for the tuple. + */ +static void +logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, + HeapTuple new_tuple) +{ + ItemPointerData new_tid = new_tuple->t_self; + TransactionId cutoff = state->rs_logical_xmin; + TransactionId xmin; + TransactionId xmax; + bool do_log_xmin = false; + bool do_log_xmax = false; + LogicalRewriteMappingData map; + + /* no logical rewrite in progress, we don't need to log anything */ + if (!state->rs_logical_rewrite) + return; + + xmin = HeapTupleHeaderGetXmin(new_tuple->t_data); + /* use *GetUpdateXid to correctly deal with multixacts */ + xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data); + + /* + * Log the mapping iff the tuple has been created recently. + */ + if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff)) + do_log_xmin = true; + + if (!TransactionIdIsNormal(xmax)) + { + /* + * no xmax is set, can't have any permanent ones, so this check is + * sufficient + */ + } + else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask)) + { + /* only locked, we don't care */ + } + else if (!TransactionIdPrecedes(xmax, cutoff)) + { + /* tuple has been deleted recently, log */ + do_log_xmax = true; + } + + /* if neither needs to be logged, we're done */ + if (!do_log_xmin && !do_log_xmax) + return; + + /* fill out mapping information */ + map.old_node = state->rs_old_rel->rd_node; + map.old_tid = old_tid; + map.new_node = state->rs_new_rel->rd_node; + map.new_tid = new_tid; + + /* --- + * Now persist the mapping for the individual xids that are affected. We + * need to log for both xmin and xmax if they aren't the same transaction + * since the mapping files are per "affected" xid. + * We don't muster all that much effort detecting whether xmin and xmax + * are actually the same transaction, we just check whether the xid is the + * same disregarding subtransactions. Logging too much is relatively + * harmless and we could never do the check fully since subtransaction + * data is thrown away during restarts. + * --- + */ + if (do_log_xmin) + logical_rewrite_log_mapping(state, xmin, &map); + /* separately log mapping for xmax unless it'd be redundant */ + if (do_log_xmax && !TransactionIdEquals(xmin, xmax)) + logical_rewrite_log_mapping(state, xmax, &map); +} + +/* + * Replay XLOG_HEAP2_REWRITE records + */ +void +heap_xlog_logical_rewrite(XLogRecPtr lsn, XLogRecord *r) +{ + char path[MAXPGPATH]; + int fd; + xl_heap_rewrite_mapping *xlrec; + uint32 len; + char *data; + + xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r); + + snprintf(path, MAXPGPATH, + "pg_llog/mappings/" LOGICAL_REWRITE_FORMAT, + xlrec->mapped_db, xlrec->mapped_rel, + (uint32) (xlrec->start_lsn >> 32), + (uint32) xlrec->start_lsn, + xlrec->mapped_xid, r->xl_xid); + + fd = OpenTransientFile(path, + O_CREAT | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", path))); + /* + * Truncate all data that's not guaranteed to have been safely fsynced (by + * previous record or by the last checkpoint). + */ + if (ftruncate(fd, xlrec->offset) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\" to %u: %m", + path, (uint32) xlrec->offset))); + + /* now seek to the position we want to write our data to */ + if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to the end of file \"%s\": %m", + path))); + + data = XLogRecGetData(r) + sizeof(*xlrec); + + len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData); + + /* write out tail end of mapping file (again) */ + if (write(fd, data, len) != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", path))); + /* + * Now fsync all previously written data. We could improve things and only + * do this for the last write to a file, but the required bookkeeping + * doesn't seem worth the trouble. + */ + if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", path))); + + CloseTransientFile(fd); +} + +/* --- + * Perform a checkpoint for logical rewrite mappings + * + * This serves two tasks: + * 1) Remove all mappings not needed anymore based on the logical restart LSN + * 2) Flush all remaining mappings to disk, so that replay after a checkpoint + * only has to deal with the parts of a mapping that have been written out + * after the checkpoint started. + * --- + */ +void +CheckPointLogicalRewriteHeap(void) +{ + XLogRecPtr cutoff; + XLogRecPtr redo; + DIR *mappings_dir; + struct dirent *mapping_de; + char path[MAXPGPATH]; + + /* + * We start of with a minimum of the last redo pointer. No new decoding + * slot will start before that, so that's a safe upper bound for removal. + */ + redo = GetRedoRecPtr(); + + /* now check for the restart ptrs from existing slots */ + cutoff = ReplicationSlotsComputeLogicalRestartLSN(); + + /* don't start earlier than the restart lsn */ + if (cutoff != InvalidXLogRecPtr && redo < cutoff) + cutoff = redo; + + mappings_dir = AllocateDir("pg_llog/mappings"); + while ((mapping_de = ReadDir(mappings_dir, "pg_llog/mappings")) != NULL) + { + struct stat statbuf; + Oid dboid; + Oid relid; + XLogRecPtr lsn; + TransactionId rewrite_xid; + TransactionId create_xid; + uint32 hi, lo; + + if (strcmp(mapping_de->d_name, ".") == 0 || + strcmp(mapping_de->d_name, "..") == 0) + continue; + + snprintf(path, MAXPGPATH, "pg_llog/mappings/%s", mapping_de->d_name); + if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode)) + continue; + + /* Skip over files that cannot be ours. */ + if (strncmp(mapping_de->d_name, "map-", 4) != 0) + continue; + + if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT, + &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6) + elog(ERROR,"could not parse filename \"%s\"", mapping_de->d_name); + + lsn = ((uint64) hi) << 32 | lo; + + if (lsn < cutoff || cutoff == InvalidXLogRecPtr) + { + elog(DEBUG1, "removing logical rewrite file \"%s\"", path); + if (unlink(path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not unlink file \"%s\": %m", path))); + } + else + { + int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + + /* + * The file cannot vanish due to concurrency since this function + * is the only one removing logical mappings and it's run while + * CheckpointLock is held exclusively. + */ + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + /* + * We could try to avoid fsyncing files that either haven't + * changed or have only been created since the checkpoint's start, + * but it's currently not deemed worth the effort. + */ + else if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", path))); + CloseTransientFile(fd); + } + } + FreeDir(mappings_dir); +} |