diff options
Diffstat (limited to 'src/backend/utils/cache/relcache.c')
-rw-r--r-- | src/backend/utils/cache/relcache.c | 196 |
1 files changed, 109 insertions, 87 deletions
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index ff7395c85b5..29702d6eab1 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -282,7 +282,8 @@ static void RelationInitPhysicalAddr(Relation relation); static void load_critical_index(Oid indexoid, Oid heapoid); static TupleDesc GetPgClassDescriptor(void); static TupleDesc GetPgIndexDescriptor(void); -static void AttrDefaultFetch(Relation relation); +static void AttrDefaultFetch(Relation relation, int ndef); +static int AttrDefaultCmp(const void *a, const void *b); static void CheckConstraintFetch(Relation relation); static int CheckConstraintCmp(const void *a, const void *b); static void InitIndexAmRoutine(Relation relation); @@ -503,7 +504,6 @@ RelationBuildTupleDesc(Relation relation) ScanKeyData skey[2]; int need; TupleConstr *constr; - AttrDefault *attrdef = NULL; AttrMissing *attrmiss = NULL; int ndef = 0; @@ -512,8 +512,8 @@ RelationBuildTupleDesc(Relation relation) relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID; relation->rd_att->tdtypmod = -1; /* just to be sure */ - constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext, - sizeof(TupleConstr)); + constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext, + sizeof(TupleConstr)); constr->has_not_null = false; constr->has_generated_stored = false; @@ -557,10 +557,9 @@ RelationBuildTupleDesc(Relation relation) attnum = attp->attnum; if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation)) - elog(ERROR, "invalid attribute number %d for %s", + elog(ERROR, "invalid attribute number %d for relation \"%s\"", attp->attnum, RelationGetRelationName(relation)); - memcpy(TupleDescAttr(relation->rd_att, attnum - 1), attp, ATTRIBUTE_FIXED_PART_SIZE); @@ -570,22 +569,10 @@ RelationBuildTupleDesc(Relation relation) constr->has_not_null = true; if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED) constr->has_generated_stored = true; - - /* If the column has a default, fill it into the attrdef array */ if (attp->atthasdef) - { - if (attrdef == NULL) - attrdef = (AttrDefault *) - MemoryContextAllocZero(CacheMemoryContext, - RelationGetNumberOfAttributes(relation) * - sizeof(AttrDefault)); - attrdef[ndef].adnum = attnum; - attrdef[ndef].adbin = NULL; - ndef++; - } - /* Likewise for a missing value */ + /* If the column has a "missing" value, put it in the attrmiss array */ if (attp->atthasmissing) { Datum missingval; @@ -648,7 +635,7 @@ RelationBuildTupleDesc(Relation relation) table_close(pg_attribute_desc, AccessShareLock); if (need != 0) - elog(ERROR, "catalog is missing %d attribute(s) for relid %u", + elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u", need, RelationGetRelid(relation)); /* @@ -680,33 +667,19 @@ RelationBuildTupleDesc(Relation relation) constr->has_generated_stored || ndef > 0 || attrmiss || - relation->rd_rel->relchecks) + relation->rd_rel->relchecks > 0) { relation->rd_att->constr = constr; if (ndef > 0) /* DEFAULTs */ - { - if (ndef < RelationGetNumberOfAttributes(relation)) - constr->defval = (AttrDefault *) - repalloc(attrdef, ndef * sizeof(AttrDefault)); - else - constr->defval = attrdef; - constr->num_defval = ndef; - AttrDefaultFetch(relation); - } + AttrDefaultFetch(relation, ndef); else constr->num_defval = 0; constr->missing = attrmiss; if (relation->rd_rel->relchecks > 0) /* CHECKs */ - { - constr->num_check = relation->rd_rel->relchecks; - constr->check = (ConstrCheck *) - MemoryContextAllocZero(CacheMemoryContext, - constr->num_check * sizeof(ConstrCheck)); CheckConstraintFetch(relation); - } else constr->num_check = 0; } @@ -4251,21 +4224,29 @@ GetPgIndexDescriptor(void) /* * Load any default attribute value definitions for the relation. + * + * ndef is the number of attributes that were marked atthasdef. + * + * Note: we don't make it a hard error to be missing some pg_attrdef records. + * We can limp along as long as nothing needs to use the default value. Code + * that fails to find an expected AttrDefault record should throw an error. */ static void -AttrDefaultFetch(Relation relation) +AttrDefaultFetch(Relation relation, int ndef) { - AttrDefault *attrdef = relation->rd_att->constr->defval; - int ndef = relation->rd_att->constr->num_defval; + AttrDefault *attrdef; Relation adrel; SysScanDesc adscan; ScanKeyData skey; HeapTuple htup; - Datum val; - bool isnull; - int found; - int i; + int found = 0; + + /* Allocate array with room for as many entries as expected */ + attrdef = (AttrDefault *) + MemoryContextAllocZero(CacheMemoryContext, + ndef * sizeof(AttrDefault)); + /* Search pg_attrdef for relevant entries */ ScanKeyInit(&skey, Anum_pg_attrdef_adrelid, BTEqualStrategyNumber, F_OIDEQ, @@ -4274,65 +4255,94 @@ AttrDefaultFetch(Relation relation) adrel = table_open(AttrDefaultRelationId, AccessShareLock); adscan = systable_beginscan(adrel, AttrDefaultIndexId, true, NULL, 1, &skey); - found = 0; while (HeapTupleIsValid(htup = systable_getnext(adscan))) { Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup); - Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1); + Datum val; + bool isnull; - for (i = 0; i < ndef; i++) + /* protect limited size of array */ + if (found >= ndef) { - if (adform->adnum != attrdef[i].adnum) - continue; - if (attrdef[i].adbin != NULL) - elog(WARNING, "multiple attrdef records found for attr %s of rel %s", - NameStr(attr->attname), - RelationGetRelationName(relation)); - else - found++; - - val = fastgetattr(htup, - Anum_pg_attrdef_adbin, - adrel->rd_att, &isnull); - if (isnull) - elog(WARNING, "null adbin for attr %s of rel %s", - NameStr(attr->attname), - RelationGetRelationName(relation)); - else - { - /* detoast and convert to cstring in caller's context */ - char *s = TextDatumGetCString(val); - - attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s); - pfree(s); - } + elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"", + adform->adnum, RelationGetRelationName(relation)); break; } - if (i >= ndef) - elog(WARNING, "unexpected attrdef record found for attr %d of rel %s", + val = fastgetattr(htup, + Anum_pg_attrdef_adbin, + adrel->rd_att, &isnull); + if (isnull) + elog(WARNING, "null adbin for attribute %d of relation \"%s\"", adform->adnum, RelationGetRelationName(relation)); + else + { + /* detoast and convert to cstring in caller's context */ + char *s = TextDatumGetCString(val); + + attrdef[found].adnum = adform->adnum; + attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s); + pfree(s); + found++; + } } systable_endscan(adscan); table_close(adrel, AccessShareLock); + + if (found != ndef) + elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"", + ndef - found, RelationGetRelationName(relation)); + + /* + * Sort the AttrDefault entries by adnum, for the convenience of + * equalTupleDescs(). (Usually, they already will be in order, but this + * might not be so if systable_getnext isn't using an index.) + */ + if (found > 1) + qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp); + + /* Install array only after it's fully valid */ + relation->rd_att->constr->defval = attrdef; + relation->rd_att->constr->num_defval = found; +} + +/* + * qsort comparator to sort AttrDefault entries by adnum + */ +static int +AttrDefaultCmp(const void *a, const void *b) +{ + const AttrDefault *ada = (const AttrDefault *) a; + const AttrDefault *adb = (const AttrDefault *) b; + + return ada->adnum - adb->adnum; } /* * Load any check constraints for the relation. + * + * As with defaults, if we don't find the expected number of them, just warn + * here. The executor should throw an error if an INSERT/UPDATE is attempted. */ static void CheckConstraintFetch(Relation relation) { - ConstrCheck *check = relation->rd_att->constr->check; - int ncheck = relation->rd_att->constr->num_check; + ConstrCheck *check; + int ncheck = relation->rd_rel->relchecks; Relation conrel; SysScanDesc conscan; ScanKeyData skey[1]; HeapTuple htup; int found = 0; + /* Allocate array with room for as many entries as expected */ + check = (ConstrCheck *) + MemoryContextAllocZero(CacheMemoryContext, + ncheck * sizeof(ConstrCheck)); + + /* Search pg_constraint for relevant entries */ ScanKeyInit(&skey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, @@ -4347,15 +4357,18 @@ CheckConstraintFetch(Relation relation) Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup); Datum val; bool isnull; - char *s; /* We want check constraints only */ if (conform->contype != CONSTRAINT_CHECK) continue; + /* protect limited size of array */ if (found >= ncheck) - elog(ERROR, "unexpected constraint record found for rel %s", + { + elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"", RelationGetRelationName(relation)); + break; + } check[found].ccvalid = conform->convalidated; check[found].ccnoinherit = conform->connoinherit; @@ -4367,27 +4380,36 @@ CheckConstraintFetch(Relation relation) Anum_pg_constraint_conbin, conrel->rd_att, &isnull); if (isnull) - elog(ERROR, "null conbin for rel %s", + elog(WARNING, "null conbin for relation \"%s\"", RelationGetRelationName(relation)); + else + { + /* detoast and convert to cstring in caller's context */ + char *s = TextDatumGetCString(val); - /* detoast and convert to cstring in caller's context */ - s = TextDatumGetCString(val); - check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s); - pfree(s); - - found++; + check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s); + pfree(s); + found++; + } } systable_endscan(conscan); table_close(conrel, AccessShareLock); if (found != ncheck) - elog(ERROR, "%d constraint record(s) missing for rel %s", + elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"", ncheck - found, RelationGetRelationName(relation)); - /* Sort the records so that CHECKs are applied in a deterministic order */ - if (ncheck > 1) - qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp); + /* + * Sort the records by name. This ensures that CHECKs are applied in a + * deterministic order, and it also makes equalTupleDescs() faster. + */ + if (found > 1) + qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp); + + /* Install array only after it's fully valid */ + relation->rd_att->constr->check = check; + relation->rd_att->constr->num_check = found; } /* |