summaryrefslogtreecommitdiff
path: root/src/backend/utils/cache/relcache.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/cache/relcache.c')
-rw-r--r--src/backend/utils/cache/relcache.c196
1 files changed, 109 insertions, 87 deletions
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index ff7395c85b5..29702d6eab1 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -282,7 +282,8 @@ static void RelationInitPhysicalAddr(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void);
-static void AttrDefaultFetch(Relation relation);
+static void AttrDefaultFetch(Relation relation, int ndef);
+static int AttrDefaultCmp(const void *a, const void *b);
static void CheckConstraintFetch(Relation relation);
static int CheckConstraintCmp(const void *a, const void *b);
static void InitIndexAmRoutine(Relation relation);
@@ -503,7 +504,6 @@ RelationBuildTupleDesc(Relation relation)
ScanKeyData skey[2];
int need;
TupleConstr *constr;
- AttrDefault *attrdef = NULL;
AttrMissing *attrmiss = NULL;
int ndef = 0;
@@ -512,8 +512,8 @@ RelationBuildTupleDesc(Relation relation)
relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID;
relation->rd_att->tdtypmod = -1; /* just to be sure */
- constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
- sizeof(TupleConstr));
+ constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
+ sizeof(TupleConstr));
constr->has_not_null = false;
constr->has_generated_stored = false;
@@ -557,10 +557,9 @@ RelationBuildTupleDesc(Relation relation)
attnum = attp->attnum;
if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
- elog(ERROR, "invalid attribute number %d for %s",
+ elog(ERROR, "invalid attribute number %d for relation \"%s\"",
attp->attnum, RelationGetRelationName(relation));
-
memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
attp,
ATTRIBUTE_FIXED_PART_SIZE);
@@ -570,22 +569,10 @@ RelationBuildTupleDesc(Relation relation)
constr->has_not_null = true;
if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
constr->has_generated_stored = true;
-
- /* If the column has a default, fill it into the attrdef array */
if (attp->atthasdef)
- {
- if (attrdef == NULL)
- attrdef = (AttrDefault *)
- MemoryContextAllocZero(CacheMemoryContext,
- RelationGetNumberOfAttributes(relation) *
- sizeof(AttrDefault));
- attrdef[ndef].adnum = attnum;
- attrdef[ndef].adbin = NULL;
-
ndef++;
- }
- /* Likewise for a missing value */
+ /* If the column has a "missing" value, put it in the attrmiss array */
if (attp->atthasmissing)
{
Datum missingval;
@@ -648,7 +635,7 @@ RelationBuildTupleDesc(Relation relation)
table_close(pg_attribute_desc, AccessShareLock);
if (need != 0)
- elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
+ elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
need, RelationGetRelid(relation));
/*
@@ -680,33 +667,19 @@ RelationBuildTupleDesc(Relation relation)
constr->has_generated_stored ||
ndef > 0 ||
attrmiss ||
- relation->rd_rel->relchecks)
+ relation->rd_rel->relchecks > 0)
{
relation->rd_att->constr = constr;
if (ndef > 0) /* DEFAULTs */
- {
- if (ndef < RelationGetNumberOfAttributes(relation))
- constr->defval = (AttrDefault *)
- repalloc(attrdef, ndef * sizeof(AttrDefault));
- else
- constr->defval = attrdef;
- constr->num_defval = ndef;
- AttrDefaultFetch(relation);
- }
+ AttrDefaultFetch(relation, ndef);
else
constr->num_defval = 0;
constr->missing = attrmiss;
if (relation->rd_rel->relchecks > 0) /* CHECKs */
- {
- constr->num_check = relation->rd_rel->relchecks;
- constr->check = (ConstrCheck *)
- MemoryContextAllocZero(CacheMemoryContext,
- constr->num_check * sizeof(ConstrCheck));
CheckConstraintFetch(relation);
- }
else
constr->num_check = 0;
}
@@ -4251,21 +4224,29 @@ GetPgIndexDescriptor(void)
/*
* Load any default attribute value definitions for the relation.
+ *
+ * ndef is the number of attributes that were marked atthasdef.
+ *
+ * Note: we don't make it a hard error to be missing some pg_attrdef records.
+ * We can limp along as long as nothing needs to use the default value. Code
+ * that fails to find an expected AttrDefault record should throw an error.
*/
static void
-AttrDefaultFetch(Relation relation)
+AttrDefaultFetch(Relation relation, int ndef)
{
- AttrDefault *attrdef = relation->rd_att->constr->defval;
- int ndef = relation->rd_att->constr->num_defval;
+ AttrDefault *attrdef;
Relation adrel;
SysScanDesc adscan;
ScanKeyData skey;
HeapTuple htup;
- Datum val;
- bool isnull;
- int found;
- int i;
+ int found = 0;
+
+ /* Allocate array with room for as many entries as expected */
+ attrdef = (AttrDefault *)
+ MemoryContextAllocZero(CacheMemoryContext,
+ ndef * sizeof(AttrDefault));
+ /* Search pg_attrdef for relevant entries */
ScanKeyInit(&skey,
Anum_pg_attrdef_adrelid,
BTEqualStrategyNumber, F_OIDEQ,
@@ -4274,65 +4255,94 @@ AttrDefaultFetch(Relation relation)
adrel = table_open(AttrDefaultRelationId, AccessShareLock);
adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
NULL, 1, &skey);
- found = 0;
while (HeapTupleIsValid(htup = systable_getnext(adscan)))
{
Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
- Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1);
+ Datum val;
+ bool isnull;
- for (i = 0; i < ndef; i++)
+ /* protect limited size of array */
+ if (found >= ndef)
{
- if (adform->adnum != attrdef[i].adnum)
- continue;
- if (attrdef[i].adbin != NULL)
- elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
- NameStr(attr->attname),
- RelationGetRelationName(relation));
- else
- found++;
-
- val = fastgetattr(htup,
- Anum_pg_attrdef_adbin,
- adrel->rd_att, &isnull);
- if (isnull)
- elog(WARNING, "null adbin for attr %s of rel %s",
- NameStr(attr->attname),
- RelationGetRelationName(relation));
- else
- {
- /* detoast and convert to cstring in caller's context */
- char *s = TextDatumGetCString(val);
-
- attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
- pfree(s);
- }
+ elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
+ adform->adnum, RelationGetRelationName(relation));
break;
}
- if (i >= ndef)
- elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
+ val = fastgetattr(htup,
+ Anum_pg_attrdef_adbin,
+ adrel->rd_att, &isnull);
+ if (isnull)
+ elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
adform->adnum, RelationGetRelationName(relation));
+ else
+ {
+ /* detoast and convert to cstring in caller's context */
+ char *s = TextDatumGetCString(val);
+
+ attrdef[found].adnum = adform->adnum;
+ attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s);
+ pfree(s);
+ found++;
+ }
}
systable_endscan(adscan);
table_close(adrel, AccessShareLock);
+
+ if (found != ndef)
+ elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
+ ndef - found, RelationGetRelationName(relation));
+
+ /*
+ * Sort the AttrDefault entries by adnum, for the convenience of
+ * equalTupleDescs(). (Usually, they already will be in order, but this
+ * might not be so if systable_getnext isn't using an index.)
+ */
+ if (found > 1)
+ qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp);
+
+ /* Install array only after it's fully valid */
+ relation->rd_att->constr->defval = attrdef;
+ relation->rd_att->constr->num_defval = found;
+}
+
+/*
+ * qsort comparator to sort AttrDefault entries by adnum
+ */
+static int
+AttrDefaultCmp(const void *a, const void *b)
+{
+ const AttrDefault *ada = (const AttrDefault *) a;
+ const AttrDefault *adb = (const AttrDefault *) b;
+
+ return ada->adnum - adb->adnum;
}
/*
* Load any check constraints for the relation.
+ *
+ * As with defaults, if we don't find the expected number of them, just warn
+ * here. The executor should throw an error if an INSERT/UPDATE is attempted.
*/
static void
CheckConstraintFetch(Relation relation)
{
- ConstrCheck *check = relation->rd_att->constr->check;
- int ncheck = relation->rd_att->constr->num_check;
+ ConstrCheck *check;
+ int ncheck = relation->rd_rel->relchecks;
Relation conrel;
SysScanDesc conscan;
ScanKeyData skey[1];
HeapTuple htup;
int found = 0;
+ /* Allocate array with room for as many entries as expected */
+ check = (ConstrCheck *)
+ MemoryContextAllocZero(CacheMemoryContext,
+ ncheck * sizeof(ConstrCheck));
+
+ /* Search pg_constraint for relevant entries */
ScanKeyInit(&skey[0],
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
@@ -4347,15 +4357,18 @@ CheckConstraintFetch(Relation relation)
Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
Datum val;
bool isnull;
- char *s;
/* We want check constraints only */
if (conform->contype != CONSTRAINT_CHECK)
continue;
+ /* protect limited size of array */
if (found >= ncheck)
- elog(ERROR, "unexpected constraint record found for rel %s",
+ {
+ elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
RelationGetRelationName(relation));
+ break;
+ }
check[found].ccvalid = conform->convalidated;
check[found].ccnoinherit = conform->connoinherit;
@@ -4367,27 +4380,36 @@ CheckConstraintFetch(Relation relation)
Anum_pg_constraint_conbin,
conrel->rd_att, &isnull);
if (isnull)
- elog(ERROR, "null conbin for rel %s",
+ elog(WARNING, "null conbin for relation \"%s\"",
RelationGetRelationName(relation));
+ else
+ {
+ /* detoast and convert to cstring in caller's context */
+ char *s = TextDatumGetCString(val);
- /* detoast and convert to cstring in caller's context */
- s = TextDatumGetCString(val);
- check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
- pfree(s);
-
- found++;
+ check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
+ pfree(s);
+ found++;
+ }
}
systable_endscan(conscan);
table_close(conrel, AccessShareLock);
if (found != ncheck)
- elog(ERROR, "%d constraint record(s) missing for rel %s",
+ elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
ncheck - found, RelationGetRelationName(relation));
- /* Sort the records so that CHECKs are applied in a deterministic order */
- if (ncheck > 1)
- qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp);
+ /*
+ * Sort the records by name. This ensures that CHECKs are applied in a
+ * deterministic order, and it also makes equalTupleDescs() faster.
+ */
+ if (found > 1)
+ qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);
+
+ /* Install array only after it's fully valid */
+ relation->rd_att->constr->check = check;
+ relation->rd_att->constr->num_check = found;
}
/*