diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/catalog/system_functions.sql | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/origin.c | 23 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.dat | 2 |
4 files changed, 29 insertions, 7 deletions
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 566f308e443..2d946d6d9e9 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -650,6 +650,13 @@ LANGUAGE INTERNAL CALLED ON NULL INPUT VOLATILE PARALLEL SAFE AS 'pg_stat_reset_slru'; +CREATE OR REPLACE FUNCTION + pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0) +RETURNS void +LANGUAGE INTERNAL +STRICT VOLATILE PARALLEL UNSAFE +AS 'pg_replication_origin_session_setup'; + -- -- The default permissions for functions mean that anyone can execute them. -- A number of functions shouldn't be executable by just anyone, but rather @@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public; -REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public; +REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public; REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 87f10e50dcc..bcd5d9aad62 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by) curstate->roident, curstate->acquired_by))); } + else if (curstate->acquired_by != acquired_by) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d", + node, acquired_by))); + } + /* ok, found slot */ session_replication_state = curstate; break; @@ -1181,6 +1189,12 @@ replorigin_session_setup(RepOriginId node, int acquired_by) errhint("Increase \"max_active_replication_origins\" and try again."))); else if (session_replication_state == NULL) { + if (acquired_by) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use PID %d for inactive replication origin with ID %d", + acquired_by, node))); + /* initialize new slot */ session_replication_state = &replication_states[free_slot]; Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); @@ -1193,9 +1207,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by) if (acquired_by == 0) session_replication_state->acquired_by = MyProcPid; - else if (session_replication_state->acquired_by != acquired_by) - elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d", - node, acquired_by); + else + Assert(session_replication_state->acquired_by == acquired_by); LWLockRelease(ReplicationOriginLock); @@ -1374,12 +1387,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS) { char *name; RepOriginId origin; + int pid; replorigin_check_prerequisites(true, false); name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); origin = replorigin_by_name(name, false); - replorigin_session_setup(origin, 0); + pid = PG_GETARG_INT32(1); + replorigin_session_setup(origin, pid); replorigin_session_origin = origin; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index ef0d0f92165..62c21d3670d 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202509091 +#define CATALOG_VERSION_NO 202509191 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 03e82d28c87..01eba3b5a19 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12235,7 +12235,7 @@ { oid => '6006', descr => 'configure session to maintain replication progress tracking for the passed in origin', proname => 'pg_replication_origin_session_setup', provolatile => 'v', - proparallel => 'u', prorettype => 'void', proargtypes => 'text', + proparallel => 'u', prorettype => 'void', proargtypes => 'text int4', prosrc => 'pg_replication_origin_session_setup' }, { oid => '6007', descr => 'teardown configured replication progress tracking', |