diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/repl_gram.y | 6 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 1 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 18 |
3 files changed, 22 insertions, 3 deletions
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index eb283a86327..e1e8ec29cc4 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void); %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_TWO_PHASE %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT @@ -283,6 +284,11 @@ create_slot_opt: $$ = makeDefElem("reserve_wal", (Node *)makeInteger(true), -1); } + | K_TWO_PHASE + { + $$ = makeDefElem("two_phase", + (Node *)makeInteger(true), -1); + } ; /* DROP_REPLICATION_SLOT slot */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index dcc3c3fc515..c038a636c38 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +TWO_PHASE { return K_TWO_PHASE; } EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 32245363561..92c755f346e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, - CRSSnapshotAction *snapshot_action) + CRSSnapshotAction *snapshot_action, + bool *two_phase) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; + bool two_phase_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, reserve_wal_given = true; *reserve_wal = true; } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + *two_phase = true; + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char xloc[MAXFNAMELEN]; char *slot_name; bool reserve_wal = false; + bool two_phase = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); /* setup state for WalSndSegmentOpen */ sendTimeLineIsHistoric = false; @@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - false); + two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) |