summaryrefslogtreecommitdiff
path: root/src/backend/commands/lockcmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/lockcmds.c')
-rw-r--r--src/backend/commands/lockcmds.c172
1 files changed, 100 insertions, 72 deletions
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 875f868b969..91642ce03de 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -23,10 +23,12 @@
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/lsyscache.h"
+#include "utils/syscache.h"
-static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
- bool recurse);
-
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
+ Oid oldrelid, void *arg);
/*
* LOCK TABLE
@@ -51,98 +53,124 @@ LockTableCommand(LockStmt *lockstmt)
foreach(p, lockstmt->relations)
{
RangeVar *rv = (RangeVar *) lfirst(p);
- Relation rel;
bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
- reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
- rel = relation_open(reloid, NoLock);
+ reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false,
+ lockstmt->nowait,
+ RangeVarCallbackForLockTable,
+ (void *) &lockstmt->mode);
- LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
+ if (recurse)
+ LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
}
}
/*
- * Apply LOCK TABLE recursively over an inheritance tree
+ * Before acquiring a table lock on the named table, check whether we have
+ * permission to do so.
*/
static void
-LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
+RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
+ void *arg)
{
+ LOCKMODE lockmode = * (LOCKMODE *) arg;
+ char relkind;
AclResult aclresult;
- Oid reloid = RelationGetRelid(rel);
- /* Verify adequate privilege */
- if (lockmode == AccessShareLock)
- aclresult = pg_class_aclcheck(reloid, GetUserId(),
- ACL_SELECT);
- else
- aclresult = pg_class_aclcheck(reloid, GetUserId(),
- ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
+ if (!OidIsValid(relid))
+ return; /* doesn't exist, so no permissions check */
+ relkind = get_rel_relkind(relid);
+ if (!relkind)
+ return; /* woops, concurrently dropped; no permissions check */
/* Currently, we only allow plain tables to be locked */
- if (rel->rd_rel->relkind != RELKIND_RELATION)
+ if (relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
+ rv->relname)));
- /*
- * If requested, recurse to children. We use find_inheritance_children
- * not find_all_inheritors to avoid taking locks far in advance of
- * checking privileges. This means we'll visit multiply-inheriting
- * children more than once, but that's no problem.
- */
- if (recurse)
+ /* Check permissions. */
+ aclresult = LockTableAclCheck(relid, lockmode);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname);
+}
+
+/*
+ * Apply LOCK TABLE recursively over an inheritance tree
+ *
+ * We use find_inheritance_children not find_all_inheritors to avoid taking
+ * locks far in advance of checking privileges. This means we'll visit
+ * multiply-inheriting children more than once, but that's no problem.
+ */
+static void
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+{
+ List *children;
+ ListCell *lc;
+
+ children = find_inheritance_children(reloid, NoLock);
+
+ foreach(lc, children)
{
- List *children = find_inheritance_children(reloid, NoLock);
- ListCell *lc;
- Relation childrel;
+ Oid childreloid = lfirst_oid(lc);
+ AclResult aclresult;
+
+ /* Check permissions before acquiring the lock. */
+ aclresult = LockTableAclCheck(childreloid, lockmode);
+ if (aclresult != ACLCHECK_OK)
+ {
+ char *relname = get_rel_name(childreloid);
+ if (!relname)
+ continue; /* child concurrently dropped, just skip it */
+ aclcheck_error(aclresult, ACL_KIND_CLASS, relname);
+ }
- foreach(lc, children)
+ /* We have enough rights to lock the relation; do so. */
+ if (!nowait)
+ LockRelationOid(childreloid, lockmode);
+ else if (!ConditionalLockRelationOid(childreloid, lockmode))
{
- Oid childreloid = lfirst_oid(lc);
-
- /*
- * Acquire the lock, to protect against concurrent drops. Note
- * that a lock against an already-dropped relation's OID won't
- * fail.
- */
- if (!nowait)
- LockRelationOid(childreloid, lockmode);
- else if (!ConditionalLockRelationOid(childreloid, lockmode))
- {
- /* try to throw error by name; relation could be deleted... */
- char *relname = get_rel_name(childreloid);
-
- if (relname)
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation \"%s\"",
- relname)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation with OID %u",
- reloid)));
- }
-
- /*
- * Now that we have the lock, check to see if the relation really
- * exists or not.
- */
- childrel = try_relation_open(childreloid, NoLock);
- if (!childrel)
- {
- /* Release useless lock */
- UnlockRelationOid(childreloid, lockmode);
- }
-
- LockTableRecurse(childrel, lockmode, nowait, recurse);
+ /* try to throw error by name; relation could be deleted... */
+ char *relname = get_rel_name(childreloid);
+ if (!relname)
+ continue; /* child concurrently dropped, just skip it */
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation \"%s\"",
+ relname)));
}
+
+ /*
+ * Even if we got the lock, child might have been concurrently dropped.
+ * If so, we can skip it.
+ */
+ if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid)))
+ {
+ /* Release useless lock */
+ UnlockRelationOid(childreloid, lockmode);
+ continue;
+ }
+
+ LockTableRecurse(childreloid, lockmode, nowait);
}
+}
- relation_close(rel, NoLock); /* close rel, keep lock */
+/*
+ * Check whether the current user is permitted to lock this relation.
+ */
+static AclResult
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+{
+ AclResult aclresult;
+
+ /* Verify adequate privilege */
+ if (lockmode == AccessShareLock)
+ aclresult = pg_class_aclcheck(reloid, GetUserId(),
+ ACL_SELECT);
+ else
+ aclresult = pg_class_aclcheck(reloid, GetUserId(),
+ ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
+ return aclresult;
}