diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-06-30 08:45:47 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-06-30 08:45:47 +0530 |
commit | cda03cfed6b8bd5f64567bccbc9578fba035691e (patch) | |
tree | 5bfc6c435b73b5c1772d0269e5ddd4934dabd356 /src/backend/replication/walsender.c | |
parent | 17707c059cf4bf610e3b1833df5ca17cf223fe5f (diff) |
Allow enabling two-phase option via replication protocol.
Extend the replication command CREATE_REPLICATION_SLOT to support the
TWO_PHASE option. This will allow decoding commands like PREPARE
TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED for slots created with
this option. The decoding of the transaction happens at prepare command.
This patch also adds support of two-phase in pg_recvlogical via a new
option --two-phase.
This option will also be used by future patches that allow streaming of
transactions at prepare time for built-in logical replication. With this,
the out-of-core logical replication solutions can enable replication of
two-phase transactions via replication protocol.
Author: Ajin Cherian
Reviewed-By: Jeff Davis, Vignesh C, Amit Kapila
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/64b9f783c6e125f18f88fbc0c0234e34e71d8639.camel@j-davis.com
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 32245363561..92c755f346e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, - CRSSnapshotAction *snapshot_action) + CRSSnapshotAction *snapshot_action, + bool *two_phase) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; + bool two_phase_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, reserve_wal_given = true; *reserve_wal = true; } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + *two_phase = true; + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char xloc[MAXFNAMELEN]; char *slot_name; bool reserve_wal = false; + bool two_phase = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); /* setup state for WalSndSegmentOpen */ sendTimeLineIsHistoric = false; @@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - false); + two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) |