summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/repl_gram.y6
-rw-r--r--src/backend/replication/repl_scanner.l1
-rw-r--r--src/backend/replication/walsender.c18
3 files changed, 22 insertions, 3 deletions
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index eb283a86327..e1e8ec29cc4 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_TWO_PHASE
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
@@ -283,6 +284,11 @@ create_slot_opt:
$$ = makeDefElem("reserve_wal",
(Node *)makeInteger(true), -1);
}
+ | K_TWO_PHASE
+ {
+ $$ = makeDefElem("two_phase",
+ (Node *)makeInteger(true), -1);
+ }
;
/* DROP_REPLICATION_SLOT slot */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index dcc3c3fc515..c038a636c38 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+TWO_PHASE { return K_TWO_PHASE; }
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 32245363561..92c755f346e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
- CRSSnapshotAction *snapshot_action)
+ CRSSnapshotAction *snapshot_action,
+ bool *two_phase)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
+ bool two_phase_given = false;
/* Parse options */
foreach(lc, cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
reserve_wal_given = true;
*reserve_wal = true;
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_given = true;
+ *two_phase = true;
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char xloc[MAXFNAMELEN];
char *slot_name;
bool reserve_wal = false;
+ bool two_phase = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
- parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
/* setup state for WalSndSegmentOpen */
sendTimeLineIsHistoric = false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- false);
+ two_phase);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)