summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/origin.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/origin.c')
-rw-r--r--src/backend/replication/logical/origin.c23
1 files changed, 19 insertions, 4 deletions
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..bcd5d9aad62 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
curstate->roident, curstate->acquired_by)));
}
+ else if (curstate->acquired_by != acquired_by)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+ node, acquired_by)));
+ }
+
/* ok, found slot */
session_replication_state = curstate;
break;
@@ -1181,6 +1189,12 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
errhint("Increase \"max_active_replication_origins\" and try again.")));
else if (session_replication_state == NULL)
{
+ if (acquired_by)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot use PID %d for inactive replication origin with ID %d",
+ acquired_by, node)));
+
/* initialize new slot */
session_replication_state = &replication_states[free_slot];
Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@@ -1193,9 +1207,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
- elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
- node, acquired_by);
+ else
+ Assert(session_replication_state->acquired_by == acquired_by);
LWLockRelease(ReplicationOriginLock);
@@ -1374,12 +1387,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;