summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_functions.sql9
-rw-r--r--src/backend/replication/logical/origin.c23
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat2
4 files changed, 29 insertions, 7 deletions
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..bcd5d9aad62 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
curstate->roident, curstate->acquired_by)));
}
+ else if (curstate->acquired_by != acquired_by)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+ node, acquired_by)));
+ }
+
/* ok, found slot */
session_replication_state = curstate;
break;
@@ -1181,6 +1189,12 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
errhint("Increase \"max_active_replication_origins\" and try again.")));
else if (session_replication_state == NULL)
{
+ if (acquired_by)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot use PID %d for inactive replication origin with ID %d",
+ acquired_by, node)));
+
/* initialize new slot */
session_replication_state = &replication_states[free_slot];
Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@@ -1193,9 +1207,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
- elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
- node, acquired_by);
+ else
+ Assert(session_replication_state->acquired_by == acquired_by);
LWLockRelease(ReplicationOriginLock);
@@ -1374,12 +1387,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index ef0d0f92165..62c21d3670d 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202509091
+#define CATALOG_VERSION_NO 202509191
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',