diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 271 |
1 files changed, 227 insertions, 44 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 4397123398e..686f1850cab 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -435,6 +435,9 @@ static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel, char *constrName, HeapTuple contuple, bool recurse, bool recursing, LOCKMODE lockmode); +static void QueueNNConstraintValidation(List **wqueue, Relation conrel, Relation rel, + HeapTuple contuple, bool recurse, bool recursing, + LOCKMODE lockmode); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids, Oid *attcollids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -498,7 +501,7 @@ static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid) static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse, LOCKMODE lockmode); static void set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, - LOCKMODE lockmode); + bool is_valid, bool queue_validation); static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel, char *constrname, char *colName, bool recurse, bool recursing, @@ -1340,7 +1343,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints, old_notnulls); foreach_int(attrnum, nncols) - set_attnotnull(NULL, rel, attrnum, NoLock); + set_attnotnull(NULL, rel, attrnum, true, false); ObjectAddressSet(address, RelationRelationId, relationId); @@ -2738,7 +2741,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, /* * Request attnotnull on columns that have a not-null constraint - * that's not marked NO INHERIT. + * that's not marked NO INHERIT (even if not valid). */ nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation), true, false); @@ -6207,24 +6210,28 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) { /* * If we are rebuilding the tuples OR if we added any new but not - * verified not-null constraints, check all not-null constraints. This - * is a bit of overkill but it minimizes risk of bugs. + * verified not-null constraints, check all *valid* not-null + * constraints. This is a bit of overkill but it minimizes risk of + * bugs. * - * notnull_attrs does *not* collect attribute numbers for not-null - * constraints over virtual generated columns; instead, they are - * collected in notnull_virtual_attrs. + * notnull_attrs does *not* collect attribute numbers for valid + * not-null constraints over virtual generated columns; instead, they + * are collected in notnull_virtual_attrs for verification elsewhere. */ for (i = 0; i < newTupDesc->natts; i++) { - Form_pg_attribute attr = TupleDescAttr(newTupDesc, i); + CompactAttribute *attr = TupleDescCompactAttr(newTupDesc, i); - if (attr->attnotnull && !attr->attisdropped) + if (attr->attnullability == ATTNULLABLE_VALID && + !attr->attisdropped) { - if (attr->attgenerated != ATTRIBUTE_GENERATED_VIRTUAL) - notnull_attrs = lappend_int(notnull_attrs, attr->attnum); + Form_pg_attribute wholeatt = TupleDescAttr(newTupDesc, i); + + if (wholeatt->attgenerated != ATTRIBUTE_GENERATED_VIRTUAL) + notnull_attrs = lappend_int(notnull_attrs, wholeatt->attnum); else notnull_virtual_attrs = lappend_int(notnull_virtual_attrs, - attr->attnum); + wholeatt->attnum); } } if (notnull_attrs || notnull_virtual_attrs) @@ -7809,18 +7816,23 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3 - * to verify it. + * set_attnotnull + * Helper to update/validate the pg_attribute status of a not-null + * constraint * - * When called to alter an existing table, 'wqueue' must be given so that we - * can queue a check that existing tuples pass the constraint. When called - * from table creation, 'wqueue' should be passed as NULL. + * pg_attribute.attnotnull is set true, if it isn't already. + * If queue_validation is true, also set up wqueue to validate the constraint. + * wqueue may be given as NULL when validation is not needed (e.g., on table + * creation). */ static void set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, - LOCKMODE lockmode) + bool is_valid, bool queue_validation) { Form_pg_attribute attr; + CompactAttribute *thisatt; + + Assert(!queue_validation || wqueue); CheckAlterTableIsSafe(rel); @@ -7844,8 +7856,11 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, elog(ERROR, "cache lookup failed for attribute %d of relation %u", attnum, RelationGetRelid(rel)); + thisatt = TupleDescCompactAttr(RelationGetDescr(rel), attnum - 1); + thisatt->attnullability = ATTNULLABLE_VALID; + attr = (Form_pg_attribute) GETSTRUCT(tuple); - Assert(!attr->attnotnull); + attr->attnotnull = true; CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); @@ -7853,7 +7868,8 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, * If the nullness isn't already proven by validated constraints, have * ALTER TABLE phase 3 test for it. */ - if (wqueue && !NotNullImpliedByRelConstraints(rel, attr)) + if (queue_validation && wqueue && + !NotNullImpliedByRelConstraints(rel, attr)) { AlteredTableInfo *tab; @@ -7866,6 +7882,10 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, table_close(attr_rel, RowExclusiveLock); heap_freetuple(tuple); } + else + { + CacheInvalidateRelcache(rel); + } } /* @@ -7951,6 +7971,15 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, conForm->conislocal = true; changed = true; } + else if (!conForm->convalidated) + { + /* + * Flip attnotnull and convalidated, and also validate the + * constraint. + */ + return ATExecValidateConstraint(wqueue, rel, NameStr(conForm->conname), + recurse, recursing, lockmode); + } if (changed) { @@ -8013,8 +8042,8 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); - /* Mark pg_attribute.attnotnull for the column */ - set_attnotnull(wqueue, rel, attnum, lockmode); + /* Mark pg_attribute.attnotnull for the column and queue validation */ + set_attnotnull(wqueue, rel, attnum, true, true); /* * Recurse to propagate the constraint to children that don't have one. @@ -9417,7 +9446,6 @@ ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode, AlterTableUtilityContext *context) { - ListCell *lc; Constraint *pkconstr; pkconstr = castNode(Constraint, cmd->def); @@ -9436,33 +9464,73 @@ ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, lockmode); foreach_oid(childrelid, children) { - foreach(lc, pkconstr->keys) + foreach_node(String, attname, pkconstr->keys) { HeapTuple tup; Form_pg_attribute attrForm; - char *attname = strVal(lfirst(lc)); - tup = SearchSysCacheAttName(childrelid, attname); + tup = SearchSysCacheAttName(childrelid, strVal(attname)); if (!tup) elog(ERROR, "cache lookup failed for attribute %s of relation %u", - attname, childrelid); + strVal(attname), childrelid); attrForm = (Form_pg_attribute) GETSTRUCT(tup); if (!attrForm->attnotnull) ereport(ERROR, errmsg("column \"%s\" of table \"%s\" is not marked NOT NULL", - attname, get_rel_name(childrelid))); + strVal(attname), get_rel_name(childrelid))); ReleaseSysCache(tup); } } } - /* Insert not-null constraints in the queue for the PK columns */ - foreach(lc, pkconstr->keys) + /* Verify that columns are not-null, or request that they be made so */ + foreach_node(String, column, pkconstr->keys) { AlterTableCmd *newcmd; Constraint *nnconstr; + HeapTuple tuple; + + /* + * First check if a suitable constraint exists. If it does, we don't + * need to request another one. We do need to bail out if it's not + * valid, though. + */ + tuple = findNotNullConstraint(RelationGetRelid(rel), strVal(column)); + if (tuple != NULL) + { + Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* a NO INHERIT constraint is no good */ + if (conForm->connoinherit) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot create primary key on column \"%s\"", + strVal(column)), + /*- translator: third %s is a constraint characteristic such as NOT VALID */ + errdetail("The constraint \"%s\" on column \"%s\", marked %s, is incompatible with a primary key.", + NameStr(conForm->conname), strVal(column), "NO INHERIT"), + errhint("You will need to make it inheritable using %s.", + "ALTER TABLE ... ALTER CONSTRAINT ... INHERIT")); + + /* an unvalidated constraint is no good */ + if (!conForm->convalidated) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot create primary key on column \"%s\"", + strVal(column)), + /*- translator: third %s is a constraint characteristic such as NOT VALID */ + errdetail("The constraint \"%s\" on column \"%s\", marked %s, is incompatible with a primary key.", + NameStr(conForm->conname), strVal(column), "NOT VALID"), + errhint("You will need to validate it using %s.", + "ALTER TABLE ... VALIDATE CONSTRAINT")); + + /* All good with this one; don't request another */ + heap_freetuple(tuple); + continue; + } - nnconstr = makeNotNullConstraint(lfirst(lc)); + /* This column is not already not-null, so add it to the queue */ + nnconstr = makeNotNullConstraint(column); newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_AddConstraint; @@ -9836,11 +9904,15 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, constr->conname = ccon->name; /* - * If adding a not-null constraint, set the pg_attribute flag and tell - * phase 3 to verify existing rows, if needed. + * If adding a valid not-null constraint, set the pg_attribute flag + * and tell phase 3 to verify existing rows, if needed. For an + * invalid constraint, just set attnotnull, without queueing + * verification. */ if (constr->contype == CONSTR_NOTNULL) - set_attnotnull(wqueue, rel, ccon->attnum, lockmode); + set_attnotnull(wqueue, rel, ccon->attnum, + !constr->skip_validation, + !constr->skip_validation); ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); } @@ -12811,11 +12883,12 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, con = (Form_pg_constraint) GETSTRUCT(tuple); if (con->contype != CONSTRAINT_FOREIGN && - con->contype != CONSTRAINT_CHECK) + con->contype != CONSTRAINT_CHECK && + con->contype != CONSTRAINT_NOTNULL) ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint", - constrName, RelationGetRelationName(rel)))); + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key, check, or not-null constraint", + constrName, RelationGetRelationName(rel))); if (!con->conenforced) ereport(ERROR, @@ -12833,6 +12906,11 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, QueueCheckConstraintValidation(wqueue, conrel, rel, constrName, tuple, recurse, recursing, lockmode); } + else if (con->contype == CONSTRAINT_NOTNULL) + { + QueueNNConstraintValidation(wqueue, conrel, rel, + tuple, recurse, recursing, lockmode); + } ObjectAddressSet(address, ConstraintRelationId, con->oid); } @@ -13050,6 +13128,109 @@ QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel, } /* + * QueueNNConstraintValidation + * + * Add an entry to the wqueue to validate the given not-null constraint in + * Phase 3 and update the convalidated field in the pg_constraint catalog for + * the specified relation and all its inheriting children. + */ +static void +QueueNNConstraintValidation(List **wqueue, Relation conrel, Relation rel, + HeapTuple contuple, bool recurse, bool recursing, + LOCKMODE lockmode) +{ + Form_pg_constraint con; + AlteredTableInfo *tab; + HeapTuple copyTuple; + Form_pg_constraint copy_con; + List *children = NIL; + AttrNumber attnum; + char *colname; + + con = (Form_pg_constraint) GETSTRUCT(contuple); + Assert(con->contype == CONSTRAINT_NOTNULL); + + attnum = extractNotNullColumn(contuple); + + /* + * If we're recursing, we've already done this for parent, so skip it. + * Also, if the constraint is a NO INHERIT constraint, we shouldn't try to + * look for it in the children. + * + * We recurse before validating on the parent, to reduce risk of + * deadlocks. + */ + if (!recursing && !con->connoinherit) + children = find_all_inheritors(RelationGetRelid(rel), lockmode, NULL); + + colname = get_attname(RelationGetRelid(rel), attnum, false); + foreach_oid(childoid, children) + { + Relation childrel; + HeapTuple contup; + Form_pg_constraint childcon; + char *conname; + + if (childoid == RelationGetRelid(rel)) + continue; + + /* + * If we are told not to recurse, there had better not be any child + * tables, because we can't mark the constraint on the parent valid + * unless it is valid for all child tables. + */ + if (!recurse) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be validated on child tables too")); + + /* + * The column on child might have a different attnum, so search by + * column name. + */ + contup = findNotNullConstraint(childoid, colname); + if (!contup) + elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation \"%s\"", + colname, get_rel_name(childoid)); + childcon = (Form_pg_constraint) GETSTRUCT(contup); + if (childcon->convalidated) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childoid, NoLock); + conname = pstrdup(NameStr(childcon->conname)); + + /* XXX improve ATExecValidateConstraint API to avoid double search */ + ATExecValidateConstraint(wqueue, childrel, conname, + false, true, lockmode); + table_close(childrel, NoLock); + } + + /* Set attnotnull appropriately without queueing another validation */ + set_attnotnull(NULL, rel, attnum, true, false); + + tab = ATGetQueueEntry(wqueue, rel); + tab->verify_new_notnull = true; + + /* + * Invalidate relcache so that others see the new validated constraint. + */ + CacheInvalidateRelcache(rel); + + /* + * Now update the catalogs, while we have the door open. + */ + copyTuple = heap_copytuple(contuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + copy_con->convalidated = true; + CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); + + InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + + heap_freetuple(copyTuple); +} + +/* * transformColumnNameList - transform list of column names * * Lookup each name and return its attnum and, optionally, type and collation @@ -19770,17 +19951,19 @@ PartConstraintImpliedByRelConstraint(Relation scanrel, for (i = 1; i <= natts; i++) { - Form_pg_attribute att = TupleDescAttr(scanrel->rd_att, i - 1); + CompactAttribute *att = TupleDescCompactAttr(scanrel->rd_att, i - 1); - if (att->attnotnull && !att->attisdropped) + /* invalid not-null constraint must be ignored here */ + if (att->attnullability == ATTNULLABLE_VALID && !att->attisdropped) { + Form_pg_attribute wholeatt = TupleDescAttr(scanrel->rd_att, i - 1); NullTest *ntest = makeNode(NullTest); ntest->arg = (Expr *) makeVar(1, i, - att->atttypid, - att->atttypmod, - att->attcollation, + wholeatt->atttypid, + wholeatt->atttypmod, + wholeatt->attcollation, 0); ntest->nulltesttype = IS_NOT_NULL; |