summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/snapbuild.c50
1 files changed, 31 insertions, 19 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 1ff2c12240d..bf72ad45ec7 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -969,28 +969,40 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder)
pfree(workspace);
/*
- * Either all the xacts got purged or none. It is only possible to
- * partially remove the xids from this array if one or more of the xids
- * are still running but not all. That can happen if we start decoding
- * from a point (LSN where the snapshot state became consistent) where all
- * the xacts in this were running and then at least one of those got
- * committed and a few are still running. We will never start from such a
- * point because we won't move the slot's restart_lsn past the point where
- * the oldest running transaction's restart_decoding_lsn is.
+ * Purge xids in ->catchange as well. The purged array must also be sorted
+ * in xidComparator order.
*/
- if (builder->catchange.xcnt == 0 ||
- TransactionIdFollowsOrEquals(builder->catchange.xip[0],
- builder->xmin))
- return;
+ if (builder->catchange.xcnt > 0)
+ {
+ /*
+ * Since catchange.xip is sorted, we find the lower bound of xids that
+ * are still interesting.
+ */
+ for (off = 0; off < builder->catchange.xcnt; off++)
+ {
+ if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
+ builder->xmin))
+ break;
+ }
- Assert(TransactionIdFollows(builder->xmin,
- builder->catchange.xip[builder->catchange.xcnt - 1]));
- pfree(builder->catchange.xip);
- builder->catchange.xip = NULL;
- builder->catchange.xcnt = 0;
+ surviving_xids = builder->catchange.xcnt - off;
- elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
- builder->xmin);
+ if (surviving_xids > 0)
+ {
+ memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
+ surviving_xids * sizeof(TransactionId));
+ }
+ else
+ {
+ pfree(builder->catchange.xip);
+ builder->catchange.xip = NULL;
+ }
+
+ elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
+ (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
+ builder->xmin, builder->xmax);
+ builder->catchange.xcnt = surviving_xids;
+ }
}
/*