diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/alter.c | 2 | ||||
-rw-r--r-- | src/backend/commands/indexcmds.c | 136 | ||||
-rw-r--r-- | src/backend/commands/lockcmds.c | 172 | ||||
-rw-r--r-- | src/backend/commands/sequence.c | 4 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 141 | ||||
-rw-r--r-- | src/backend/commands/trigger.c | 3 | ||||
-rw-r--r-- | src/backend/commands/vacuum.c | 2 |
7 files changed, 287 insertions, 173 deletions
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c3212246643..eb0584e9910 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -111,7 +111,7 @@ ExecRenameStmt(RenameStmt *stmt) * in RenameRelation, renameatt, or renametrig. */ relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, - false, false); + false); switch (stmt->renameType) { diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 69aa5bf6466..386b95bca20 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, static Oid GetIndexOpClass(List *opclass, Oid attrType, char *accessMethodName, Oid accessMethodId); static char *ChooseIndexNameAddition(List *colnames); - +static void RangeVarCallbackForReindexTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg); +static void RangeVarCallbackForReindexIndex(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg); /* * CheckIndexCompatible @@ -129,7 +132,7 @@ CheckIndexCompatible(Oid oldId, Datum d; /* Caller should already have the relation locked in some way. */ - relationId = RangeVarGetRelid(heapRelation, NoLock, false, false); + relationId = RangeVarGetRelid(heapRelation, NoLock, false); /* * We can pretend isconstraint = false unconditionally. It only serves to * decide the text of an error message that should never happen for us. @@ -1725,33 +1728,74 @@ void ReindexIndex(RangeVar *indexRelation) { Oid indOid; - HeapTuple tuple; + Oid heapOid = InvalidOid; + + /* lock level used here should match index lock reindex_index() */ + indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock, + false, false, + RangeVarCallbackForReindexIndex, + (void *) &heapOid); + + reindex_index(indOid, false); +} + +/* + * Check permissions on table before acquiring relation lock; also lock + * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid + * deadlocks. + */ +static void +RangeVarCallbackForReindexIndex(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + char relkind; + Oid *heapOid = (Oid *) arg; /* - * XXX: This is not safe in the presence of concurrent DDL. We should - * take AccessExclusiveLock here, but that would violate the rule that - * indexes should only be locked after their parent tables. For now, - * we live with it. + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. */ - indOid = RangeVarGetRelid(indexRelation, NoLock, false, false); - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid)); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", indOid); + if (relId != oldRelId && OidIsValid(oldRelId)) + { + /* lock level here should match reindex_index() heap lock */ + UnlockRelationOid(*heapOid, ShareLock); + *heapOid = InvalidOid; + } + + /* If the relation does not exist, there's nothing more to do. */ + if (!OidIsValid(relId)) + return; - if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX) + /* + * If the relation does exist, check whether it's an index. But note + * that the relation might have been dropped between the time we did the + * name lookup and now. In that case, there's nothing to do. + */ + relkind = get_rel_relkind(relId); + if (!relkind) + return; + if (relkind != RELKIND_INDEX) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not an index", - indexRelation->relname))); + errmsg("\"%s\" is not an index", relation->relname))); /* Check permissions */ - if (!pg_class_ownercheck(indOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - indexRelation->relname); - - ReleaseSysCache(tuple); + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); - reindex_index(indOid, false); + /* Lock heap before index to avoid deadlock. */ + if (relId != oldRelId) + { + /* + * Lock level here should match reindex_index() heap lock. + * If the OID isn't valid, it means the index as concurrently dropped, + * which is not a problem for us; just return normally. + */ + *heapOid = IndexGetRelation(relId, true); + if (OidIsValid(*heapOid)) + LockRelationOid(*heapOid, ShareLock); + } } /* @@ -1762,27 +1806,10 @@ void ReindexTable(RangeVar *relation) { Oid heapOid; - HeapTuple tuple; /* The lock level used here should match reindex_relation(). */ - heapOid = RangeVarGetRelid(relation, ShareLock, false, false); - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid)); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for relation %u", heapOid); - - if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION && - ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - relation->relname))); - - /* Check permissions */ - if (!pg_class_ownercheck(heapOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - relation->relname); - - ReleaseSysCache(tuple); + heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false, + RangeVarCallbackForReindexTable, NULL); if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST)) ereport(NOTICE, @@ -1791,6 +1818,37 @@ ReindexTable(RangeVar *relation) } /* + * Check permissions on table before acquiring relation lock. + */ +static void +RangeVarCallbackForReindexTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + char relkind; + + /* Nothing to do if the relation was not found. */ + if (!OidIsValid(relId)) + return; + + /* + * If the relation does exist, check whether it's an index. But note + * that the relation might have been dropped between the time we did the + * name lookup and now. In that case, there's nothing to do. + */ + relkind = get_rel_relkind(relId); + if (!relkind) + return; + if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table", relation->relname))); + + /* Check permissions */ + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); +} + +/* * ReindexDatabase * Recreate indexes of a database. * diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index 875f868b969..91642ce03de 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -23,10 +23,12 @@ #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" -static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, - bool recurse); - +static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait); +static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode); +static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, + Oid oldrelid, void *arg); /* * LOCK TABLE @@ -51,98 +53,124 @@ LockTableCommand(LockStmt *lockstmt) foreach(p, lockstmt->relations) { RangeVar *rv = (RangeVar *) lfirst(p); - Relation rel; bool recurse = interpretInhOption(rv->inhOpt); Oid reloid; - reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait); - rel = relation_open(reloid, NoLock); + reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false, + lockstmt->nowait, + RangeVarCallbackForLockTable, + (void *) &lockstmt->mode); - LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse); + if (recurse) + LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait); } } /* - * Apply LOCK TABLE recursively over an inheritance tree + * Before acquiring a table lock on the named table, check whether we have + * permission to do so. */ static void -LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse) +RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, + void *arg) { + LOCKMODE lockmode = * (LOCKMODE *) arg; + char relkind; AclResult aclresult; - Oid reloid = RelationGetRelid(rel); - /* Verify adequate privilege */ - if (lockmode == AccessShareLock) - aclresult = pg_class_aclcheck(reloid, GetUserId(), - ACL_SELECT); - else - aclresult = pg_class_aclcheck(reloid, GetUserId(), - ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_CLASS, - RelationGetRelationName(rel)); + if (!OidIsValid(relid)) + return; /* doesn't exist, so no permissions check */ + relkind = get_rel_relkind(relid); + if (!relkind) + return; /* woops, concurrently dropped; no permissions check */ /* Currently, we only allow plain tables to be locked */ - if (rel->rd_rel->relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + rv->relname))); - /* - * If requested, recurse to children. We use find_inheritance_children - * not find_all_inheritors to avoid taking locks far in advance of - * checking privileges. This means we'll visit multiply-inheriting - * children more than once, but that's no problem. - */ - if (recurse) + /* Check permissions. */ + aclresult = LockTableAclCheck(relid, lockmode); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname); +} + +/* + * Apply LOCK TABLE recursively over an inheritance tree + * + * We use find_inheritance_children not find_all_inheritors to avoid taking + * locks far in advance of checking privileges. This means we'll visit + * multiply-inheriting children more than once, but that's no problem. + */ +static void +LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait) +{ + List *children; + ListCell *lc; + + children = find_inheritance_children(reloid, NoLock); + + foreach(lc, children) { - List *children = find_inheritance_children(reloid, NoLock); - ListCell *lc; - Relation childrel; + Oid childreloid = lfirst_oid(lc); + AclResult aclresult; + + /* Check permissions before acquiring the lock. */ + aclresult = LockTableAclCheck(childreloid, lockmode); + if (aclresult != ACLCHECK_OK) + { + char *relname = get_rel_name(childreloid); + if (!relname) + continue; /* child concurrently dropped, just skip it */ + aclcheck_error(aclresult, ACL_KIND_CLASS, relname); + } - foreach(lc, children) + /* We have enough rights to lock the relation; do so. */ + if (!nowait) + LockRelationOid(childreloid, lockmode); + else if (!ConditionalLockRelationOid(childreloid, lockmode)) { - Oid childreloid = lfirst_oid(lc); - - /* - * Acquire the lock, to protect against concurrent drops. Note - * that a lock against an already-dropped relation's OID won't - * fail. - */ - if (!nowait) - LockRelationOid(childreloid, lockmode); - else if (!ConditionalLockRelationOid(childreloid, lockmode)) - { - /* try to throw error by name; relation could be deleted... */ - char *relname = get_rel_name(childreloid); - - if (relname) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation \"%s\"", - relname))); - else - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation with OID %u", - reloid))); - } - - /* - * Now that we have the lock, check to see if the relation really - * exists or not. - */ - childrel = try_relation_open(childreloid, NoLock); - if (!childrel) - { - /* Release useless lock */ - UnlockRelationOid(childreloid, lockmode); - } - - LockTableRecurse(childrel, lockmode, nowait, recurse); + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(childreloid); + if (!relname) + continue; /* child concurrently dropped, just skip it */ + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); } + + /* + * Even if we got the lock, child might have been concurrently dropped. + * If so, we can skip it. + */ + if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid))) + { + /* Release useless lock */ + UnlockRelationOid(childreloid, lockmode); + continue; + } + + LockTableRecurse(childreloid, lockmode, nowait); } +} - relation_close(rel, NoLock); /* close rel, keep lock */ +/* + * Check whether the current user is permitted to lock this relation. + */ +static AclResult +LockTableAclCheck(Oid reloid, LOCKMODE lockmode) +{ + AclResult aclresult; + + /* Verify adequate privilege */ + if (lockmode == AccessShareLock) + aclresult = pg_class_aclcheck(reloid, GetUserId(), + ACL_SELECT); + else + aclresult = pg_class_aclcheck(reloid, GetUserId(), + ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE); + return aclresult; } diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 54660f44805..41e5301fc5f 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -425,7 +425,7 @@ AlterSequence(AlterSeqStmt *stmt) List *owned_by; /* Open and lock sequence. */ - relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false); + relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false); init_sequence(relid, &elm, &seqrel); /* allow ALTER to sequence owner only */ @@ -513,7 +513,7 @@ nextval(PG_FUNCTION_ARGS) * whether the performance penalty is material in practice, but for now, * we do it this way. */ - relid = RangeVarGetRelid(sequence, NoLock, false, false); + relid = RangeVarGetRelid(sequence, NoLock, false); PG_RETURN_INT64(nextval_internal(relid)); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c4622c04125..1c3fe6a9630 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = { {'\0', 0, NULL, NULL, NULL, NULL} }; +struct DropRelationCallbackState +{ + char relkind; + Oid heapOid; +}; + /* Alter table target-type flags for ATSimplePermissions */ #define ATT_TABLE 0x0001 #define ATT_VIEW 0x0002 @@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, ForkNumber forkNum, char relpersistence); static const char *storage_name(char c); +static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, + Oid oldRelOid, void *arg); + /* ---------------------------------------------------------------- * DefineRelation @@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop) { RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); Oid relOid; - HeapTuple tuple; - Form_pg_class classform; ObjectAddress obj; + struct DropRelationCallbackState state; /* * These next few steps are a great deal like relation_openrv, but we @@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop) */ AcceptInvalidationMessages(); - /* - * Look up the appropriate relation using namespace search. - * - * XXX: Doing this without a lock is unsafe in the presence of - * concurrent DDL, but acquiring a lock here might violate the rule - * that a table must be locked before its corresponding index. - * So, for now, we ignore the hazard. - */ - relOid = RangeVarGetRelid(rel, NoLock, true, false); + /* Look up the appropriate relation using namespace search. */ + state.relkind = relkind; + state.heapOid = InvalidOid; + relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true, + false, + RangeVarCallbackForDropRelation, + (void *) &state); /* Not there? */ if (!OidIsValid(relOid)) @@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop) continue; } - /* - * In DROP INDEX, attempt to acquire lock on the parent table before - * locking the index. index_drop() will need this anyway, and since - * regular queries lock tables before their indexes, we risk deadlock - * if we do it the other way around. No error if we don't find a - * pg_index entry, though --- that most likely means it isn't an - * index, and we'll fail below. - */ - if (relkind == RELKIND_INDEX) - { - tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid)); - if (HeapTupleIsValid(tuple)) - { - Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple); - - LockRelationOid(index->indrelid, AccessExclusiveLock); - ReleaseSysCache(tuple); - } - } - - /* Get the lock before trying to fetch the syscache entry */ - LockRelationOid(relOid, AccessExclusiveLock); - - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", relOid); - classform = (Form_pg_class) GETSTRUCT(tuple); - - if (classform->relkind != relkind) - DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); - - /* Allow DROP to either table owner or schema owner */ - if (!pg_class_ownercheck(relOid, GetUserId()) && - !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - rel->relname); - - if (!allowSystemTableMods && IsSystemClass(classform)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - rel->relname))); - /* OK, we're ready to delete this one */ obj.classId = RelationRelationId; obj.objectId = relOid; obj.objectSubId = 0; add_exact_object_address(&obj, objects); - - ReleaseSysCache(tuple); } performMultipleDeletions(objects, drop->behavior); @@ -843,6 +804,74 @@ RemoveRelations(DropStmt *drop) } /* + * Before acquiring a table lock, check whether we have sufficient rights. + * In the case of DROP INDEX, also try to lock the table before the index. + */ +static void +RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, + void *arg) +{ + HeapTuple tuple; + struct DropRelationCallbackState *state; + char relkind; + Form_pg_class classform; + + state = (struct DropRelationCallbackState *) arg; + relkind = state->relkind; + + /* + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. + */ + if (relOid != oldRelOid && OidIsValid(state->heapOid)) + { + UnlockRelationOid(state->heapOid, AccessExclusiveLock); + state->heapOid = InvalidOid; + } + + /* Didn't find a relation, so need for locking or permission checks. */ + if (!OidIsValid(relOid)) + return; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); + if (!HeapTupleIsValid(tuple)) + return; /* concurrently dropped, so nothing to do */ + classform = (Form_pg_class) GETSTRUCT(tuple); + + if (classform->relkind != relkind) + DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); + + /* Allow DROP to either table owner or schema owner */ + if (!pg_class_ownercheck(relOid, GetUserId()) && + !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + rel->relname); + + if (!allowSystemTableMods && IsSystemClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rel->relname))); + + ReleaseSysCache(tuple); + + /* + * In DROP INDEX, attempt to acquire lock on the parent table before + * locking the index. index_drop() will need this anyway, and since + * regular queries lock tables before their indexes, we risk deadlock + * if we do it the other way around. No error if we don't find a + * pg_index entry, though --- the relation may have been droppd. + */ + if (relkind == RELKIND_INDEX && relOid != oldRelOid) + { + state->heapOid = IndexGetRelation(relOid, true); + if (OidIsValid(state->heapOid)) + LockRelationOid(state->heapOid, AccessExclusiveLock); + } +} + +/* * ExecuteTruncate * Executes a TRUNCATE command. * diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 5589528e5ca..b205deca29f 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -201,8 +201,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, * we might end up creating a pg_constraint entry referencing a * nonexistent table. */ - constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false, - false); + constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false); } /* permission checks */ diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 5eec56950cc..e70dbedbd05 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -330,7 +330,7 @@ get_rel_oids(Oid relid, const RangeVar *vacrel) * alternative, since we're going to commit this transaction and * begin a new one between now and then. */ - relid = RangeVarGetRelid(vacrel, NoLock, false, false); + relid = RangeVarGetRelid(vacrel, NoLock, false); /* Make a relation list entry for this guy */ oldcontext = MemoryContextSwitchTo(vac_context); |