diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 313 |
1 files changed, 271 insertions, 42 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index ffb1308a0c0..ab89935ba73 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -23,6 +23,7 @@ #include "access/relscan.h" #include "access/sysattr.h" #include "access/tableam.h" +#include "access/toast_compression.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/catalog.h" @@ -527,6 +528,8 @@ static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKM static void ATExecGenericOptions(Relation rel, List *options); static void ATExecSetRowSecurity(Relation rel, bool rls); static void ATExecForceNoForceRowSecurity(Relation rel, bool force_rls); +static ObjectAddress ATExecSetCompression(AlteredTableInfo *tab, Relation rel, + const char *column, Node *newValue, LOCKMODE lockmode); static void index_copy_data(Relation rel, RelFileNode newrnode); static const char *storage_name(char c); @@ -558,6 +561,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, static List *GetParentedForeignKeyRefs(Relation partition); static void ATDetachCheckNoForeignKeyRefs(Relation partition); static void ATExecAlterCollationRefreshVersion(Relation rel, List *coll); +static char GetAttributeCompression(Form_pg_attribute att, char *compression); /* ---------------------------------------------------------------- @@ -852,6 +856,18 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, if (colDef->generated) attr->attgenerated = colDef->generated; + + /* + * lookup attribute's compression method and store it in the + * attr->attcompression. + */ + if (relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE || + relkind == RELKIND_MATVIEW) + attr->attcompression = + GetAttributeCompression(attr, colDef->compression); + else + attr->attcompression = InvalidCompressionMethod; } /* @@ -2396,6 +2412,22 @@ MergeAttributes(List *schema, List *supers, char relpersistence, storage_name(def->storage), storage_name(attribute->attstorage)))); + /* Copy/check compression parameter */ + if (CompressionMethodIsValid(attribute->attcompression)) + { + const char *compression = + GetCompressionMethodName(attribute->attcompression); + + if (def->compression == NULL) + def->compression = pstrdup(compression); + else if (strcmp(def->compression, compression) != 0) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" has a compression method conflict", + attributeName), + errdetail("%s versus %s", def->compression, compression))); + } + def->inhcount++; /* Merge of NOT NULL constraints = OR 'em together */ def->is_not_null |= attribute->attnotnull; @@ -2430,6 +2462,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence, def->collOid = attribute->attcollation; def->constraints = NIL; def->location = -1; + if (CompressionMethodIsValid(attribute->attcompression)) + def->compression = pstrdup(GetCompressionMethodName( + attribute->attcompression)); + else + def->compression = NULL; inhSchema = lappend(inhSchema, def); newattmap->attnums[parent_attno - 1] = ++child_attno; } @@ -2675,6 +2712,19 @@ MergeAttributes(List *schema, List *supers, char relpersistence, storage_name(def->storage), storage_name(newdef->storage)))); + /* Copy compression parameter */ + if (def->compression == NULL) + def->compression = newdef->compression; + else if (newdef->compression != NULL) + { + if (strcmp(def->compression, newdef->compression) != 0) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" has a compression method conflict", + attributeName), + errdetail("%s versus %s", def->compression, newdef->compression))); + } + /* Mark the column as locally defined */ def->is_local = true; /* Merge of NOT NULL constraints = OR 'em together */ @@ -3961,6 +4011,7 @@ AlterTableGetLockLevel(List *cmds) case AT_DropIdentity: case AT_SetIdentity: case AT_DropExpression: + case AT_SetCompression: cmd_lockmode = AccessExclusiveLock; break; @@ -4283,6 +4334,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, /* No command-specific prep needed */ pass = AT_PASS_MISC; break; + case AT_SetCompression: /* ALTER COLUMN SET COMPRESSION */ + ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; case AT_DropColumn: /* DROP COLUMN */ ATSimplePermissions(rel, ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE); @@ -4626,6 +4683,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_SetStorage: /* ALTER COLUMN SET STORAGE */ address = ATExecSetStorage(rel, cmd->name, cmd->def, lockmode); break; + case AT_SetCompression: + address = ATExecSetCompression(tab, rel, cmd->name, cmd->def, + lockmode); + break; case AT_DropColumn: /* DROP COLUMN */ address = ATExecDropColumn(wqueue, rel, cmd->name, cmd->behavior, false, false, @@ -6340,6 +6401,18 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, attribute.attislocal = colDef->is_local; attribute.attinhcount = colDef->inhcount; attribute.attcollation = collOid; + + /* + * lookup attribute's compression method and store it in the + * attr->attcompression. + */ + if (rel->rd_rel->relkind == RELKIND_RELATION || + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + attribute.attcompression = GetAttributeCompression(&attribute, + colDef->compression); + else + attribute.attcompression = InvalidCompressionMethod; + /* attribute.attacl is handled by InsertPgAttributeTuples() */ ReleaseSysCache(typeTuple); @@ -7713,6 +7786,68 @@ ATExecSetOptions(Relation rel, const char *colName, Node *options, } /* + * Helper function for ATExecSetStorage and ATExecSetCompression + * + * Set the attcompression and/or attstorage for the respective index attribute + * if the respective input values are valid. + */ +static void +SetIndexStorageProperties(Relation rel, Relation attrelation, + AttrNumber attnum, char newcompression, + char newstorage, LOCKMODE lockmode) +{ + HeapTuple tuple; + ListCell *lc; + Form_pg_attribute attrtuple; + + foreach(lc, RelationGetIndexList(rel)) + { + Oid indexoid = lfirst_oid(lc); + Relation indrel; + AttrNumber indattnum = 0; + + indrel = index_open(indexoid, lockmode); + + for (int i = 0; i < indrel->rd_index->indnatts; i++) + { + if (indrel->rd_index->indkey.values[i] == attnum) + { + indattnum = i + 1; + break; + } + } + + if (indattnum == 0) + { + index_close(indrel, lockmode); + continue; + } + + tuple = SearchSysCacheCopyAttNum(RelationGetRelid(indrel), indattnum); + + if (HeapTupleIsValid(tuple)) + { + attrtuple = (Form_pg_attribute) GETSTRUCT(tuple); + + if (CompressionMethodIsValid(newcompression)) + attrtuple->attcompression = newcompression; + + if (newstorage != '\0') + attrtuple->attstorage = newstorage; + + CatalogTupleUpdate(attrelation, &tuple->t_self, tuple); + + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), + attrtuple->attnum); + + heap_freetuple(tuple); + } + + index_close(indrel, lockmode); + } +} +/* * ALTER TABLE ALTER COLUMN SET STORAGE * * Return value is the address of the modified column @@ -7727,7 +7862,6 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc Form_pg_attribute attrtuple; AttrNumber attnum; ObjectAddress address; - ListCell *lc; Assert(IsA(newValue, String)); storagemode = strVal(newValue); @@ -7791,47 +7925,9 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc * Apply the change to indexes as well (only for simple index columns, * matching behavior of index.c ConstructTupleDescriptor()). */ - foreach(lc, RelationGetIndexList(rel)) - { - Oid indexoid = lfirst_oid(lc); - Relation indrel; - AttrNumber indattnum = 0; - - indrel = index_open(indexoid, lockmode); - - for (int i = 0; i < indrel->rd_index->indnatts; i++) - { - if (indrel->rd_index->indkey.values[i] == attnum) - { - indattnum = i + 1; - break; - } - } - - if (indattnum == 0) - { - index_close(indrel, lockmode); - continue; - } - - tuple = SearchSysCacheCopyAttNum(RelationGetRelid(indrel), indattnum); - - if (HeapTupleIsValid(tuple)) - { - attrtuple = (Form_pg_attribute) GETSTRUCT(tuple); - attrtuple->attstorage = newstorage; - - CatalogTupleUpdate(attrelation, &tuple->t_self, tuple); - - InvokeObjectPostAlterHook(RelationRelationId, - RelationGetRelid(rel), - attrtuple->attnum); - - heap_freetuple(tuple); - } - - index_close(indrel, lockmode); - } + SetIndexStorageProperties(rel, attrelation, attnum, + InvalidCompressionMethod, + newstorage, lockmode); table_close(attrelation, RowExclusiveLock); @@ -11859,6 +11955,23 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, ReleaseSysCache(typeTuple); + /* Setup attribute compression */ + if (rel->rd_rel->relkind == RELKIND_RELATION || + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + /* + * No compression for plain/external storage, otherwise, default + * compression method if it is not already set, refer comments atop + * attcompression parameter in pg_attribute.h. + */ + if (!IsStorageCompressible(tform->typstorage)) + attTup->attcompression = InvalidCompressionMethod; + else if (!CompressionMethodIsValid(attTup->attcompression)) + attTup->attcompression = GetDefaultToastCompression(); + } + else + attTup->attcompression = InvalidCompressionMethod; + CatalogTupleUpdate(attrelation, &heapTup->t_self, heapTup); table_close(attrelation, RowExclusiveLock); @@ -14940,6 +15053,89 @@ ATExecGenericOptions(Relation rel, List *options) } /* + * ALTER TABLE ALTER COLUMN SET COMPRESSION + * + * Return value is the address of the modified column + */ +static ObjectAddress +ATExecSetCompression(AlteredTableInfo *tab, + Relation rel, + const char *column, + Node *newValue, + LOCKMODE lockmode) +{ + Relation attrel; + HeapTuple tuple; + Form_pg_attribute atttableform; + AttrNumber attnum; + char *compression; + char typstorage; + Oid cmoid; + Datum values[Natts_pg_attribute]; + bool nulls[Natts_pg_attribute]; + bool replace[Natts_pg_attribute]; + ObjectAddress address; + + Assert(IsA(newValue, String)); + compression = strVal(newValue); + + attrel = table_open(AttributeRelationId, RowExclusiveLock); + + tuple = SearchSysCacheAttName(RelationGetRelid(rel), column); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + column, RelationGetRelationName(rel)))); + + /* prevent them from altering a system attribute */ + atttableform = (Form_pg_attribute) GETSTRUCT(tuple); + attnum = atttableform->attnum; + if (attnum <= 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter system column \"%s\"", column))); + + typstorage = get_typstorage(atttableform->atttypid); + + /* prevent from setting compression methods for uncompressible type */ + if (!IsStorageCompressible(typstorage)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("column data type %s does not support compression", + format_type_be(atttableform->atttypid)))); + + /* initialize buffers for new tuple values */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replace, false, sizeof(replace)); + + /* get the attribute compression method. */ + cmoid = GetAttributeCompression(atttableform, compression); + + atttableform->attcompression = cmoid; + CatalogTupleUpdate(attrel, &tuple->t_self, tuple); + + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), + atttableform->attnum); + + ReleaseSysCache(tuple); + + /* apply changes to the index column as well */ + SetIndexStorageProperties(rel, attrel, attnum, cmoid, '\0', lockmode); + table_close(attrel, RowExclusiveLock); + + /* make changes visible */ + CommandCounterIncrement(); + + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), atttableform->attnum); + return address; +} + + +/* * Preparation phase for SET LOGGED/UNLOGGED * * This verifies that we're not trying to change a temp table. Also, @@ -17641,3 +17837,36 @@ ATExecAlterCollationRefreshVersion(Relation rel, List *coll) index_update_collation_versions(rel->rd_id, get_collation_oid(coll, false)); CacheInvalidateRelcache(rel); } + +/* + * resolve column compression specification to compression method. + */ +static char +GetAttributeCompression(Form_pg_attribute att, char *compression) +{ + char typstorage = get_typstorage(att->atttypid); + char cmethod; + + /* + * No compression for plain/external storage, refer comments atop + * attcompression parameter in pg_attribute.h + */ + if (!IsStorageCompressible(typstorage)) + { + if (compression == NULL) + return InvalidCompressionMethod; + + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("column data type %s does not support compression", + format_type_be(att->atttypid)))); + } + + /* fallback to default compression if it's not specified */ + if (compression == NULL) + cmethod = GetDefaultToastCompression(); + else + cmethod = CompressionNameToMethod(compression); + + return cmethod; +} |