diff options
Diffstat (limited to 'src/backend/commands/lockcmds.c')
-rw-r--r-- | src/backend/commands/lockcmds.c | 172 |
1 files changed, 100 insertions, 72 deletions
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index 875f868b969..91642ce03de 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -23,10 +23,12 @@ #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" -static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, - bool recurse); - +static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait); +static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode); +static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, + Oid oldrelid, void *arg); /* * LOCK TABLE @@ -51,98 +53,124 @@ LockTableCommand(LockStmt *lockstmt) foreach(p, lockstmt->relations) { RangeVar *rv = (RangeVar *) lfirst(p); - Relation rel; bool recurse = interpretInhOption(rv->inhOpt); Oid reloid; - reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait); - rel = relation_open(reloid, NoLock); + reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false, + lockstmt->nowait, + RangeVarCallbackForLockTable, + (void *) &lockstmt->mode); - LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse); + if (recurse) + LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait); } } /* - * Apply LOCK TABLE recursively over an inheritance tree + * Before acquiring a table lock on the named table, check whether we have + * permission to do so. */ static void -LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse) +RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, + void *arg) { + LOCKMODE lockmode = * (LOCKMODE *) arg; + char relkind; AclResult aclresult; - Oid reloid = RelationGetRelid(rel); - /* Verify adequate privilege */ - if (lockmode == AccessShareLock) - aclresult = pg_class_aclcheck(reloid, GetUserId(), - ACL_SELECT); - else - aclresult = pg_class_aclcheck(reloid, GetUserId(), - ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_CLASS, - RelationGetRelationName(rel)); + if (!OidIsValid(relid)) + return; /* doesn't exist, so no permissions check */ + relkind = get_rel_relkind(relid); + if (!relkind) + return; /* woops, concurrently dropped; no permissions check */ /* Currently, we only allow plain tables to be locked */ - if (rel->rd_rel->relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + rv->relname))); - /* - * If requested, recurse to children. We use find_inheritance_children - * not find_all_inheritors to avoid taking locks far in advance of - * checking privileges. This means we'll visit multiply-inheriting - * children more than once, but that's no problem. - */ - if (recurse) + /* Check permissions. */ + aclresult = LockTableAclCheck(relid, lockmode); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname); +} + +/* + * Apply LOCK TABLE recursively over an inheritance tree + * + * We use find_inheritance_children not find_all_inheritors to avoid taking + * locks far in advance of checking privileges. This means we'll visit + * multiply-inheriting children more than once, but that's no problem. + */ +static void +LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait) +{ + List *children; + ListCell *lc; + + children = find_inheritance_children(reloid, NoLock); + + foreach(lc, children) { - List *children = find_inheritance_children(reloid, NoLock); - ListCell *lc; - Relation childrel; + Oid childreloid = lfirst_oid(lc); + AclResult aclresult; + + /* Check permissions before acquiring the lock. */ + aclresult = LockTableAclCheck(childreloid, lockmode); + if (aclresult != ACLCHECK_OK) + { + char *relname = get_rel_name(childreloid); + if (!relname) + continue; /* child concurrently dropped, just skip it */ + aclcheck_error(aclresult, ACL_KIND_CLASS, relname); + } - foreach(lc, children) + /* We have enough rights to lock the relation; do so. */ + if (!nowait) + LockRelationOid(childreloid, lockmode); + else if (!ConditionalLockRelationOid(childreloid, lockmode)) { - Oid childreloid = lfirst_oid(lc); - - /* - * Acquire the lock, to protect against concurrent drops. Note - * that a lock against an already-dropped relation's OID won't - * fail. - */ - if (!nowait) - LockRelationOid(childreloid, lockmode); - else if (!ConditionalLockRelationOid(childreloid, lockmode)) - { - /* try to throw error by name; relation could be deleted... */ - char *relname = get_rel_name(childreloid); - - if (relname) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation \"%s\"", - relname))); - else - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation with OID %u", - reloid))); - } - - /* - * Now that we have the lock, check to see if the relation really - * exists or not. - */ - childrel = try_relation_open(childreloid, NoLock); - if (!childrel) - { - /* Release useless lock */ - UnlockRelationOid(childreloid, lockmode); - } - - LockTableRecurse(childrel, lockmode, nowait, recurse); + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(childreloid); + if (!relname) + continue; /* child concurrently dropped, just skip it */ + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); } + + /* + * Even if we got the lock, child might have been concurrently dropped. + * If so, we can skip it. + */ + if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid))) + { + /* Release useless lock */ + UnlockRelationOid(childreloid, lockmode); + continue; + } + + LockTableRecurse(childreloid, lockmode, nowait); } +} - relation_close(rel, NoLock); /* close rel, keep lock */ +/* + * Check whether the current user is permitted to lock this relation. + */ +static AclResult +LockTableAclCheck(Oid reloid, LOCKMODE lockmode) +{ + AclResult aclresult; + + /* Verify adequate privilege */ + if (lockmode == AccessShareLock) + aclresult = pg_class_aclcheck(reloid, GetUserId(), + ACL_SELECT); + else + aclresult = pg_class_aclcheck(reloid, GetUserId(), + ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE); + return aclresult; } |