diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 670af18f4c3..c8fa39d025d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -399,6 +399,7 @@ static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid); static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode); static void ATExecDropOf(Relation rel, LOCKMODE lockmode); +static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode); static void ATExecGenericOptions(Relation rel, List *options); static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, @@ -2809,6 +2810,7 @@ AlterTableGetLockLevel(List *cmds) case AT_DisableTrigUser: case AT_AddIndex: /* from ADD CONSTRAINT */ case AT_AddIndexConstraint: + case AT_ReplicaIdentity: cmd_lockmode = ShareRowExclusiveLock; break; @@ -3140,6 +3142,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, cmd->subtype = AT_ValidateConstraintRecurse; pass = AT_PASS_MISC; break; + case AT_ReplicaIdentity: /* REPLICA IDENTITY ... */ + ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW); + pass = AT_PASS_MISC; + /* This command never recurses */ + /* No command-specific prep needed */ + break; case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableAlwaysTrig: case AT_EnableReplicaTrig: @@ -3440,6 +3448,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_DropOf: ATExecDropOf(rel, lockmode); break; + case AT_ReplicaIdentity: + ATExecReplicaIdentity(rel, (ReplicaIdentityStmt *) cmd->def, lockmode); + break; case AT_GenericOptions: ATExecGenericOptions(rel, (List *) cmd->def); break; @@ -10010,6 +10021,217 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode) } /* + * relation_mark_replica_identity: Update a table's replica identity + * + * Iff ri_type = REPLICA_IDENTITY_INDEX, indexOid must be the Oid of a suitable + * index. Otherwise, it should be InvalidOid. + */ +static void +relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid, + bool is_internal) +{ + Relation pg_index; + Relation pg_class; + HeapTuple pg_class_tuple; + HeapTuple pg_index_tuple; + Form_pg_class pg_class_form; + Form_pg_index pg_index_form; + + ListCell *index; + + /* + * Check whether relreplident has changed, and update it if so. + */ + pg_class = heap_open(RelationRelationId, RowExclusiveLock); + pg_class_tuple = SearchSysCacheCopy1(RELOID, + ObjectIdGetDatum(RelationGetRelid(rel))); + if (!HeapTupleIsValid(pg_class_tuple)) + elog(ERROR, "cache lookup failed for relation \"%s\"", + RelationGetRelationName(rel)); + pg_class_form = (Form_pg_class) GETSTRUCT(pg_class_tuple); + if (pg_class_form->relreplident != ri_type) + { + pg_class_form->relreplident = ri_type; + simple_heap_update(pg_class, &pg_class_tuple->t_self, pg_class_tuple); + CatalogUpdateIndexes(pg_class, pg_class_tuple); + } + heap_close(pg_class, RowExclusiveLock); + heap_freetuple(pg_class_tuple); + + /* + * Check whether the correct index is marked indisreplident; if so, we're + * done. + */ + if (OidIsValid(indexOid)) + { + Assert(ri_type == REPLICA_IDENTITY_INDEX); + + pg_index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexOid)); + if (!HeapTupleIsValid(pg_index_tuple)) + elog(ERROR, "cache lookup failed for index %u", indexOid); + pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple); + + if (pg_index_form->indisreplident) + { + ReleaseSysCache(pg_index_tuple); + return; + } + ReleaseSysCache(pg_index_tuple); + } + + /* + * Clear the indisreplident flag from any index that had it previously, and + * set it for any index that should have it now. + */ + pg_index = heap_open(IndexRelationId, RowExclusiveLock); + foreach(index, RelationGetIndexList(rel)) + { + Oid thisIndexOid = lfirst_oid(index); + bool dirty = false; + + pg_index_tuple = SearchSysCacheCopy1(INDEXRELID, + ObjectIdGetDatum(thisIndexOid)); + if (!HeapTupleIsValid(pg_index_tuple)) + elog(ERROR, "cache lookup failed for index %u", thisIndexOid); + pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple); + + /* + * Unset the bit if set. We know it's wrong because we checked this + * earlier. + */ + if (pg_index_form->indisreplident) + { + dirty = true; + pg_index_form->indisreplident = false; + } + else if (thisIndexOid == indexOid) + { + dirty = true; + pg_index_form->indisreplident = true; + } + + if (dirty) + { + simple_heap_update(pg_index, &pg_index_tuple->t_self, pg_index_tuple); + CatalogUpdateIndexes(pg_index, pg_index_tuple); + InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0, + InvalidOid, is_internal); + } + heap_freetuple(pg_index_tuple); + } + + heap_close(pg_index, RowExclusiveLock); +} + +/* + * ALTER TABLE <name> REPLICA IDENTITY ... + */ +static void +ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode) +{ + Oid indexOid; + Relation indexRel; + int key; + + if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT) + { + relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); + return; + } + else if (stmt->identity_type == REPLICA_IDENTITY_FULL) + { + relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); + return; + } + else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING) + { + relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); + return; + } + else if (stmt->identity_type == REPLICA_IDENTITY_INDEX) + { + /* fallthrough */; + } + else + elog(ERROR, "unexpected identity type %u", stmt->identity_type); + + + /* Check that the index exists */ + indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace); + if (!OidIsValid(indexOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("index \"%s\" for table \"%s\" does not exist", + stmt->name, RelationGetRelationName(rel)))); + + indexRel = index_open(indexOid, ShareLock); + + /* Check that the index is on the relation we're altering. */ + if (indexRel->rd_index == NULL || + indexRel->rd_index->indrelid != RelationGetRelid(rel)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not an index for table \"%s\"", + RelationGetRelationName(indexRel), + RelationGetRelationName(rel)))); + /* The AM must support uniqueness, and the index must in fact be unique. */ + if (!indexRel->rd_am->amcanunique || !indexRel->rd_index->indisunique) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use non-unique index \"%s\" as replica identity", + RelationGetRelationName(indexRel)))); + /* Deferred indexes are not guaranteed to be always unique. */ + if (!indexRel->rd_index->indimmediate) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use non-immediate index \"%s\" as replica identity", + RelationGetRelationName(indexRel)))); + /* Expression indexes aren't supported. */ + if (RelationGetIndexExpressions(indexRel) != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use expression index \"%s\" as replica identity", + RelationGetRelationName(indexRel)))); + /* Predicate indexes aren't supported. */ + if (RelationGetIndexPredicate(indexRel) != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use partial index \"%s\" as replica identity", + RelationGetRelationName(indexRel)))); + /* And neither are invalid indexes. */ + if (!IndexIsValid(indexRel->rd_index)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use invalid index \"%s\" as replica identity", + RelationGetRelationName(indexRel)))); + + /* Check index for nullable columns. */ + for (key = 0; key < indexRel->rd_index->indnatts; key++) + { + int16 attno = indexRel->rd_index->indkey.values[key]; + Form_pg_attribute attr; + + /* Of the system columns, only oid is indexable. */ + if (attno <= 0 && attno != ObjectIdAttributeNumber) + elog(ERROR, "internal column %u in unique index \"%s\"", + attno, RelationGetRelationName(indexRel)); + + attr = rel->rd_att->attrs[attno - 1]; + if (!attr->attnotnull) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable", + RelationGetRelationName(indexRel), + NameStr(attr->attname)))); + } + + /* This index is suitable for use as a replica identity. Mark it. */ + relation_mark_replica_identity(rel, stmt->identity_type, indexOid, true); + + index_close(indexRel, NoLock); +} + +/* * ALTER FOREIGN TABLE <name> OPTIONS (...) */ static void |