diff options
| author | Amit Kapila <akapila@postgresql.org> | 2022-03-08 08:08:32 +0530 | 
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2022-03-08 08:08:32 +0530 | 
| commit | d3e8368c4b6e5110d8b3d12859850aeaae08dffb (patch) | |
| tree | 2c5ee9eb72722f2fe5950ec8a4b671dbc454add3 /src/backend/replication | |
| parent | 4228cabb72bb57e1df4c9d92613f1fcd4baadd5a (diff) | |
Add the additional information to the logical replication worker errcontext.
This commits adds both the finish LSN (commit_lsn in case transaction got
committed, prepare_lsn in case of a prepared transaction, etc.) and
replication origin name to the existing error context message.
This will help users in specifying the origin name and transaction finish
LSN to pg_replication_origin_advance() SQL function to skip a particular
transaction.
Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Euler Taveira, and Amit Kapila
Discussion: https://postgr.es/m/CAD21AoBarBf2oTF71ig2g_o=3Z_Dt6_sOpMQma1kFgbnA5OZ_w@mail.gmail.com
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/logical/worker.c | 75 | 
1 files changed, 56 insertions, 19 deletions
| diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 92aa794706d..8653e1d8402 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg  	/* Remote node information */  	int			remote_attnum;	/* -1 if invalid */  	TransactionId remote_xid; +	XLogRecPtr	finish_lsn; +	char	   *origin_name;  } ApplyErrorCallbackArg;  static ApplyErrorCallbackArg apply_error_callback_arg = @@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =  	.rel = NULL,  	.remote_attnum = -1,  	.remote_xid = InvalidTransactionId, +	.finish_lsn = InvalidXLogRecPtr, +	.origin_name = NULL,  };  static MemoryContext ApplyMessageContext = NULL; @@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);  /* Functions for apply error callback */  static void apply_error_callback(void *arg); -static inline void set_apply_error_context_xact(TransactionId xid); +static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);  static inline void reset_apply_error_context_info(void);  /* @@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)  	LogicalRepBeginData begin_data;  	logicalrep_read_begin(s, &begin_data); -	set_apply_error_context_xact(begin_data.xid); +	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);  	remote_final_lsn = begin_data.final_lsn; @@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)  				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));  	logicalrep_read_begin_prepare(s, &begin_data); -	set_apply_error_context_xact(begin_data.xid); +	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);  	remote_final_lsn = begin_data.prepare_lsn; @@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)  	char		gid[GIDSIZE];  	logicalrep_read_commit_prepared(s, &prepare_data); -	set_apply_error_context_xact(prepare_data.xid); +	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);  	/* Compute GID for two_phase transactions. */  	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)  	char		gid[GIDSIZE];  	logicalrep_read_rollback_prepared(s, &rollback_data); -	set_apply_error_context_xact(rollback_data.xid); +	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);  	/* Compute GID for two_phase transactions. */  	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)  				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));  	logicalrep_read_stream_prepare(s, &prepare_data); -	set_apply_error_context_xact(prepare_data.xid); +	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);  	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)  				(errcode(ERRCODE_PROTOCOL_VIOLATION),  				 errmsg_internal("invalid transaction ID in streamed replication transaction"))); -	set_apply_error_context_xact(stream_xid); +	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);  	/*  	 * Initialize the worker's stream_fileset if we haven't yet. This will be @@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)  	 */  	if (xid == subxid)  	{ -		set_apply_error_context_xact(xid); +		set_apply_error_context_xact(xid, InvalidXLogRecPtr);  		stream_cleanup_files(MyLogicalRepWorker->subid, xid);  	}  	else @@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)  		bool		found = false;  		char		path[MAXPGPATH]; -		set_apply_error_context_xact(subxid); +		set_apply_error_context_xact(subxid, InvalidXLogRecPtr);  		subidx = -1;  		begin_replication_step(); @@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)  				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));  	xid = logicalrep_read_stream_commit(s, &commit_data); -	set_apply_error_context_xact(xid); +	set_apply_error_context_xact(xid, commit_data.commit_lsn);  	elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)  		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);  		pfree(syncslotname); + +		/* +		 * Allocate the origin name in long-lived context for error context +		 * message. +		 */ +		ReplicationOriginNameForTablesync(MySubscription->oid, +										  MyLogicalRepWorker->relid, +										  originname, +										  sizeof(originname)); +		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, +																   originname);  	}  	else  	{ @@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)  		 * does some initializations on the upstream so let's still call it.  		 */  		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + +		/* +		 * Allocate the origin name in long-lived context for error context +		 * message. +		 */ +		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, +																   originname);  	}  	/* @@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)  	if (apply_error_callback_arg.command == 0)  		return; +	Assert(errarg->origin_name); +  	if (errarg->rel == NULL)  	{  		if (!TransactionIdIsValid(errarg->remote_xid)) -			errcontext("processing remote data during \"%s\"", +			errcontext("processing remote data for replication origin \"%s\" during \"%s\"", +					   errarg->origin_name,  					   logicalrep_message_type(errarg->command)); -		else -			errcontext("processing remote data during \"%s\" in transaction %u", +		else if (XLogRecPtrIsInvalid(errarg->finish_lsn)) +			errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u", +					   errarg->origin_name,  					   logicalrep_message_type(errarg->command),  					   errarg->remote_xid); +		else +			errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X", +					   errarg->origin_name, +					   logicalrep_message_type(errarg->command), +					   errarg->remote_xid, +					   LSN_FORMAT_ARGS(errarg->finish_lsn));  	}  	else if (errarg->remote_attnum < 0) -		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u", +		errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X", +				   errarg->origin_name,  				   logicalrep_message_type(errarg->command),  				   errarg->rel->remoterel.nspname,  				   errarg->rel->remoterel.relname, -				   errarg->remote_xid); +				   errarg->remote_xid, +				   LSN_FORMAT_ARGS(errarg->finish_lsn));  	else -		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", +		errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X", +				   errarg->origin_name,  				   logicalrep_message_type(errarg->command),  				   errarg->rel->remoterel.nspname,  				   errarg->rel->remoterel.relname,  				   errarg->rel->remoterel.attnames[errarg->remote_attnum], -				   errarg->remote_xid); +				   errarg->remote_xid, +				   LSN_FORMAT_ARGS(errarg->finish_lsn));  }  /* Set transaction information of apply error callback */  static inline void -set_apply_error_context_xact(TransactionId xid) +set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)  {  	apply_error_callback_arg.remote_xid = xid; +	apply_error_callback_arg.finish_lsn = lsn;  }  /* Reset all information of apply error callback */ @@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)  	apply_error_callback_arg.command = 0;  	apply_error_callback_arg.rel = NULL;  	apply_error_callback_arg.remote_attnum = -1; -	set_apply_error_context_xact(InvalidTransactionId); +	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);  } | 
