summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xact.c
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2015-04-29 19:30:53 +0200
committerAndres Freund <andres@anarazel.de>2015-04-29 19:30:53 +0200
commit5aa2350426c4fdb3d04568b65aadac397012bbcb (patch)
tree954c3123dc58905bbda6407565383c65850204e7 /src/backend/access/transam/xact.c
parentc6e96a2f986e4dad72c14b14d4cc17d02b2a6aad (diff)
Introduce replication progress tracking infrastructure.
When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r--src/backend/access/transam/xact.c76
1 files changed, 59 insertions, 17 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1495bb499f5..511bcbbc519 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,8 +40,10 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/origin.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -1073,21 +1075,27 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */);
- }
- /*
- * We only need to log the commit timestamp separately if the node
- * identifier is a valid value; the commit record above already contains
- * the timestamp info otherwise, and will be used to load it.
- */
- if (markXidCommitted)
- {
- CommitTsNodeId node_id;
+ /*
+ * Record plain commit ts if not replaying remote actions, or if no
+ * timestamp is configured.
+ */
+ if (replorigin_sesssion_origin == InvalidRepOriginId ||
+ replorigin_sesssion_origin == DoNotReplicateId ||
+ replorigin_sesssion_origin_timestamp == 0)
+ replorigin_sesssion_origin_timestamp = xactStopTimestamp;
+ else
+ replorigin_session_advance(replorigin_sesssion_origin_lsn,
+ XactLastRecEnd);
- node_id = CommitTsGetDefaultNodeId();
+ /*
+ * We don't need to WAL log origin or timestamp here, the commit
+ * record contains all the necessary information and will redo the SET
+ * action during replay.
+ */
TransactionTreeSetCommitTsData(xid, nchildren, children,
- xactStopTimestamp,
- node_id, node_id != InvalidCommitTsNodeId);
+ replorigin_sesssion_origin_timestamp,
+ replorigin_sesssion_origin, false);
}
/*
@@ -1176,9 +1184,11 @@ RecordTransactionCommit(void)
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd);
+ /* remember end of last commit record */
+ XactLastCommitEnd = XactLastRecEnd;
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
@@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_invals xl_invals;
xl_xact_twophase xl_twophase;
+ xl_xact_origin xl_origin;
uint8 info;
@@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_twophase.xid = twophase_xid;
}
+ /* dump transaction origin information */
+ if (replorigin_sesssion_origin != InvalidRepOriginId)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_sesssion_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp;
+ }
+
if (xl_xinfo.xinfo != 0)
info |= XLOG_XACT_HAS_INFO;
@@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ /* we allow filtering by xacts */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_XACT_ID, info);
}
@@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time,
static void
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
- XLogRecPtr lsn)
+ XLogRecPtr lsn,
+ RepOriginId origin_id)
{
TransactionId max_xid;
int i;
+ TimestampTz commit_time;
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
@@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
LWLockRelease(XidGenLock);
}
+ Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId));
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ commit_time = parsed->origin_timestamp;
+ else
+ commit_time = parsed->xact_time;
+
/* Set the transaction commit timestamp and metadata */
TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
- parsed->xact_time, InvalidCommitTsNodeId,
+ commit_time, origin_id,
false);
if (standbyState == STANDBY_DISABLED)
@@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
StandbyReleaseLockTree(xid, 0, NULL);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+ false /* backward */, false /* WAL */);
+ }
+
/* Make sure files supposed to be dropped are dropped */
if (parsed->nrels > 0)
{
@@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record)
{
Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, XLogRecGetXid(record),
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
}
else
{
Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, parsed.twophase_xid,
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
RemoveTwoPhaseFile(parsed.twophase_xid, false);
}
}