diff options
Diffstat (limited to 'src/backend/replication/logical/slotsync.c')
| -rw-r--r-- | src/backend/replication/logical/slotsync.c | 163 | 
1 files changed, 119 insertions, 44 deletions
| diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 97440cb6bf0..bda0de52db9 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void);   * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is   * modified, and decoding from the corresponding LSN's can reach a   * consistent snapshot. + * + * *remote_slot_precedes will be true if the remote slot's LSN or xmin + * precedes locally reserved position.   */  static bool  update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, -						 bool *found_consistent_snapshot) +						 bool *found_consistent_snapshot, +						 bool *remote_slot_precedes)  {  	ReplicationSlot *slot = MyReplicationSlot; -	bool		slot_updated = false; +	bool		updated_xmin_or_lsn = false; +	bool		updated_config = false;  	Assert(slot->data.invalidated == RS_INVAL_NONE);  	if (found_consistent_snapshot)  		*found_consistent_snapshot = false; -	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush || -		remote_slot->restart_lsn != slot->data.restart_lsn || -		remote_slot->catalog_xmin != slot->data.catalog_xmin) +	if (remote_slot_precedes) +		*remote_slot_precedes = false; + +	/* +	 * Don't overwrite if we already have a newer catalog_xmin and +	 * restart_lsn. +	 */ +	if (remote_slot->restart_lsn < slot->data.restart_lsn || +		TransactionIdPrecedes(remote_slot->catalog_xmin, +							  slot->data.catalog_xmin)) +	{ +		/* +		 * This can happen in following situations: +		 * +		 * If the slot is temporary, it means either the initial WAL location +		 * reserved for the local slot is ahead of the remote slot's +		 * restart_lsn or the initial xmin_horizon computed for the local slot +		 * is ahead of the remote slot. +		 * +		 * If the slot is persistent, restart_lsn of the synced slot could +		 * still be ahead of the remote slot. Since we use slot advance +		 * functionality to keep snapbuild/slot updated, it is possible that +		 * the restart_lsn is advanced to a later position than it has on the +		 * primary. This can happen when slot advancing machinery finds +		 * running xacts record after reaching the consistent state at a later +		 * point than the primary where it serializes the snapshot and updates +		 * the restart_lsn. +		 * +		 * We LOG the message if the slot is temporary as it can help the user +		 * to understand why the slot is not sync-ready. In the case of a +		 * persistent slot, it would be a more common case and won't directly +		 * impact the users, so we used DEBUG1 level to log the message. +		 */ +		ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, +				errmsg("could not sync slot \"%s\" as remote slot precedes local slot", +					   remote_slot->name), +				errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.", +						  LSN_FORMAT_ARGS(remote_slot->restart_lsn), +						  remote_slot->catalog_xmin, +						  LSN_FORMAT_ARGS(slot->data.restart_lsn), +						  slot->data.catalog_xmin)); + +		if (remote_slot_precedes) +			*remote_slot_precedes = true; +	} + +	/* +	 * Attempt to sync LSNs and xmins only if remote slot is ahead of local +	 * slot. +	 */ +	else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush || +			 remote_slot->restart_lsn > slot->data.restart_lsn || +			 TransactionIdFollows(remote_slot->catalog_xmin, +								  slot->data.catalog_xmin))  	{  		/*  		 * We can't directly copy the remote slot's LSN or xmin unless there @@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,  			slot->data.restart_lsn = remote_slot->restart_lsn;  			slot->data.confirmed_flush = remote_slot->confirmed_lsn;  			slot->data.catalog_xmin = remote_slot->catalog_xmin; -			slot->effective_catalog_xmin = remote_slot->catalog_xmin;  			SpinLockRelease(&slot->mutex);  			if (found_consistent_snapshot) @@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,  		{  			LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,  												found_consistent_snapshot); -		} -		ReplicationSlotsComputeRequiredXmin(false); -		ReplicationSlotsComputeRequiredLSN(); +			/* Sanity check */ +			if (slot->data.confirmed_flush != remote_slot->confirmed_lsn) +				ereport(ERROR, +						errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot", +										remote_slot->name), +						errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.", +										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), +										   LSN_FORMAT_ARGS(slot->data.confirmed_flush))); +		} -		slot_updated = true; +		updated_xmin_or_lsn = true;  	}  	if (remote_dbid != slot->data.database || @@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,  		slot->data.failover = remote_slot->failover;  		SpinLockRelease(&slot->mutex); -		slot_updated = true; +		updated_config = true;  	} -	return slot_updated; +	/* +	 * We have to write the changed xmin to disk *before* we change the +	 * in-memory value, otherwise after a crash we wouldn't know that some +	 * catalog tuples might have been removed already. +	 */ +	if (updated_config || updated_xmin_or_lsn) +	{ +		ReplicationSlotMarkDirty(); +		ReplicationSlotSave(); +	} + +	/* +	 * Now the new xmin is safely on disk, we can let the global value +	 * advance. We do not take ProcArrayLock or similar since we only advance +	 * xmin here and there's not much harm done by a concurrent computation +	 * missing that. +	 */ +	if (updated_xmin_or_lsn) +	{ +		SpinLockAcquire(&slot->mutex); +		slot->effective_catalog_xmin = remote_slot->catalog_xmin; +		SpinLockRelease(&slot->mutex); + +		ReplicationSlotsComputeRequiredXmin(false); +		ReplicationSlotsComputeRequiredLSN(); +	} + +	return updated_config || updated_xmin_or_lsn;  }  /* @@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)  {  	ReplicationSlot *slot = MyReplicationSlot;  	bool		found_consistent_snapshot = false; +	bool		remote_slot_precedes = false; + +	(void) update_local_synced_slot(remote_slot, remote_dbid, +									&found_consistent_snapshot, +									&remote_slot_precedes);  	/*  	 * Check if the primary server has caught up. Refer to the comment atop  	 * the file for details on this check.  	 */ -	if (remote_slot->restart_lsn < slot->data.restart_lsn || -		TransactionIdPrecedes(remote_slot->catalog_xmin, -							  slot->data.catalog_xmin)) +	if (remote_slot_precedes)  	{  		/*  		 * The remote slot didn't catch up to locally reserved position. @@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)  		 * current location when recreating the slot in the next cycle. It may  		 * take more time to create such a slot. Therefore, we keep this slot  		 * and attempt the synchronization in the next cycle. -		 * -		 * XXX should this be changed to elog(DEBUG1) perhaps?  		 */ -		ereport(LOG, -				errmsg("could not sync slot \"%s\" as remote slot precedes local slot", -					   remote_slot->name), -				errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.", -						  LSN_FORMAT_ARGS(remote_slot->restart_lsn), -						  remote_slot->catalog_xmin, -						  LSN_FORMAT_ARGS(slot->data.restart_lsn), -						  slot->data.catalog_xmin));  		return false;  	} -	(void) update_local_synced_slot(remote_slot, remote_dbid, -									&found_consistent_snapshot); -  	/*  	 * Don't persist the slot if it cannot reach the consistent point from the  	 * restart_lsn. See comments atop this file. @@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)  			/*  			 * Sanity check: As long as the invalidations are handled  			 * appropriately as above, this should never happen. +			 * +			 * We don't need to check restart_lsn here. See the comments in +			 * update_local_synced_slot() for details.  			 */ -			if (remote_slot->restart_lsn < slot->data.restart_lsn) -				elog(ERROR, -					 "cannot synchronize local slot \"%s\" LSN(%X/%X)" -					 " to remote slot's LSN(%X/%X) as synchronization" -					 " would move it backwards", remote_slot->name, -					 LSN_FORMAT_ARGS(slot->data.restart_lsn), -					 LSN_FORMAT_ARGS(remote_slot->restart_lsn)); - -			/* Make sure the slot changes persist across server restart */ -			if (update_local_synced_slot(remote_slot, remote_dbid, NULL)) -			{ -				ReplicationSlotMarkDirty(); -				ReplicationSlotSave(); - -				slot_updated = true; -			} +			if (remote_slot->confirmed_lsn < slot->data.confirmed_flush) +				ereport(ERROR, +						errmsg_internal("cannot synchronize local slot \"%s\"", +										remote_slot->name), +						errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).", +										   LSN_FORMAT_ARGS(slot->data.confirmed_flush), +										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); + +			slot_updated = update_local_synced_slot(remote_slot, remote_dbid, +													NULL, NULL);  		}  	}  	/* Otherwise create the slot first. */ | 
