summaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c313
1 files changed, 271 insertions, 42 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index ffb1308a0c0..ab89935ba73 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -23,6 +23,7 @@
#include "access/relscan.h"
#include "access/sysattr.h"
#include "access/tableam.h"
+#include "access/toast_compression.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
@@ -527,6 +528,8 @@ static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKM
static void ATExecGenericOptions(Relation rel, List *options);
static void ATExecSetRowSecurity(Relation rel, bool rls);
static void ATExecForceNoForceRowSecurity(Relation rel, bool force_rls);
+static ObjectAddress ATExecSetCompression(AlteredTableInfo *tab, Relation rel,
+ const char *column, Node *newValue, LOCKMODE lockmode);
static void index_copy_data(Relation rel, RelFileNode newrnode);
static const char *storage_name(char c);
@@ -558,6 +561,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
static List *GetParentedForeignKeyRefs(Relation partition);
static void ATDetachCheckNoForeignKeyRefs(Relation partition);
static void ATExecAlterCollationRefreshVersion(Relation rel, List *coll);
+static char GetAttributeCompression(Form_pg_attribute att, char *compression);
/* ----------------------------------------------------------------
@@ -852,6 +856,18 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
if (colDef->generated)
attr->attgenerated = colDef->generated;
+
+ /*
+ * lookup attribute's compression method and store it in the
+ * attr->attcompression.
+ */
+ if (relkind == RELKIND_RELATION ||
+ relkind == RELKIND_PARTITIONED_TABLE ||
+ relkind == RELKIND_MATVIEW)
+ attr->attcompression =
+ GetAttributeCompression(attr, colDef->compression);
+ else
+ attr->attcompression = InvalidCompressionMethod;
}
/*
@@ -2396,6 +2412,22 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
storage_name(def->storage),
storage_name(attribute->attstorage))));
+ /* Copy/check compression parameter */
+ if (CompressionMethodIsValid(attribute->attcompression))
+ {
+ const char *compression =
+ GetCompressionMethodName(attribute->attcompression);
+
+ if (def->compression == NULL)
+ def->compression = pstrdup(compression);
+ else if (strcmp(def->compression, compression) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" has a compression method conflict",
+ attributeName),
+ errdetail("%s versus %s", def->compression, compression)));
+ }
+
def->inhcount++;
/* Merge of NOT NULL constraints = OR 'em together */
def->is_not_null |= attribute->attnotnull;
@@ -2430,6 +2462,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
def->collOid = attribute->attcollation;
def->constraints = NIL;
def->location = -1;
+ if (CompressionMethodIsValid(attribute->attcompression))
+ def->compression = pstrdup(GetCompressionMethodName(
+ attribute->attcompression));
+ else
+ def->compression = NULL;
inhSchema = lappend(inhSchema, def);
newattmap->attnums[parent_attno - 1] = ++child_attno;
}
@@ -2675,6 +2712,19 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
storage_name(def->storage),
storage_name(newdef->storage))));
+ /* Copy compression parameter */
+ if (def->compression == NULL)
+ def->compression = newdef->compression;
+ else if (newdef->compression != NULL)
+ {
+ if (strcmp(def->compression, newdef->compression) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" has a compression method conflict",
+ attributeName),
+ errdetail("%s versus %s", def->compression, newdef->compression)));
+ }
+
/* Mark the column as locally defined */
def->is_local = true;
/* Merge of NOT NULL constraints = OR 'em together */
@@ -3961,6 +4011,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_DropIdentity:
case AT_SetIdentity:
case AT_DropExpression:
+ case AT_SetCompression:
cmd_lockmode = AccessExclusiveLock;
break;
@@ -4283,6 +4334,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
+ case AT_SetCompression: /* ALTER COLUMN SET COMPRESSION */
+ ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
case AT_DropColumn: /* DROP COLUMN */
ATSimplePermissions(rel,
ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE);
@@ -4626,6 +4683,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
address = ATExecSetStorage(rel, cmd->name, cmd->def, lockmode);
break;
+ case AT_SetCompression:
+ address = ATExecSetCompression(tab, rel, cmd->name, cmd->def,
+ lockmode);
+ break;
case AT_DropColumn: /* DROP COLUMN */
address = ATExecDropColumn(wqueue, rel, cmd->name,
cmd->behavior, false, false,
@@ -6340,6 +6401,18 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
attribute.attislocal = colDef->is_local;
attribute.attinhcount = colDef->inhcount;
attribute.attcollation = collOid;
+
+ /*
+ * lookup attribute's compression method and store it in the
+ * attr->attcompression.
+ */
+ if (rel->rd_rel->relkind == RELKIND_RELATION ||
+ rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ attribute.attcompression = GetAttributeCompression(&attribute,
+ colDef->compression);
+ else
+ attribute.attcompression = InvalidCompressionMethod;
+
/* attribute.attacl is handled by InsertPgAttributeTuples() */
ReleaseSysCache(typeTuple);
@@ -7713,6 +7786,68 @@ ATExecSetOptions(Relation rel, const char *colName, Node *options,
}
/*
+ * Helper function for ATExecSetStorage and ATExecSetCompression
+ *
+ * Set the attcompression and/or attstorage for the respective index attribute
+ * if the respective input values are valid.
+ */
+static void
+SetIndexStorageProperties(Relation rel, Relation attrelation,
+ AttrNumber attnum, char newcompression,
+ char newstorage, LOCKMODE lockmode)
+{
+ HeapTuple tuple;
+ ListCell *lc;
+ Form_pg_attribute attrtuple;
+
+ foreach(lc, RelationGetIndexList(rel))
+ {
+ Oid indexoid = lfirst_oid(lc);
+ Relation indrel;
+ AttrNumber indattnum = 0;
+
+ indrel = index_open(indexoid, lockmode);
+
+ for (int i = 0; i < indrel->rd_index->indnatts; i++)
+ {
+ if (indrel->rd_index->indkey.values[i] == attnum)
+ {
+ indattnum = i + 1;
+ break;
+ }
+ }
+
+ if (indattnum == 0)
+ {
+ index_close(indrel, lockmode);
+ continue;
+ }
+
+ tuple = SearchSysCacheCopyAttNum(RelationGetRelid(indrel), indattnum);
+
+ if (HeapTupleIsValid(tuple))
+ {
+ attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
+
+ if (CompressionMethodIsValid(newcompression))
+ attrtuple->attcompression = newcompression;
+
+ if (newstorage != '\0')
+ attrtuple->attstorage = newstorage;
+
+ CatalogTupleUpdate(attrelation, &tuple->t_self, tuple);
+
+ InvokeObjectPostAlterHook(RelationRelationId,
+ RelationGetRelid(rel),
+ attrtuple->attnum);
+
+ heap_freetuple(tuple);
+ }
+
+ index_close(indrel, lockmode);
+ }
+}
+/*
* ALTER TABLE ALTER COLUMN SET STORAGE
*
* Return value is the address of the modified column
@@ -7727,7 +7862,6 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc
Form_pg_attribute attrtuple;
AttrNumber attnum;
ObjectAddress address;
- ListCell *lc;
Assert(IsA(newValue, String));
storagemode = strVal(newValue);
@@ -7791,47 +7925,9 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc
* Apply the change to indexes as well (only for simple index columns,
* matching behavior of index.c ConstructTupleDescriptor()).
*/
- foreach(lc, RelationGetIndexList(rel))
- {
- Oid indexoid = lfirst_oid(lc);
- Relation indrel;
- AttrNumber indattnum = 0;
-
- indrel = index_open(indexoid, lockmode);
-
- for (int i = 0; i < indrel->rd_index->indnatts; i++)
- {
- if (indrel->rd_index->indkey.values[i] == attnum)
- {
- indattnum = i + 1;
- break;
- }
- }
-
- if (indattnum == 0)
- {
- index_close(indrel, lockmode);
- continue;
- }
-
- tuple = SearchSysCacheCopyAttNum(RelationGetRelid(indrel), indattnum);
-
- if (HeapTupleIsValid(tuple))
- {
- attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
- attrtuple->attstorage = newstorage;
-
- CatalogTupleUpdate(attrelation, &tuple->t_self, tuple);
-
- InvokeObjectPostAlterHook(RelationRelationId,
- RelationGetRelid(rel),
- attrtuple->attnum);
-
- heap_freetuple(tuple);
- }
-
- index_close(indrel, lockmode);
- }
+ SetIndexStorageProperties(rel, attrelation, attnum,
+ InvalidCompressionMethod,
+ newstorage, lockmode);
table_close(attrelation, RowExclusiveLock);
@@ -11859,6 +11955,23 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
ReleaseSysCache(typeTuple);
+ /* Setup attribute compression */
+ if (rel->rd_rel->relkind == RELKIND_RELATION ||
+ rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ /*
+ * No compression for plain/external storage, otherwise, default
+ * compression method if it is not already set, refer comments atop
+ * attcompression parameter in pg_attribute.h.
+ */
+ if (!IsStorageCompressible(tform->typstorage))
+ attTup->attcompression = InvalidCompressionMethod;
+ else if (!CompressionMethodIsValid(attTup->attcompression))
+ attTup->attcompression = GetDefaultToastCompression();
+ }
+ else
+ attTup->attcompression = InvalidCompressionMethod;
+
CatalogTupleUpdate(attrelation, &heapTup->t_self, heapTup);
table_close(attrelation, RowExclusiveLock);
@@ -14940,6 +15053,89 @@ ATExecGenericOptions(Relation rel, List *options)
}
/*
+ * ALTER TABLE ALTER COLUMN SET COMPRESSION
+ *
+ * Return value is the address of the modified column
+ */
+static ObjectAddress
+ATExecSetCompression(AlteredTableInfo *tab,
+ Relation rel,
+ const char *column,
+ Node *newValue,
+ LOCKMODE lockmode)
+{
+ Relation attrel;
+ HeapTuple tuple;
+ Form_pg_attribute atttableform;
+ AttrNumber attnum;
+ char *compression;
+ char typstorage;
+ Oid cmoid;
+ Datum values[Natts_pg_attribute];
+ bool nulls[Natts_pg_attribute];
+ bool replace[Natts_pg_attribute];
+ ObjectAddress address;
+
+ Assert(IsA(newValue, String));
+ compression = strVal(newValue);
+
+ attrel = table_open(AttributeRelationId, RowExclusiveLock);
+
+ tuple = SearchSysCacheAttName(RelationGetRelid(rel), column);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ column, RelationGetRelationName(rel))));
+
+ /* prevent them from altering a system attribute */
+ atttableform = (Form_pg_attribute) GETSTRUCT(tuple);
+ attnum = atttableform->attnum;
+ if (attnum <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter system column \"%s\"", column)));
+
+ typstorage = get_typstorage(atttableform->atttypid);
+
+ /* prevent from setting compression methods for uncompressible type */
+ if (!IsStorageCompressible(typstorage))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("column data type %s does not support compression",
+ format_type_be(atttableform->atttypid))));
+
+ /* initialize buffers for new tuple values */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replace, false, sizeof(replace));
+
+ /* get the attribute compression method. */
+ cmoid = GetAttributeCompression(atttableform, compression);
+
+ atttableform->attcompression = cmoid;
+ CatalogTupleUpdate(attrel, &tuple->t_self, tuple);
+
+ InvokeObjectPostAlterHook(RelationRelationId,
+ RelationGetRelid(rel),
+ atttableform->attnum);
+
+ ReleaseSysCache(tuple);
+
+ /* apply changes to the index column as well */
+ SetIndexStorageProperties(rel, attrel, attnum, cmoid, '\0', lockmode);
+ table_close(attrel, RowExclusiveLock);
+
+ /* make changes visible */
+ CommandCounterIncrement();
+
+ ObjectAddressSubSet(address, RelationRelationId,
+ RelationGetRelid(rel), atttableform->attnum);
+ return address;
+}
+
+
+/*
* Preparation phase for SET LOGGED/UNLOGGED
*
* This verifies that we're not trying to change a temp table. Also,
@@ -17641,3 +17837,36 @@ ATExecAlterCollationRefreshVersion(Relation rel, List *coll)
index_update_collation_versions(rel->rd_id, get_collation_oid(coll, false));
CacheInvalidateRelcache(rel);
}
+
+/*
+ * resolve column compression specification to compression method.
+ */
+static char
+GetAttributeCompression(Form_pg_attribute att, char *compression)
+{
+ char typstorage = get_typstorage(att->atttypid);
+ char cmethod;
+
+ /*
+ * No compression for plain/external storage, refer comments atop
+ * attcompression parameter in pg_attribute.h
+ */
+ if (!IsStorageCompressible(typstorage))
+ {
+ if (compression == NULL)
+ return InvalidCompressionMethod;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("column data type %s does not support compression",
+ format_type_be(att->atttypid))));
+ }
+
+ /* fallback to default compression if it's not specified */
+ if (compression == NULL)
+ cmethod = GetDefaultToastCompression();
+ else
+ cmethod = CompressionNameToMethod(compression);
+
+ return cmethod;
+}