summaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2013-11-08 12:30:43 -0500
committerRobert Haas <rhaas@postgresql.org>2013-11-08 12:30:43 -0500
commit07cacba983ef79be4a84fcd0e0ca3b5fcb85dd65 (patch)
tree7fa0f7c8d7b765b3e901512faef90759904d047c /src/backend/commands/tablecmds.c
parentb97ee66cc1f9319f7b457e7d8a78aab711da2dda (diff)
Add the notion of REPLICA IDENTITY for a table.
Pending patches for logical replication will use this to determine which columns of a tuple ought to be considered as its candidate key. Andres Freund, with minor, mostly cosmetic adjustments by me
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c222
1 files changed, 222 insertions, 0 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 670af18f4c3..c8fa39d025d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -399,6 +399,7 @@ static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid);
static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode);
static void ATExecDropOf(Relation rel, LOCKMODE lockmode);
+static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode);
static void ATExecGenericOptions(Relation rel, List *options);
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
@@ -2809,6 +2810,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_DisableTrigUser:
case AT_AddIndex: /* from ADD CONSTRAINT */
case AT_AddIndexConstraint:
+ case AT_ReplicaIdentity:
cmd_lockmode = ShareRowExclusiveLock;
break;
@@ -3140,6 +3142,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
cmd->subtype = AT_ValidateConstraintRecurse;
pass = AT_PASS_MISC;
break;
+ case AT_ReplicaIdentity: /* REPLICA IDENTITY ... */
+ ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
+ pass = AT_PASS_MISC;
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
@@ -3440,6 +3448,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_DropOf:
ATExecDropOf(rel, lockmode);
break;
+ case AT_ReplicaIdentity:
+ ATExecReplicaIdentity(rel, (ReplicaIdentityStmt *) cmd->def, lockmode);
+ break;
case AT_GenericOptions:
ATExecGenericOptions(rel, (List *) cmd->def);
break;
@@ -10010,6 +10021,217 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode)
}
/*
+ * relation_mark_replica_identity: Update a table's replica identity
+ *
+ * Iff ri_type = REPLICA_IDENTITY_INDEX, indexOid must be the Oid of a suitable
+ * index. Otherwise, it should be InvalidOid.
+ */
+static void
+relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid,
+ bool is_internal)
+{
+ Relation pg_index;
+ Relation pg_class;
+ HeapTuple pg_class_tuple;
+ HeapTuple pg_index_tuple;
+ Form_pg_class pg_class_form;
+ Form_pg_index pg_index_form;
+
+ ListCell *index;
+
+ /*
+ * Check whether relreplident has changed, and update it if so.
+ */
+ pg_class = heap_open(RelationRelationId, RowExclusiveLock);
+ pg_class_tuple = SearchSysCacheCopy1(RELOID,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ if (!HeapTupleIsValid(pg_class_tuple))
+ elog(ERROR, "cache lookup failed for relation \"%s\"",
+ RelationGetRelationName(rel));
+ pg_class_form = (Form_pg_class) GETSTRUCT(pg_class_tuple);
+ if (pg_class_form->relreplident != ri_type)
+ {
+ pg_class_form->relreplident = ri_type;
+ simple_heap_update(pg_class, &pg_class_tuple->t_self, pg_class_tuple);
+ CatalogUpdateIndexes(pg_class, pg_class_tuple);
+ }
+ heap_close(pg_class, RowExclusiveLock);
+ heap_freetuple(pg_class_tuple);
+
+ /*
+ * Check whether the correct index is marked indisreplident; if so, we're
+ * done.
+ */
+ if (OidIsValid(indexOid))
+ {
+ Assert(ri_type == REPLICA_IDENTITY_INDEX);
+
+ pg_index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexOid));
+ if (!HeapTupleIsValid(pg_index_tuple))
+ elog(ERROR, "cache lookup failed for index %u", indexOid);
+ pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple);
+
+ if (pg_index_form->indisreplident)
+ {
+ ReleaseSysCache(pg_index_tuple);
+ return;
+ }
+ ReleaseSysCache(pg_index_tuple);
+ }
+
+ /*
+ * Clear the indisreplident flag from any index that had it previously, and
+ * set it for any index that should have it now.
+ */
+ pg_index = heap_open(IndexRelationId, RowExclusiveLock);
+ foreach(index, RelationGetIndexList(rel))
+ {
+ Oid thisIndexOid = lfirst_oid(index);
+ bool dirty = false;
+
+ pg_index_tuple = SearchSysCacheCopy1(INDEXRELID,
+ ObjectIdGetDatum(thisIndexOid));
+ if (!HeapTupleIsValid(pg_index_tuple))
+ elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
+ pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple);
+
+ /*
+ * Unset the bit if set. We know it's wrong because we checked this
+ * earlier.
+ */
+ if (pg_index_form->indisreplident)
+ {
+ dirty = true;
+ pg_index_form->indisreplident = false;
+ }
+ else if (thisIndexOid == indexOid)
+ {
+ dirty = true;
+ pg_index_form->indisreplident = true;
+ }
+
+ if (dirty)
+ {
+ simple_heap_update(pg_index, &pg_index_tuple->t_self, pg_index_tuple);
+ CatalogUpdateIndexes(pg_index, pg_index_tuple);
+ InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
+ InvalidOid, is_internal);
+ }
+ heap_freetuple(pg_index_tuple);
+ }
+
+ heap_close(pg_index, RowExclusiveLock);
+}
+
+/*
+ * ALTER TABLE <name> REPLICA IDENTITY ...
+ */
+static void
+ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode)
+{
+ Oid indexOid;
+ Relation indexRel;
+ int key;
+
+ if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
+ {
+ relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
+ return;
+ }
+ else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
+ {
+ relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
+ return;
+ }
+ else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
+ {
+ relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
+ return;
+ }
+ else if (stmt->identity_type == REPLICA_IDENTITY_INDEX)
+ {
+ /* fallthrough */;
+ }
+ else
+ elog(ERROR, "unexpected identity type %u", stmt->identity_type);
+
+
+ /* Check that the index exists */
+ indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace);
+ if (!OidIsValid(indexOid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("index \"%s\" for table \"%s\" does not exist",
+ stmt->name, RelationGetRelationName(rel))));
+
+ indexRel = index_open(indexOid, ShareLock);
+
+ /* Check that the index is on the relation we're altering. */
+ if (indexRel->rd_index == NULL ||
+ indexRel->rd_index->indrelid != RelationGetRelid(rel))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not an index for table \"%s\"",
+ RelationGetRelationName(indexRel),
+ RelationGetRelationName(rel))));
+ /* The AM must support uniqueness, and the index must in fact be unique. */
+ if (!indexRel->rd_am->amcanunique || !indexRel->rd_index->indisunique)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot use non-unique index \"%s\" as replica identity",
+ RelationGetRelationName(indexRel))));
+ /* Deferred indexes are not guaranteed to be always unique. */
+ if (!indexRel->rd_index->indimmediate)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use non-immediate index \"%s\" as replica identity",
+ RelationGetRelationName(indexRel))));
+ /* Expression indexes aren't supported. */
+ if (RelationGetIndexExpressions(indexRel) != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use expression index \"%s\" as replica identity",
+ RelationGetRelationName(indexRel))));
+ /* Predicate indexes aren't supported. */
+ if (RelationGetIndexPredicate(indexRel) != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use partial index \"%s\" as replica identity",
+ RelationGetRelationName(indexRel))));
+ /* And neither are invalid indexes. */
+ if (!IndexIsValid(indexRel->rd_index))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use invalid index \"%s\" as replica identity",
+ RelationGetRelationName(indexRel))));
+
+ /* Check index for nullable columns. */
+ for (key = 0; key < indexRel->rd_index->indnatts; key++)
+ {
+ int16 attno = indexRel->rd_index->indkey.values[key];
+ Form_pg_attribute attr;
+
+ /* Of the system columns, only oid is indexable. */
+ if (attno <= 0 && attno != ObjectIdAttributeNumber)
+ elog(ERROR, "internal column %u in unique index \"%s\"",
+ attno, RelationGetRelationName(indexRel));
+
+ attr = rel->rd_att->attrs[attno - 1];
+ if (!attr->attnotnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
+ RelationGetRelationName(indexRel),
+ NameStr(attr->attname))));
+ }
+
+ /* This index is suitable for use as a replica identity. Mark it. */
+ relation_mark_replica_identity(rel, stmt->identity_type, indexOid, true);
+
+ index_close(indexRel, NoLock);
+}
+
+/*
* ALTER FOREIGN TABLE <name> OPTIONS (...)
*/
static void