diff options
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 67 |
1 files changed, 53 insertions, 14 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index cc1f2a9f154..0b303f9a235 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -290,7 +290,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap); static void SnapBuildSnapIncRefcount(Snapshot snap); -static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); @@ -852,15 +852,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, } /* - * Add a new Snapshot to all transactions we're decoding that currently are - * in-progress so they can see new catalog contents made by the transaction - * that just committed. This is necessary because those in-progress - * transactions will use the new catalog's contents from here on (at the very - * least everything they do needs to be compatible with newer catalog - * contents). + * Add a new Snapshot and invalidation messages to all transactions we're + * decoding that currently are in-progress so they can see new catalog contents + * made by the transaction that just committed. This is necessary because those + * in-progress transactions will use the new catalog's contents from here on + * (at the very least everything they do needs to be compatible with newer + * catalog contents). */ static void -SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) { dlist_iter txn_i; ReorderBufferTXN *txn; @@ -868,7 +868,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) /* * Iterate through all toplevel transactions. This can include * subtransactions which we just don't yet know to be that, but that's - * fine, they will just get an unnecessary snapshot queued. + * fine, they will just get an unnecessary snapshot and invalidations + * queued. */ dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) { @@ -881,6 +882,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * transaction which in turn implies we don't yet need a snapshot at * all. We'll add a snapshot when the first change gets queued. * + * Similarly, we don't need to add invalidations to a transaction whose + * base snapshot is not yet set. Once a base snapshot is built, it will + * include the xids of committed transactions that have modified the + * catalog, thus reflecting the new catalog contents. The existing + * catalog cache will have already been invalidated after processing + * the invalidations in the transaction that modified catalogs, + * ensuring that a fresh cache is constructed during decoding. + * * NB: This works correctly even for subtransactions because * ReorderBufferAssignChild() takes care to transfer the base snapshot * to the top-level transaction, and while iterating the changequeue @@ -890,13 +899,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) continue; /* - * We don't need to add snapshot to prepared transactions as they - * should not see the new catalog contents. + * We don't need to add snapshot or invalidations to prepared + * transactions as they should not see the new catalog contents. */ if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) continue; - elog(DEBUG2, "adding a new snapshot to %u at %X/%X", + elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X", txn->xid, LSN_FORMAT_ARGS(lsn)); /* @@ -906,6 +915,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, builder->snapshot); + + /* + * Add invalidation messages to the reorder buffer of in-progress + * transactions except the current committed transaction, for which we + * will execute invalidations at the end. + * + * It is required, otherwise, we will end up using the stale catcache + * contents built by the current transaction even after its decoding, + * which should have been invalidated due to concurrent catalog + * changing transaction. + */ + if (txn->xid != xid) + { + uint32 ninvalidations; + SharedInvalidationMessage *msgs = NULL; + + ninvalidations = ReorderBufferGetInvalidations(builder->reorder, + xid, &msgs); + + if (ninvalidations > 0) + { + Assert(msgs != NULL); + + ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, + ninvalidations, msgs); + } + } } } @@ -1184,8 +1220,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new catalog snapshot to all currently running transactions */ - SnapBuildDistributeNewCatalogSnapshot(builder, lsn); + /* + * Add a new catalog snapshot and invalidations messages to all + * currently running transactions. + */ + SnapBuildDistributeSnapshotAndInval(builder, lsn, xid); } } |