summaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c271
1 files changed, 227 insertions, 44 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 4397123398e..686f1850cab 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -435,6 +435,9 @@ static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation
static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
char *constrName, HeapTuple contuple,
bool recurse, bool recursing, LOCKMODE lockmode);
+static void QueueNNConstraintValidation(List **wqueue, Relation conrel, Relation rel,
+ HeapTuple contuple, bool recurse, bool recursing,
+ LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids, Oid *attcollids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -498,7 +501,7 @@ static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid)
static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
LOCKMODE lockmode);
static void set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
- LOCKMODE lockmode);
+ bool is_valid, bool queue_validation);
static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel,
char *constrname, char *colName,
bool recurse, bool recursing,
@@ -1340,7 +1343,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints,
old_notnulls);
foreach_int(attrnum, nncols)
- set_attnotnull(NULL, rel, attrnum, NoLock);
+ set_attnotnull(NULL, rel, attrnum, true, false);
ObjectAddressSet(address, RelationRelationId, relationId);
@@ -2738,7 +2741,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
/*
* Request attnotnull on columns that have a not-null constraint
- * that's not marked NO INHERIT.
+ * that's not marked NO INHERIT (even if not valid).
*/
nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation),
true, false);
@@ -6207,24 +6210,28 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
{
/*
* If we are rebuilding the tuples OR if we added any new but not
- * verified not-null constraints, check all not-null constraints. This
- * is a bit of overkill but it minimizes risk of bugs.
+ * verified not-null constraints, check all *valid* not-null
+ * constraints. This is a bit of overkill but it minimizes risk of
+ * bugs.
*
- * notnull_attrs does *not* collect attribute numbers for not-null
- * constraints over virtual generated columns; instead, they are
- * collected in notnull_virtual_attrs.
+ * notnull_attrs does *not* collect attribute numbers for valid
+ * not-null constraints over virtual generated columns; instead, they
+ * are collected in notnull_virtual_attrs for verification elsewhere.
*/
for (i = 0; i < newTupDesc->natts; i++)
{
- Form_pg_attribute attr = TupleDescAttr(newTupDesc, i);
+ CompactAttribute *attr = TupleDescCompactAttr(newTupDesc, i);
- if (attr->attnotnull && !attr->attisdropped)
+ if (attr->attnullability == ATTNULLABLE_VALID &&
+ !attr->attisdropped)
{
- if (attr->attgenerated != ATTRIBUTE_GENERATED_VIRTUAL)
- notnull_attrs = lappend_int(notnull_attrs, attr->attnum);
+ Form_pg_attribute wholeatt = TupleDescAttr(newTupDesc, i);
+
+ if (wholeatt->attgenerated != ATTRIBUTE_GENERATED_VIRTUAL)
+ notnull_attrs = lappend_int(notnull_attrs, wholeatt->attnum);
else
notnull_virtual_attrs = lappend_int(notnull_virtual_attrs,
- attr->attnum);
+ wholeatt->attnum);
}
}
if (notnull_attrs || notnull_virtual_attrs)
@@ -7809,18 +7816,23 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
}
/*
- * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3
- * to verify it.
+ * set_attnotnull
+ * Helper to update/validate the pg_attribute status of a not-null
+ * constraint
*
- * When called to alter an existing table, 'wqueue' must be given so that we
- * can queue a check that existing tuples pass the constraint. When called
- * from table creation, 'wqueue' should be passed as NULL.
+ * pg_attribute.attnotnull is set true, if it isn't already.
+ * If queue_validation is true, also set up wqueue to validate the constraint.
+ * wqueue may be given as NULL when validation is not needed (e.g., on table
+ * creation).
*/
static void
set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
- LOCKMODE lockmode)
+ bool is_valid, bool queue_validation)
{
Form_pg_attribute attr;
+ CompactAttribute *thisatt;
+
+ Assert(!queue_validation || wqueue);
CheckAlterTableIsSafe(rel);
@@ -7844,8 +7856,11 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
attnum, RelationGetRelid(rel));
+ thisatt = TupleDescCompactAttr(RelationGetDescr(rel), attnum - 1);
+ thisatt->attnullability = ATTNULLABLE_VALID;
+
attr = (Form_pg_attribute) GETSTRUCT(tuple);
- Assert(!attr->attnotnull);
+
attr->attnotnull = true;
CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
@@ -7853,7 +7868,8 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
* If the nullness isn't already proven by validated constraints, have
* ALTER TABLE phase 3 test for it.
*/
- if (wqueue && !NotNullImpliedByRelConstraints(rel, attr))
+ if (queue_validation && wqueue &&
+ !NotNullImpliedByRelConstraints(rel, attr))
{
AlteredTableInfo *tab;
@@ -7866,6 +7882,10 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
table_close(attr_rel, RowExclusiveLock);
heap_freetuple(tuple);
}
+ else
+ {
+ CacheInvalidateRelcache(rel);
+ }
}
/*
@@ -7951,6 +7971,15 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
conForm->conislocal = true;
changed = true;
}
+ else if (!conForm->convalidated)
+ {
+ /*
+ * Flip attnotnull and convalidated, and also validate the
+ * constraint.
+ */
+ return ATExecValidateConstraint(wqueue, rel, NameStr(conForm->conname),
+ recurse, recursing, lockmode);
+ }
if (changed)
{
@@ -8013,8 +8042,8 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel), attnum);
- /* Mark pg_attribute.attnotnull for the column */
- set_attnotnull(wqueue, rel, attnum, lockmode);
+ /* Mark pg_attribute.attnotnull for the column and queue validation */
+ set_attnotnull(wqueue, rel, attnum, true, true);
/*
* Recurse to propagate the constraint to children that don't have one.
@@ -9417,7 +9446,6 @@ ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
bool recurse, LOCKMODE lockmode,
AlterTableUtilityContext *context)
{
- ListCell *lc;
Constraint *pkconstr;
pkconstr = castNode(Constraint, cmd->def);
@@ -9436,33 +9464,73 @@ ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
lockmode);
foreach_oid(childrelid, children)
{
- foreach(lc, pkconstr->keys)
+ foreach_node(String, attname, pkconstr->keys)
{
HeapTuple tup;
Form_pg_attribute attrForm;
- char *attname = strVal(lfirst(lc));
- tup = SearchSysCacheAttName(childrelid, attname);
+ tup = SearchSysCacheAttName(childrelid, strVal(attname));
if (!tup)
elog(ERROR, "cache lookup failed for attribute %s of relation %u",
- attname, childrelid);
+ strVal(attname), childrelid);
attrForm = (Form_pg_attribute) GETSTRUCT(tup);
if (!attrForm->attnotnull)
ereport(ERROR,
errmsg("column \"%s\" of table \"%s\" is not marked NOT NULL",
- attname, get_rel_name(childrelid)));
+ strVal(attname), get_rel_name(childrelid)));
ReleaseSysCache(tup);
}
}
}
- /* Insert not-null constraints in the queue for the PK columns */
- foreach(lc, pkconstr->keys)
+ /* Verify that columns are not-null, or request that they be made so */
+ foreach_node(String, column, pkconstr->keys)
{
AlterTableCmd *newcmd;
Constraint *nnconstr;
+ HeapTuple tuple;
+
+ /*
+ * First check if a suitable constraint exists. If it does, we don't
+ * need to request another one. We do need to bail out if it's not
+ * valid, though.
+ */
+ tuple = findNotNullConstraint(RelationGetRelid(rel), strVal(column));
+ if (tuple != NULL)
+ {
+ Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /* a NO INHERIT constraint is no good */
+ if (conForm->connoinherit)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot create primary key on column \"%s\"",
+ strVal(column)),
+ /*- translator: third %s is a constraint characteristic such as NOT VALID */
+ errdetail("The constraint \"%s\" on column \"%s\", marked %s, is incompatible with a primary key.",
+ NameStr(conForm->conname), strVal(column), "NO INHERIT"),
+ errhint("You will need to make it inheritable using %s.",
+ "ALTER TABLE ... ALTER CONSTRAINT ... INHERIT"));
+
+ /* an unvalidated constraint is no good */
+ if (!conForm->convalidated)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot create primary key on column \"%s\"",
+ strVal(column)),
+ /*- translator: third %s is a constraint characteristic such as NOT VALID */
+ errdetail("The constraint \"%s\" on column \"%s\", marked %s, is incompatible with a primary key.",
+ NameStr(conForm->conname), strVal(column), "NOT VALID"),
+ errhint("You will need to validate it using %s.",
+ "ALTER TABLE ... VALIDATE CONSTRAINT"));
+
+ /* All good with this one; don't request another */
+ heap_freetuple(tuple);
+ continue;
+ }
- nnconstr = makeNotNullConstraint(lfirst(lc));
+ /* This column is not already not-null, so add it to the queue */
+ nnconstr = makeNotNullConstraint(column);
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_AddConstraint;
@@ -9836,11 +9904,15 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
constr->conname = ccon->name;
/*
- * If adding a not-null constraint, set the pg_attribute flag and tell
- * phase 3 to verify existing rows, if needed.
+ * If adding a valid not-null constraint, set the pg_attribute flag
+ * and tell phase 3 to verify existing rows, if needed. For an
+ * invalid constraint, just set attnotnull, without queueing
+ * verification.
*/
if (constr->contype == CONSTR_NOTNULL)
- set_attnotnull(wqueue, rel, ccon->attnum, lockmode);
+ set_attnotnull(wqueue, rel, ccon->attnum,
+ !constr->skip_validation,
+ !constr->skip_validation);
ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
}
@@ -12811,11 +12883,12 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
con = (Form_pg_constraint) GETSTRUCT(tuple);
if (con->contype != CONSTRAINT_FOREIGN &&
- con->contype != CONSTRAINT_CHECK)
+ con->contype != CONSTRAINT_CHECK &&
+ con->contype != CONSTRAINT_NOTNULL)
ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
- constrName, RelationGetRelationName(rel))));
+ errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key, check, or not-null constraint",
+ constrName, RelationGetRelationName(rel)));
if (!con->conenforced)
ereport(ERROR,
@@ -12833,6 +12906,11 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
QueueCheckConstraintValidation(wqueue, conrel, rel, constrName,
tuple, recurse, recursing, lockmode);
}
+ else if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ QueueNNConstraintValidation(wqueue, conrel, rel,
+ tuple, recurse, recursing, lockmode);
+ }
ObjectAddressSet(address, ConstraintRelationId, con->oid);
}
@@ -13050,6 +13128,109 @@ QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
}
/*
+ * QueueNNConstraintValidation
+ *
+ * Add an entry to the wqueue to validate the given not-null constraint in
+ * Phase 3 and update the convalidated field in the pg_constraint catalog for
+ * the specified relation and all its inheriting children.
+ */
+static void
+QueueNNConstraintValidation(List **wqueue, Relation conrel, Relation rel,
+ HeapTuple contuple, bool recurse, bool recursing,
+ LOCKMODE lockmode)
+{
+ Form_pg_constraint con;
+ AlteredTableInfo *tab;
+ HeapTuple copyTuple;
+ Form_pg_constraint copy_con;
+ List *children = NIL;
+ AttrNumber attnum;
+ char *colname;
+
+ con = (Form_pg_constraint) GETSTRUCT(contuple);
+ Assert(con->contype == CONSTRAINT_NOTNULL);
+
+ attnum = extractNotNullColumn(contuple);
+
+ /*
+ * If we're recursing, we've already done this for parent, so skip it.
+ * Also, if the constraint is a NO INHERIT constraint, we shouldn't try to
+ * look for it in the children.
+ *
+ * We recurse before validating on the parent, to reduce risk of
+ * deadlocks.
+ */
+ if (!recursing && !con->connoinherit)
+ children = find_all_inheritors(RelationGetRelid(rel), lockmode, NULL);
+
+ colname = get_attname(RelationGetRelid(rel), attnum, false);
+ foreach_oid(childoid, children)
+ {
+ Relation childrel;
+ HeapTuple contup;
+ Form_pg_constraint childcon;
+ char *conname;
+
+ if (childoid == RelationGetRelid(rel))
+ continue;
+
+ /*
+ * If we are told not to recurse, there had better not be any child
+ * tables, because we can't mark the constraint on the parent valid
+ * unless it is valid for all child tables.
+ */
+ if (!recurse)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be validated on child tables too"));
+
+ /*
+ * The column on child might have a different attnum, so search by
+ * column name.
+ */
+ contup = findNotNullConstraint(childoid, colname);
+ if (!contup)
+ elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation \"%s\"",
+ colname, get_rel_name(childoid));
+ childcon = (Form_pg_constraint) GETSTRUCT(contup);
+ if (childcon->convalidated)
+ continue;
+
+ /* find_all_inheritors already got lock */
+ childrel = table_open(childoid, NoLock);
+ conname = pstrdup(NameStr(childcon->conname));
+
+ /* XXX improve ATExecValidateConstraint API to avoid double search */
+ ATExecValidateConstraint(wqueue, childrel, conname,
+ false, true, lockmode);
+ table_close(childrel, NoLock);
+ }
+
+ /* Set attnotnull appropriately without queueing another validation */
+ set_attnotnull(NULL, rel, attnum, true, false);
+
+ tab = ATGetQueueEntry(wqueue, rel);
+ tab->verify_new_notnull = true;
+
+ /*
+ * Invalidate relcache so that others see the new validated constraint.
+ */
+ CacheInvalidateRelcache(rel);
+
+ /*
+ * Now update the catalogs, while we have the door open.
+ */
+ copyTuple = heap_copytuple(contuple);
+ copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
+ copy_con->convalidated = true;
+ CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
+
+ InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
+
+ heap_freetuple(copyTuple);
+}
+
+/*
* transformColumnNameList - transform list of column names
*
* Lookup each name and return its attnum and, optionally, type and collation
@@ -19770,17 +19951,19 @@ PartConstraintImpliedByRelConstraint(Relation scanrel,
for (i = 1; i <= natts; i++)
{
- Form_pg_attribute att = TupleDescAttr(scanrel->rd_att, i - 1);
+ CompactAttribute *att = TupleDescCompactAttr(scanrel->rd_att, i - 1);
- if (att->attnotnull && !att->attisdropped)
+ /* invalid not-null constraint must be ignored here */
+ if (att->attnullability == ATTNULLABLE_VALID && !att->attisdropped)
{
+ Form_pg_attribute wholeatt = TupleDescAttr(scanrel->rd_att, i - 1);
NullTest *ntest = makeNode(NullTest);
ntest->arg = (Expr *) makeVar(1,
i,
- att->atttypid,
- att->atttypmod,
- att->attcollation,
+ wholeatt->atttypid,
+ wholeatt->atttypmod,
+ wholeatt->attcollation,
0);
ntest->nulltesttype = IS_NOT_NULL;